Wednesday, August 25, 2021

Sending Kafka Topics into MySQL DB using JDBC connector hosted in an HDInsight Managed Cluster

This article explains the process of sending JSON-schema-formatted topics from an HDInsight managed Kafka cluster, using Kafka Connect in standalone mode, to a MySQL DB. The steps can also be extended to a distributed setup. We have used Ubuntu 18.04 machines for the cluster.

There are some prerequisite steps:

  • Create an HDInsight Managed Kafka cluster with the OSS Kafka Schema Registry. You can follow the HDInsight Managed Kafka with OSS Kafka Schema Registry link in the References section.
  • Start the Schema Registry service in a separate console; the same reference describes how.
  • Create an Azure MySQL DB and a private link to access it, then create the required database. In this article we use a sample table with a single "id" field; a minimal sketch of the database setup follows this list.
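
As a minimal sketch, assuming the Azure MySQL server is reachable over the private link and using kafkadb as a placeholder database name, the database can be created with the mysql client:

$mysql -h <mysqlserverNameFQDN> -u <userName>@<mysqlserverName> -p -e "CREATE DATABASE IF NOT EXISTS kafkadb;"

Because the sink connector configured later uses auto.create=true, the target table itself does not need to be created in advance.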

Detailed Steps:

  1. Install the MySQL JDBC driver

$sudo apt-get install -y libmysql-java

 

This installs the MySQL JDBC driver on your machine at /usr/share/java.

Place the mysql-connector-java-{version}.jar file in the appropriate plugin path.

To find the plugin path, use the command below:

 

$sudo find / -name kafka-connect-jdbc\*.jar

 

The command lists the plugin paths; copy the JAR file into the appropriate one. If you skip this step, you may get a "No suitable driver" error.
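
As an illustrative sketch, assuming the find command returns the plugin directory used later in this article (the exact path and lib/ sub-folder may differ on your cluster), the driver can be copied next to the connector's JARs:

$sudo cp /usr/share/java/mysql-connector-java*.jar /usr/hdp/current/kafka-broker/connectors/confluentinc-kafka-connect-jdbc-10.2.0/lib/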

 

  2. Configure Kafka Connect in standalone mode

 

Acquire the Zookeeper and Kafka broker information. First, set up the password variable: replace PASSWORD with the cluster login password, then enter the command

 

$export password='PASSWORD'

 

Extract the correctly cased cluster name

 

$export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')

 

Extract the Kafka Zookeeper hosts

 

$export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);

 

To extract Kafka Broker information into the variable KAFKABROKERS use the below command

 

$export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
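
As a quick sanity check, echo both variables to verify that they were populated:

$echo $KAFKAZKHOSTS

$echo $KAFKABROKERS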

 

Copy connect-standalone.properties to connect-standalone-1.properties and populate the properties as shown below.

 

$sudo cp /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-standalone-1.properties

 

bootstrap.servers=<Enter the full contents of $KAFKABROKERS>
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets1
offset.flush.interval.ms=10000
rest.port=8085
plugin.path=/usr/hdp/current/kafka-broker/connectors/confluentinc-kafka-connect-jdbc-10.2.0
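
If you prefer not to paste the broker list manually, one optional approach (assuming $KAFKABROKERS is still set in the same shell) is to substitute it into the copied file with sed:

$sudo sed -i "s|^bootstrap.servers=.*|bootstrap.servers=$KAFKABROKERS|" /usr/hdp/current/kafka-broker/config/connect-standalone-1.properties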

 

  3. Deploy the JDBC sink Kafka Connect plugin

Download the relevant Kafka Connect plugin (the Confluent JDBC connector) from Confluent Hub to your local desktop.

Create a new folder on the edge node and open up its permissions:

$sudo mkdir /usr/hdp/current/kafka-broker/connectors

$sudo chmod 777 /usr/hdp/current/kafka-broker/connectors

Using WinSCP or any other SCP tool of your choice, upload the Kafka Connect plugin into the folder /usr/hdp/current/kafka-broker/connectors.
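
As an illustrative sketch, assuming the plugin was uploaded as the zip archive from Confluent Hub (the file name will vary with the version), extract it in place so that the directory referenced by plugin.path above exists:

$sudo unzip /usr/hdp/current/kafka-broker/connectors/confluentinc-kafka-connect-jdbc-10.2.0.zip -d /usr/hdp/current/kafka-broker/connectors/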

 

  4. Configure the Kafka Connect plugin for the MySQL JDBC sink connector

$sudo vi /usr/hdp/current/kafka-broker/connectors/mysql.properties

 

name=<name of the connector>
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=3
connection.url=jdbc:mysql://<mysqlserverNameFQDN>:3306/<databaseName>
connection.user=<userName>@<mysqlserverName>
connection.password=<Password>
table.name.format=<tableName>
auto.create=true
topics=<topicName>
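
For illustration only, a filled-in version with hypothetical values (server mysqlsrv1, database kafkadb, table kafkatable, topic mysqltopic) might look like this:

name=mysql-jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=3
connection.url=jdbc:mysql://mysqlsrv1.mysql.database.azure.com:3306/kafkadb
connection.user=azureuser@mysqlsrv1
connection.password=<Password>
table.name.format=kafkatable
auto.create=true
topics=mysqltopic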

 

  5. Start the sink connector

In a new session, start the sink connector:

$sudo /usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone-1.properties /usr/hdp/current/kafka-broker/connectors/mysql.properties
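
As an optional check (not part of the original steps), the Kafka Connect REST API exposed on the rest.port configured above can confirm that the connector loaded and is running; use the connector name you set in mysql.properties:

$curl -s http://localhost:8085/connectors

$curl -s http://localhost:8085/connectors/<name of the connector>/status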

 

  6. Create a Kafka topic

 

In a new session, create a new topic:

 

$/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic <sampleTopicName> --zookeeper $KAFKAZKHOSTS
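
To confirm the topic was created, you can list the topics registered in Zookeeper:

$/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS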

 

  7. Send data into the topic

$/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic <sampleTopicName>

> {"schema": {"type": "struct", "fields": [{"type": "int32", "optional": true, "field": "id" }], "optional": false, "name": "foobar"},"payload": {"id": 40000}}

Because value.converter.schemas.enable=true is set in the worker configuration, each message must embed both a "schema" and a "payload" field, as in the example above. For more details regarding this JSON schema format, refer to the JsonDeserializer article in the References section.

 

  8. Check the MySQL DB for the new entry

Log in to the MySQL DB and you should find the new entry in the table.
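
As a minimal sketch, reusing the placeholder values from the connector configuration above, the row can be verified with the mysql client:

$mysql -h <mysqlserverNameFQDN> -u <userName>@<mysqlserverName> -p <databaseName> -e "SELECT * FROM <tableName>;"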

 

 

References:

Kafka Connect with HDInsight Managed Kafka - Microsoft Tech Community

JDBC Connector (Source and Sink) | Confluent Hub

HDInsight Managed Kafka with OSS Kafka Schema Registry - Microsoft Tech Community

Kafka Connect - JsonDeserializer with schemas.enable requires “schema” and “payload” fields (rmoff.net)

Kafka Connect in Action: JDBC Sink - YouTube

Installing a JDBC driver for the Kafka Connect JDBC connector - YouTube

Kafka Connect mysql Sink Example Part 2 - YouTube

Kafka Connect mySQL Examples (supergloo.com)
