Change Data Capture (CDC) is a technique you can use to track row-level changes in database tables in response to create, update, and delete operations. Debezium is a distributed platform that builds on top of CDC features available in different databases (for example, binlog replication with MySQL). Debezium provides a set of Kafka Connect connectors that tap into row-level changes in database table(s), convert those changes into event streams, and then send those streams to Apache Kafka.
Most commonly, Debezium is deployed by using Apache Kafka Connect. Kafka Connect is a framework and runtime for implementing and operating:
- Source connectors such as Debezium that send records into Kafka.
- Sink connectors that propagate records from Kafka topics to other systems.
This tutorial walks you through setting up a CDC-based system on Azure by using Azure Event Hubs (for Kafka), Azure Database for MySQL - Flexible Server, and Debezium. In this case, the Debezium MySQL connector streams database modifications from MySQL to Kafka topics in Azure Event Hubs.
The architecture of a CDC pipeline based on Debezium, Apache Kafka, and Azure Event Hubs is shown in the following diagram:
In this post, you’ll learn how to:
- Configure and run Kafka Connect with a Debezium MySQL connector.
- Install the Debezium connector.
- Configure Kafka Connect for Event Hubs.
- Start a Kafka Connect cluster with the Debezium connector.
- Test CDC.
- (Optional) Install the FileStreamSink connector.
Prerequisites
Before you begin, ensure that you have:
- Created an Azure VM running Linux (preferably Ubuntu 20.04) by using the Azure portal.
- Installed Kafka (version 1.1.1, Scala version 2.11), available from kafka.apache.org, on your Azure VM running Linux. For more information, see How to install Kafka on Linux Machine.
- Created an instance of Azure Database for MySQL - Flexible Server.
- Read through the Event Hubs for Apache Kafka introduction article.
- Created an Event Hubs namespace. An Event Hubs namespace is required to send to and receive from any Event Hubs service. See Creating an event hub for instructions on creating a namespace and an event hub. Record the Event Hubs connection string and fully qualified domain name (FQDN) for later use. For instructions, see Get an Event Hubs connection string.
- Installed kafkacat (a version later than 1.4) from its GitHub repository.
Configure and run Kafka Connect with the Debezium MySQL connector
This section covers the following topics:
- Installing the Debezium MySQL connector
- Configuring Kafka Connect for Event Hubs
- Starting a Kafka Connect cluster with the Debezium connector
Install the Debezium MySQL connector
1. To download the Debezium MySQL connector plug-in archive (version 1.6.2.Final), run the following command:
curl https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.6.2.Final/debezium-connector-mysql-1.6.2.Final-plugin.tar.gz --output debezium-connector-mysql.tar.gz
2. To extract the tar file contents, run the following command:
tar -xvzf debezium-connector-mysql.tar.gz
You should now see a new folder named debezium-connector-mysql.
3. To copy the connector JAR files to your Kafka installation, run the following command:
export KAFKA_HOME=/home/kafka/kafka   # replace with the path to your Kafka installation
cp debezium-connector-mysql/*.jar $KAFKA_HOME/libs
Note: To confirm that the binaries have been copied, run the following command:
ls -lrt $KAFKA_HOME/libs | grep mysql
Note: For more information on installation and the latest builds, see the Debezium documentation.
Configure Kafka Connect for Event Hubs
When you redirect Kafka Connect throughput from Kafka to Event Hubs, some minimal reconfiguration is necessary. The following sample connect-distributed.properties configuration file shows how to point Kafka Connect at the Kafka endpoint on Event Hubs.
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=dbz-mysql-cdc-connect-cluster-configs
offset.storage.topic=dbz-mysql-cdc-connect-cluster-offsets
status.storage.topic=dbz-mysql-cdc-connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
plugin.path={KAFKA.DIRECTORY}/libs
1. Edit the configuration file and replace the following placeholders to match your setup:
- {YOUR.EVENTHUBS.FQDN}:9093: Replace with the FQDN of your Event Hubs namespace, for example namespace.servicebus.windows.net:9093.
- {YOUR.EVENTHUBS.CONNECTION.STRING}: Replace with the connection string for your Event Hubs namespace. For instructions on getting the connection string, see Get an Event Hubs connection string. Here's an example configuration: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
- {KAFKA.DIRECTORY}: Replace with the path to your Kafka installation ($KAFKA_HOME), so that plugin.path points to the libs directory ($KAFKA_HOME/libs).
2. After you replace the values, save the updated connect-distributed.properties configuration file.
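Both placeholders above can be derived from the connection string itself, because the host in its Endpoint value is the namespace FQDN. The following Python sketch is only an illustration of that relationship, not part of the Debezium or Event Hubs tooling; the function names parse_connection_string and kafka_settings are hypothetical, and the key shown is the same dummy value used in the example above.

```python
from urllib.parse import urlparse


def parse_connection_string(conn_str: str) -> dict:
    """Split 'Key=Value;Key=Value' pairs. Values may contain '=' (base64 keys)."""
    parts = {}
    for segment in conn_str.strip().split(";"):
        if not segment:
            continue
        # Split on the first '=' only, so trailing '=' padding in the key survives.
        key, _, value = segment.partition("=")
        parts[key] = value
    return parts


def kafka_settings(conn_str: str) -> dict:
    """Build the bootstrap.servers and sasl.jaas.config values (hypothetical helper)."""
    endpoint = parse_connection_string(conn_str)["Endpoint"]
    fqdn = urlparse(endpoint).hostname  # e.g. mynamespace.servicebus.windows.net
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="$ConnectionString" password="{conn_str}";'
    )
    return {"bootstrap.servers": f"{fqdn}:9093", "sasl.jaas.config": jaas}


if __name__ == "__main__":
    example = (
        "Endpoint=sb://mynamespace.servicebus.windows.net/;"
        "SharedAccessKeyName=RootManageSharedAccessKey;"
        "SharedAccessKey=XXXXXXXXXXXXXXXX"
    )
    for name, value in kafka_settings(example).items():
        print(f"{name}={value}")
```

The same sasl.jaas.config value is reused for the producer. and consumer. prefixed settings in the sample file.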
Start Kafka Connect cluster with Debezium MySQL connector
1. Navigate to the location of the Kafka release on your computer.
2. To start Kafka Connect, run the following command:
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
Kafka Connect uses the Kafka AdminClient API to automatically create topics with recommended configurations, including compaction. After Kafka Connect has started, a quick check of the Event Hubs namespace in the Azure portal helps to confirm that the Connect worker's internal topics have been created automatically. Kafka Connect internal topics must use compaction.
3. In the Azure portal, navigate to your Event Hubs namespace, and then select Event Hubs.
You should see that the internal topics named in the connect-distributed.properties configuration file have been created automatically.
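If you want a checklist of the names to look for in the portal, they can be read straight out of the properties file. The sketch below is a minimal illustration under stated assumptions: it handles only plain key=value lines and # comments (not the full Java properties syntax), and the helper names are hypothetical.

```python
INTERNAL_TOPIC_KEYS = (
    "config.storage.topic",
    "offset.storage.topic",
    "status.storage.topic",
)


def parse_properties(text: str) -> dict:
    """Minimal key=value parser; skips blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def expected_internal_topics(text: str) -> list:
    """Return the internal topic names configured for the Connect worker."""
    props = parse_properties(text)
    return [props[key] for key in INTERNAL_TOPIC_KEYS if key in props]


if __name__ == "__main__":
    # The path is an assumption; point it at your own copy of the file.
    with open("connect-distributed.properties", encoding="utf-8") as handle:
        for topic in expected_internal_topics(handle.read()):
            print(topic)
```

With the sample configuration from this post, the list contains the three dbz-mysql-cdc-connect-cluster-* topics.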
Configure and start the Debezium MySQL source connector
1. Create a configuration file named mysql-source-connector.json for the MySQL source connector, and include the following contents:
{
  "name": "classicmodel-connector",
  "config": {
     "connector.class": "io.debezium.connector.mysql.MySqlConnector",
     "database.hostname": "{SERVER-NAME}.mysql.database.azure.com",
     "database.user": "{USER-NAME}",
     "database.port": "3306",
     "database.password": "{PASSWORD}",
     "database.server.id": "1",
     "database.server.name": "mysql-connector-demo",
     "database.history.kafka.bootstrap.servers": "127.0.0.1:9092",
     "database.history.kafka.topic": "dbhistory.classicmodels",
     "include.schema.changes": "true"
   }
}
2. Edit the mysql-source-connector.json configuration file and replace the details below to match your setup.
- {SERVER-NAME}: Replace with the name of the Azure Database for MySQL instance.
- {USER-NAME}: Replace with the user name for the database.
- {PASSWORD}: Replace with the password for the specified database user account.
Note: For more details, see Debezium Connector for MySQL.
After you replace the values, save the updated mysql-source-connector.json configuration file.
3. To create an instance of the connector, use the Kafka Connect REST API endpoint by running the following command:
curl -X POST -H "Content-Type: application/json" --data @mysql-source-connector.json http://localhost:8083/connectors
4. To check the status of the connector, run the following command:
curl -s http://localhost:8083/connectors/classicmodel-connector/status
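The status endpoint returns a JSON document with a state for the connector and for each of its tasks. As a rough sketch of how that response could be checked in a script, the following Python function reports whether everything is RUNNING. The function name summarize_status is hypothetical, and the sample response (including the worker_id value) is illustrative rather than captured output.

```python
import json


def summarize_status(status: dict) -> tuple:
    """Return (healthy, messages) for a Kafka Connect connector status document."""
    connector_state = status.get("connector", {}).get("state", "UNKNOWN")
    messages = [f"connector {status.get('name')}: {connector_state}"]
    healthy = connector_state == "RUNNING"

    tasks = status.get("tasks", [])
    if not tasks:
        healthy = False
        messages.append("no tasks reported yet")
    for task in tasks:
        state = task.get("state", "UNKNOWN")
        messages.append(f"task {task.get('id')}: {state}")
        if state != "RUNNING":
            healthy = False
            # A failed task usually carries a stack trace; show its first line.
            trace = task.get("trace", "")
            if trace:
                messages.append(trace.splitlines()[0])
    return healthy, messages


if __name__ == "__main__":
    # Illustrative response body; pipe the real curl output in instead.
    sample = json.loads(
        '{"name": "classicmodel-connector",'
        ' "connector": {"state": "RUNNING", "worker_id": "connect:8083"},'
        ' "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "connect:8083"}],'
        ' "type": "source"}'
    )
    ok, lines = summarize_status(sample)
    print("\n".join(lines))
    print("healthy" if ok else "needs attention")
```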
Note that a new topic ID has been created, as shown below:
Test change data capture
To see CDC in action, you’ll need to create/update/delete records in the Azure Database for MySQL database.
1. Connect to your Azure Database for MySQL database by running the following command:
mysql -h {SERVER-NAME}.mysql.database.azure.com -u mysqladmin -p
2. Create a table and insert records by running the following commands:
CREATE TABLE todos (id SERIAL, description VARCHAR(50), todo_status VARCHAR(12), PRIMARY KEY(id));
INSERT INTO todos (description, todo_status) VALUES ('setup mysql on azure', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('setup kafka connect', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('configure and install connector', 'in-progress');
INSERT INTO todos (description, todo_status) VALUES ('start connector', 'pending');
If you navigate to the Event Hub, you should see counts of requests and messages getting updated to the Event Hub.
We can now look at the contents of the topic to make sure everything is working as expected. The following example uses kafkacat, but you can also create a consumer by using any of the options listed in the article Apache Kafka developer guide for Azure Event Hubs.
For the purposes of this blog post, we’re using version 1.5, which you can check by running the command:
kafkacat -V
1. Create a file named kafkacat.conf with the following contents:
metadata.broker.list={YOUR.EVENTHUBS.FQDN}:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password={YOUR.EVENTHUBS.CONNECTION.STRING}
2. Edit the kafkacat.conf file and replace the details below to match your setup.
- {YOUR.EVENTHUBS.FQDN}:9093: Replace with the FQDN of your Event Hubs namespace, for example namespace.servicebus.windows.net:9093.
- {YOUR.EVENTHUBS.CONNECTION.STRING}: Replace with the connection string for your Event Hubs namespace. For instructions on getting the connection string, see Get an Event Hubs connection string. Here's an example configuration:
sasl.password=Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX
3. After you replace the values, save the updated kafkacat.conf file.
4. In a different terminal, to start a kafkacat session that reads from the topic, run the following commands:
export KAFKACAT_CONFIG=kafkacat.conf
export BROKER={YOUR.EVENTHUBS.FQDN}:9093
export TOPIC=mysql-connector-demo.classicmodels.todos
kafkacat -b $BROKER -t $TOPIC -o beginning
5. Insert, update, or delete database records, and monitor the records.
You should see the JSON payloads representing the change data events generated in response to the rows you just added to the todos table. For example, run the following statement:
INSERT INTO todos (description, todo_status) VALUES ('to check from kafkacat', 'complete');
Here's a snippet of the payload:
{
   "payload":{
      "before":null,
      "after":{
         "id":104,
         "description":"to check from kafkacat",
         "todo_status":"complete"
      },
      "source":{
         "version":"1.6.2.Final",
         "connector":"mysql",
         "name":"mysql-connector-demo",
         "ts_ms":1631598198000,
         "snapshot":"false",
         "db":"classicmodels",
         "sequence":null,
         "table":"todos",
         "server_id":3237408199,
         "gtid":"d5978bbf-bf8f-11eb-9c11-000d3a1e8e2e:347",
         "file":"mysql-bin.000053",
         "pos":51636,
         "row":0,
         "thread":null,
         "query":null
      },
      "op":"c",
      "ts_ms":1631598198431,
      "transaction":null
   }
}
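To make the payload above easier to read, here is a small Python sketch that interprets a change event: it maps the op code to an operation name, rebuilds the topic name from the source block (server name, database, table), and applies the change to an in-memory copy of the table. The function names are hypothetical, the code assumes row-level events shaped like the snippet above, and it does not handle schema change events or empty (tombstone) messages.

```python
import json

# Debezium operation codes: create, update, delete, and snapshot read.
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def describe_event(event: dict) -> dict:
    """Summarize one change event (hypothetical helper)."""
    payload = event["payload"]
    source = payload["source"]
    return {
        "operation": OPERATIONS.get(payload["op"], payload["op"]),
        "topic": f"{source['name']}.{source['db']}.{source['table']}",
        "before": payload["before"],
        "after": payload["after"],
    }


def apply_event(rows: dict, event: dict, key: str = "id") -> None:
    """Apply one change event to an in-memory copy of the table, keyed by primary key."""
    payload = event["payload"]
    if payload["op"] == "d":
        # Deletes carry the old row in "before" and null in "after".
        rows.pop(payload["before"][key], None)
    else:
        rows[payload["after"][key]] = payload["after"]


if __name__ == "__main__":
    line = json.dumps({
        "payload": {
            "before": None,
            "after": {"id": 104, "description": "to check from kafkacat",
                      "todo_status": "complete"},
            "source": {"name": "mysql-connector-demo", "db": "classicmodels",
                       "table": "todos"},
            "op": "c",
        }
    })
    event = json.loads(line)
    print(describe_event(event))
    replica = {}
    apply_event(replica, event)
    print(replica)
```

Replaying events in order like this is the basic idea behind propagating changes to another store; a real consumer would also need to track offsets and handle errors.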
Install FileStreamSink connector (optional)
Now that all the todos table changes are being captured in the Event Hubs topic, we’ll use the FileStreamSink connector (available by default in Kafka Connect) to consume these events.
1. Create a configuration file (file-sink-connector.json) for the connector, which contains the following content:
{
     "name": "cdc-file-sink",
     "config": {
          "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
          "tasks.max": "1",
          "topics": "mysql-connector-demo.classicmodels.todos",
          "file": "./todos-cdc.txt"
     }
}
2. Replace the file attribute based on your file system, and then save the file.
3. To create the connector, run the following command:
curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors
4. To check the status, run the following command:
curl http://localhost:8083/connectors/cdc-file-sink/status
5. Insert/update/delete database records, and monitor the records in the configured output sink file by running the following command:
tail -f ./todos-cdc.txt
Note: This approach also works with Azure Database for MySQL - Single Server.
Conclusion
This concludes my guidance on setting up a CDC-based system on Azure by using Azure Event Hubs (for Kafka), Azure Database for MySQL - Flexible Server, and Debezium.
CDC captures incremental changes in the original database so that they can be propagated to other databases or applications in near real time. Although I used managed Azure services for demonstration purposes, these instructions should work for other setups, for example a local Kafka cluster and MySQL instance. After change event records are in Apache Kafka, different connectors in the Kafka Connect ecosystem can stream the records to other systems and databases such as Elasticsearch, data warehouses, and analytics systems, as well as caches such as Infinispan.
Supportability of the solution
Use of the Apache Kafka Connect framework, as well as the Debezium platform and its connectors, is not eligible for product support through Microsoft.
Many Apache Kafka Connect scenarios will be functional, but the conceptual differences between Apache Kafka's and Azure Event Hubs' retention models may result in certain configurations that do not work as expected.
Posted at https://sl.advdat.com/39Ix7CO
