Monday, September 27, 2021

CDC in Azure Database for MySQL – Flexible Server using Kafka, Debezium, and Azure Event Hubs

Change Data Capture (CDC) is a technique you can use to track row-level changes in database tables in response to create, update, and delete operations. Debezium is a distributed platform that builds on top of CDC features available in different databases (for example, binlog replication with MySQL). Debezium provides a set of Kafka Connect connectors that tap into row-level changes in database table(s), convert those changes into event streams, and then send those streams to Apache Kafka.

 

Most commonly, Debezium is deployed by using Apache Kafka Connect. Kafka Connect is a framework and runtime for implementing and operating:

  • Source connectors such as Debezium that send records into Kafka.
  • Sink connectors that propagate records from Kafka topics to other systems.

This tutorial walks you through setting up a CDC-based system on Azure by using Azure Event Hubs (for Kafka), Azure Database for MySQL – Flexible Server, and Debezium. In this case, the Debezium MySQL connector will stream database modifications from MySQL to Kafka topics in Azure Event Hubs.

 

The architecture of a CDC pipeline based on Debezium, Apache Kafka, and Azure Event Hubs is shown in the following diagram:

 

[Diagram: CDC pipeline based on Debezium, Apache Kafka, and Azure Event Hubs]

 

In this post, you’ll learn how to:

  • Configure and run Kafka Connect with a Debezium MySQL connector.
    • Install the Debezium connector.
    • Configure Kafka Connect for Event Hubs.
    • Start a Kafka Connect cluster with the Debezium connector.
  • Test CDC.
  • (Optional) Install the FileStreamSink connector.

Prerequisites

Before you begin the process outlined in this post, ensure that you have:

  • An Azure Database for MySQL – Flexible Server instance.
  • An Azure Event Hubs namespace (its Kafka endpoint is used by Kafka Connect).
  • A local Apache Kafka installation; Kafka Connect is included in the Kafka release.

Configure and run Kafka Connect with the Debezium MySQL connector

This section covers the following topics:

  • Installing the Debezium MySQL connector
  • Configuring Kafka Connect for Event Hubs
  • Starting a Kafka Connect cluster with the Debezium connector

Install the Debezium MySQL connector

  1. To download the Debezium MySQL connector from the archive https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.6.2.Final/debezium-connector-mysql-1.6.2.Final-plugin.tar.gz, run the following command:

 

curl https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.6.2.Final/debezium-connector-mysql-1.6.2.Final-plugin.tar.gz --output debezium-connector-mysql.tar.gz

 

2. To extract the tar file contents, run the following command:

 

tar -xvzf debezium-connector-mysql.tar.gz

 

You should now see a new folder named debezium-connector-mysql.

 

3. To copy the connector JAR files to your Kafka installation, run the following command:

 

export KAFKA_HOME=[path to Kafka installation, e.g. /home/kafka/kafka]
cp debezium-connector-mysql/*.jar $KAFKA_HOME/libs

 

Note: To confirm that the binaries have been copied, run the following command:

 

ls -lrt $KAFKA_HOME/libs | grep mysql

 

[Screenshot: Debezium connector JAR files copied to $KAFKA_HOME/libs]

 

Note: For more information about installing the connector and about the latest builds, see the Debezium documentation.

 

Configure Kafka Connect for Event Hubs

Minimal reconfiguration is necessary when redirecting Kafka Connect throughput from Kafka to Event Hubs. The following sample connect-distributed.properties configuration file shows how to configure Kafka Connect to authenticate and communicate with the Kafka endpoint on Event Hubs:

 

bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=dbz-mysql-cdc-connect-cluster-configs
offset.storage.topic=dbz-mysql-cdc-connect-cluster-offsets
status.storage.topic=dbz-mysql-cdc-connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
plugin.path={KAFKA.DIRECTORY}/libs

 

  1. Edit the configuration file and replace the details below to match your setup:
  • {YOUR.EVENTHUBS.FQDN}:9093 – Replace with the FQDN of your Event Hubs namespace, for example namespace.servicebus.windows.net:9093.
  • {YOUR.EVENTHUBS.CONNECTION.STRING} – Replace with the connection string for your Event Hubs namespace. For instructions on getting the connection string, see Get an Event Hubs connection string (an Azure CLI example also follows step 2). Here's an example configuration: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
  • {KAFKA.DIRECTORY} – Replace with the path to the libs directory within your Kafka release ($KAFKA_HOME/libs).

2. After you replace the values, save the updated connect-distributed.properties configuration file.
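
If you prefer the command line to the portal for retrieving the connection string, a command along the following lines should work (a hedged sketch that assumes the Azure CLI is installed; the resource group and namespace names are placeholders you'd replace):

 

# Print the primary connection string of the namespace-level
# RootManageSharedAccessKey authorization rule (placeholder names).
az eventhubs namespace authorization-rule keys list \
  --resource-group {YOUR-RESOURCE-GROUP} \
  --namespace-name {YOUR-EVENTHUBS-NAMESPACE} \
  --name RootManageSharedAccessKey \
  --query primaryConnectionString \
  --output tsv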

 

Start the Kafka Connect cluster with the Debezium MySQL connector

  1. Navigate to the location of the Kafka release on your computer.
  2. To start Kafka Connect, run the following command:

 

./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties

 

Kafka Connect uses the Kafka AdminClient API to automatically create topics with recommended configurations, including compaction. After the worker has started, a quick check of the Event Hubs namespace in the Azure portal confirms that the Connect worker's internal topics have been created automatically. Kafka Connect internal topics must use compaction.

 

3. In the Azure portal, navigate to your Event Hubs namespace --> Event Hubs.

 

You should see that the internal topics specified in the connect-distributed.properties configuration file have been auto-created.

 

[Screenshot: auto-created Kafka Connect internal topics in the Event Hubs namespace]
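
You can also confirm from the command line that the worker is running and that the Debezium plugin has been loaded. A minimal check, assuming the worker's REST API is reachable on its default port (8083):

 

# List the connector plugins the worker has discovered; the Debezium
# MySQL connector class should appear in the output.
curl -s http://localhost:8083/connector-plugins

# List the connectors currently registered with the worker (none yet).
curl -s http://localhost:8083/connectors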

 

Configure and start the Debezium MySQL source connector

  1. Create a configuration file named mysql-source-connector.json for the MySQL source connector, and include the following contents:

 

{
  "name": "classicmodel-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "{SERVER-NAME}.mysql.database.azure.com",
    "database.user": "{USER-NAME}",
    "database.port": "3306",
    "database.password": "{PASSWORD}",
    "database.server.id": "1",
    "database.server.name": "mysql-connector-demo",
    "database.history.kafka.bootstrap.servers": "127.0.0.1:9092",
    "database.history.kafka.topic": "dbhistory.classicmodels",
    "include.schema.changes": "true"
  }
}

 

2. Edit the mysql-source-connector.json configuration file and replace the details below to match your setup.

 

  • {SERVER-NAME}: Replace with the name of your Azure Database for MySQL flexible server instance.
  • {USER-NAME}: Replace with the user name for the database.
  • {PASSWORD}: Replace with the password for the specified database user account.

Note: For more details about the connector's configuration properties, see Debezium Connector for MySQL.

 

After you replace the values, save the updated mysql-source-connector.json configuration file.
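
Because the Debezium MySQL connector reads changes from the server's binary log, it can be worth confirming that row-based binary logging is in effect before creating the connector. A minimal sketch, assuming the mysql command-line client is installed and using the same placeholders as above:

 

# Show whether the binary log is enabled and which row image/format it uses
# (Debezium expects row-based logging).
mysql -h {SERVER-NAME}.mysql.database.azure.com -u {USER-NAME} -p \
  -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format','binlog_row_image');"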

 

3.  To create an instance of the connector, use the Kafka Connect REST API endpoint by running the following command:

 

curl -X POST -H "Content-Type: application/json" --data @mysql-source-connector.json http://localhost:8083/connectors
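
If the request fails, you can ask the Connect worker to validate the connector configuration and report any problems. Here's a hedged sketch that assumes the jq utility is installed to extract the config object that the validate endpoint expects:

 

# Validate the configuration without creating a connector instance.
jq .config mysql-source-connector.json | \
  curl -s -X PUT -H "Content-Type: application/json" --data @- \
  http://localhost:8083/connector-plugins/io.debezium.connector.mysql.MySqlConnector/config/validate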

 

4. To check the status of the connector, run the following command:

 

curl -s http://localhost:8083/connectors/classicmodel-connector/status

 

[Screenshot: connector status response from the Kafka Connect REST API]

 

Note that a new topic has been created for the connector, as shown below:

 

[Screenshot: topics created by the connector in the Event Hubs namespace]

 

Test change data capture

To see CDC in action, you’ll need to create/update/delete records in the Azure Database for MySQL database.

  1. Connect to your Azure Database for MySQL database by running the following command:

 

mysql -h {SERVER-NAME}.mysql.database.azure.com -u mysqladmin -p

 

2. Create a table and insert records by running the following commands:

 

CREATE TABLE todos (id SERIAL, description VARCHAR(50), todo_status VARCHAR(12), PRIMARY KEY(id));
INSERT INTO todos (description, todo_status) VALUES ('setup mysql on azure', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('setup kafka connect', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('configure and install connector', 'in-progress');
INSERT INTO todos (description, todo_status) VALUES ('start connector', 'pending');
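
Updates and deletes generate change events too. A hedged example that produces one of each (it assumes the todos table lives in the classicmodels database, consistent with the topic names used later in this post; you can also run the two statements from the open mysql session):

 

# Generate an update and a delete change event against the todos table.
mysql -h {SERVER-NAME}.mysql.database.azure.com -u mysqladmin -p classicmodels \
  -e "UPDATE todos SET todo_status = 'complete' WHERE description = 'start connector';
      DELETE FROM todos WHERE description = 'setup kafka connect';"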

 

If you navigate to the Event Hubs namespace in the Azure portal, you should see the request and message counts being updated.

 

[Screenshot: request and message counts updating in the Event Hubs namespace]

 

We can now look at the contents of the topic to make sure everything is working as expected. The example below uses kafkacat, but you can also create a consumer using any of the options listed in the article Apache Kafka developer guide for Azure Event Hubs.

 

For the purposes of this blog post, we’re using version 1.5, which you can check by running the command:

 

kafkacat -V
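
If kafkacat isn't installed yet, it's available from most package managers. For example, on Debian or Ubuntu (a hedged example; the package name can vary by distribution):

 

# Install kafkacat from the distribution's package repository.
sudo apt-get install kafkacat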

 

  1.  Create a file named kafkacat.conf with the following contents:

 

metadata.broker.list={YOUR.EVENTHUBS.FQDN}:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password={YOUR.EVENTHUBS.CONNECTION.STRING}

 

2. Edit the kafkacat.conf file and replace the details below to match your setup.

  • {YOUR.EVENTHUBS.FQDN}:9093 – Replace with the FQDN of your Event Hubs namespace, for example namespace.servicebus.windows.net:9093.
  • {YOUR.EVENTHUBS.CONNECTION.STRING} – Replace with the connection string for your Event Hubs namespace. For instructions on getting the connection string, see Get an Event Hubs connection string. Here's an example configuration: sasl.password=Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX

3. After you replace the values, save the updated kafkacat.conf file.

4. In a different terminal, to start a kafkacat consumer session, run the following commands:

 

export KAFKACAT_CONFIG=kafkacat.conf
export BROKER={YOUR.EVENTHUBS.FQDN}:9093
export TOPIC=mysql-connector-demo.classicmodels.todos

kafkacat -b $BROKER -t $TOPIC -o beginning

 

5. Insert/update/delete database records, and monitor the records.

 

You should see the JSON payloads representing the change data events generated in response to the rows you had just added to the todos table. For example:

 

INSERT INTO todos (description, todo_status) VALUES ('to check from kafkacat', 'complete');

 

Here's a snippet of the payload:

 
{
  "payload": {
    "before": null,
    "after": {
      "id": 104,
      "description": "to check from kafkacat",
      "todo_status": "complete"
    },
    "source": {
      "version": "1.6.2.Final",
      "connector": "mysql",
      "name": "mysql-connector-demo",
      "ts_ms": 1631598198000,
      "snapshot": "false",
      "db": "classicmodels",
      "sequence": null,
      "table": "todos",
      "server_id": 3237408199,
      "gtid": "d5978bbf-bf8f-11eb-9c11-000d3a1e8e2e:347",
      "file": "mysql-bin.000053",
      "pos": 51636,
      "row": 0,
      "thread": null,
      "query": null
    },
    "op": "c",
    "ts_ms": 1631598198431,
    "transaction": null
  }
}
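
Each change event carries the row state before and after the change. If you only want the new row state, you can pipe the consumer output through jq. This is a hedged sketch that assumes jq is installed and that each message on the topic is a single-line JSON document:

 

# Read the topic from the beginning, exit when the end is reached (-e),
# and print only the "after" image of each change event.
kafkacat -b $BROKER -t $TOPIC -o beginning -e | jq '.payload.after'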

 

Install FileStreamSink connector (optional)

Now that all the todos table changes are being captured in the Event Hubs topic, we'll use the FileStreamSink connector (available by default in Kafka Connect) to consume these events.

 

  1. Create a configuration file (file-sink-connector.json) for the connector, which contains the following content:

 

{
  "name": "cdc-file-sink",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": "1",
    "topics": "mysql-connector-demo.classicmodels.todos",
    "file": "./todos-cdc.txt"
  }
}

 

2. Replace the file attribute based on your file system, and then save the file.
3. To create the connector, run the following command:

 

curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors

 

4. To check the status, run the following command:

 

curl http://localhost:8083/connectors/cdc-file-sink/status

 

5. Insert/update/delete database records, and monitor the records in the configured output sink file by running the following command:

 

tail -f ./todos-cdc.txt

 

Note: This approach also works with Azure Database for MySQL - Single Server.
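
When you've finished testing, you can remove both connectors through the same Kafka Connect REST API (a minimal sketch using the connector names from this post):

 

# Delete the sink and source connectors from the Connect worker.
curl -X DELETE http://localhost:8083/connectors/cdc-file-sink
curl -X DELETE http://localhost:8083/connectors/classicmodel-connector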

 

Conclusion

This concludes my guidance on setting up a CDC-based system on Azure by using Azure Event Hubs (for Kafka), Azure Database for MySQL – Flexible Server, and Debezium.

 

CDC captures incremental changes in the original database so that they can be propagated to other databases or applications in near real-time. Though for demonstration purposes I have used managed Azure services, these instructions should work for any other setup, for example a local Kafka cluster and MySQL instance. After change event records are in Apache Kafka, different connectors in the Kafka Connect ecosystem can stream the records to other systems and databases such as Elasticsearch, data warehouses, and analytics systems, as well as caches such as Infinispan.

 

Supportability of the solution

Use of the Apache Kafka Connect framework, as well as of the Debezium platform and its connectors, is not eligible for product support through Microsoft.

 

Many Apache Kafka Connect scenarios will be functional, but the conceptual differences between Apache Kafka's and Azure Event Hubs' retention models may result in certain configurations that do not work as expected.

 
