This article explains how to send JSON schema formatted topic messages from an HDInsight managed Kafka standalone server to a MySQL database. The steps can also be extended to a distributed setup. We used Ubuntu 18.04 machines for the cluster.
There are some prerequisite steps:
- Create an HDInsight Managed Kafka cluster with the OSS Kafka Schema Registry. You can follow the HDInsight Managed Kafka with OSS Kafka Schema Registry article linked in the References section.
- Start the Schema Registry service in a separate console. You can refer to the same article.
- Create an Azure Database for MySQL instance and a Private Link to access it. Create the required database; in this article we use a sample database whose table has a single "id" field.
Detailed Steps:
- Install the MySQL JDBC driver
$sudo apt-get install -y libmysql-java
This installs the MySQL JDBC driver on your machine at /usr/share/java.
Place the mysql-connector-java-{version}.jar file in the appropriate plugin path. To find the plugin path, use the command below:
$sudo find / -name kafka-connect-jdbc\*.jar
This lists the plugin paths; put the jar file in the appropriate one. If you skip this step, you may get a "No suitable driver" error.
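For illustration, assuming the plugin directory used later in this article (confirm the exact path from the find output above before copying):
$sudo cp /usr/share/java/mysql-connector-java-{version}.jar /usr/hdp/current/kafka-broker/connectors/confluentinc-kafka-connect-jdbc-10.2.0/lib/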
- Configure Kafka Connect in standalone mode
Acquire the Zookeeper and Kafka broker data. Set up a password variable: replace PASSWORD with the cluster login password, then enter the command
$export password='PASSWORD'
Extract the correctly cased cluster name
$export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
Extract the Kafka Zookeeper hosts
$export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
To extract Kafka Broker information into the variable KAFKABROKERS use the below command
$export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
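Before continuing, verify that both variables are populated:
$echo $KAFKAZKHOSTS
$echo $KAFKABROKERS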
Copy connect-standalone.properties to connect-standalone-1.properties and populate the properties as shown below.
$sudo cp /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-standalone-1.properties
bootstrap.servers=<Enter the full contents of $KAFKABROKERS>
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets1
offset.flush.interval.ms=10000
rest.port=8085
plugin.path=/usr/hdp/current/kafka-broker/connectors/confluentinc-kafka-connect-jdbc-10.2.0
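Rather than pasting the broker list by hand, you can substitute it with sed (a convenience sketch; it assumes the copied file still contains its default bootstrap.servers line):
$sudo sed -i "s/^bootstrap.servers=.*/bootstrap.servers=$KAFKABROKERS/" /usr/hdp/current/kafka-broker/config/connect-standalone-1.properties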
- Deploy the JDBC sink Kafka Connect Plugins
Download the JDBC sink connector plugin from Confluent Hub to your local desktop
Create a new folder path on the edge node and set its permissions
$sudo mkdir /usr/hdp/current/kafka-broker/connectors
$sudo chmod 777 /usr/hdp/current/kafka-broker/connectors
Using WinSCP or any other SCP tool of your choice, upload the Kafka Connect plugin into the folder path /usr/hdp/current/kafka-broker/connectors
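If you uploaded the plugin as the zip archive from Confluent Hub, unpack it in place. The resulting folder name should match the plugin.path configured earlier; the version shown is an example and may differ:
$cd /usr/hdp/current/kafka-broker/connectors
$unzip confluentinc-kafka-connect-jdbc-10.2.0.zip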
- Configure the Kafka Connect plugin for the MySQL JDBC sink connector
$sudo vi /usr/hdp/current/kafka-broker/connectors/mysql.properties
name=<name of the connector>
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=3
connection.url=jdbc:mysql://<mysqlserverNameFQDN>:3306/<databaseName>
connection.user=<userName>@<mysqlserverName>
connection.password=<Password>
table.name.format=<tableName>
auto.create=true
topics=<topicName>
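For reference, a filled-in version with hypothetical values (the server, user, database, table, and topic names are all placeholders, not real resources):
name=mysql-jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=3
connection.url=jdbc:mysql://mysqlserver1.mysql.database.azure.com:3306/kafkadb
connection.user=admin1@mysqlserver1
connection.password=<Password>
table.name.format=kafkatable
auto.create=true
topics=sampleTopic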
- Start the sink connector
In a new session, start the sink connector
$sudo /usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone-1.properties /usr/hdp/current/kafka-broker/connectors/mysql.properties
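Once the worker is up, you can confirm that the connector loaded through the Kafka Connect REST API on the rest.port configured earlier:
$curl -sS http://localhost:8085/connectors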
- Create a Kafka topic
In a new session, create a new topic
$/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic <sampleTopicName> --zookeeper $KAFKAZKHOSTS
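You can verify that the topic exists with the describe option:
$/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --describe --topic <sampleTopicName> --zookeeper $KAFKAZKHOSTS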
- Send data into the topic
$/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic <sampleTopicName>
> {"schema": {"type": "struct", "fields": [{"type": "int32", "optional": true, "field": "id" }], "optional": false, "name": "foobar"},"payload": {"id": 40000}}
Because value.converter.schemas.enable is set to true, each message must carry this schema-and-payload envelope. For more details regarding the JSON schema format, refer to the Kafka Connect documentation.
- Check the MySQL DB for the new entry
Log in to the MySQL database and you should find the new row in the table.
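For example, with the mysql command line client (the placeholders match the connector properties above):
$mysql -h <mysqlserverNameFQDN> -u <userName>@<mysqlserverName> -p
mysql> SELECT * FROM <databaseName>.<tableName>;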
References:
Kafka Connect with HDInsight Managed Kafka - Microsoft Tech Community
JDBC Connector (Source and Sink) | Confluent Hub
HDInsight Managed Kafka with OSS Kafka Schema Registry - Microsoft Tech Community
Kafka Connect in Action: JDBC Sink - YouTube
Installing a JDBC driver for the Kafka Connect JDBC connector - YouTube
Kafka Connect MySQL Sink Example Part 2 - YouTube
Kafka Connect mySQL Examples (supergloo.com)