Thursday, March 3, 2022

Use Java to validate schemas when sending and receiving events in Azure Event Hubs

I believe most of you have already read this article, which provides a tutorial on how to validate schemas via AMQP and .NET. However, many Event Hubs users are currently using the Java SDK in their projects, so there is increasing demand for validating schemas with Java when sending and receiving events in Azure Event Hubs. This article provides a sample of how to implement serialization and deserialization using Java.

 

There are two sections in this blog. I will briefly introduce the Event Hubs schema feature in section 1 and then provide sample code in section 2.

 

Section 1: Event Hub Schema

In this section, I will first briefly explain what serialization and deserialization are and then introduce the Schema Registry. If you are already familiar with these concepts, feel free to skip this section.

  • Serialization & Deserialization

Serialization: converting an object into a sequence of bytes.

Deserialization: converting a sequence of bytes back into a replica of the original object.
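
To make this concrete, here is a minimal sketch using Java's built-in object serialization (the Avro-based flow later in this post follows the same object-to-bytes, bytes-to-object pattern):

import java.io.*;

public class SerializationDemo {
    public static void main(String[] args) throws Exception {
        String original = "hello";

        // Serialization: convert the object to bytes
        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytesOut)) {
            out.writeObject(original);
        }
        byte[] bytes = bytesOut.toByteArray();

        // Deserialization: rebuild a replica of the original object from the bytes
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            String replica = (String) in.readObject();
            System.out.println(replica); // prints "hello"
        }
    }
}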

  • What is Azure Schema Registry and how is it used in Azure Event Hubs?

 

Azure Schema Registry is a feature of Event Hubs that provides a central repository of schemas for event-driven and messaging-centric applications.

[Figure 1: Schema Registry in the Azure portal]

 

You can find this feature on the Schema Registry blade of your Event Hubs namespace in the Azure portal, where you can easily add a schema group and a schema by clicking Add schema group.

[Figure 2: serialization/deserialization workflow with Azure Schema Registry]

Apache Avro is one of the schema-driven formats used to serialize and deserialize data. As shown in the workflow in Figure 2, an event producer serializes the event payload by referencing a schema from Azure Schema Registry and then publishes the event to Azure Event Hubs. The event consumer reads the event payload from the event hub and deserializes the data against the same schema.
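
For reference, the schema you register in the schema group is a plain Avro record definition in JSON. A minimal sketch that would match the TestSchemaData class used in Section 2 might look like the following (the namespace here is an assumption; if you generate classes from the schema, align it with your Java package):

{
  "type": "record",
  "name": "TestSchemaData",
  "namespace": "com.example",
  "fields": [
    { "name": "dataInt", "type": "int" },
    { "name": "dataString", "type": "string" }
  ]
}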

 

Section 2: Java Implementation

 

Prerequisites:

  1. If you don’t know how to create a Maven project, or are not yet familiar with using Java to create a sender and receiver for Azure Event Hubs, you can refer to this document.
  2. You can create an Azure Event Hubs schema by following this article.

 

Checklist:

  1. A Microsoft Azure subscription.
  2. An Event Hubs namespace and an event hub.
  3. An Azure Event Hubs schema registry.
  4. An Azure Storage account and a blob container.
  5. A Java development environment, such as Eclipse.
  6. Java Development Kit (JDK) version 8 or above.

 

Step 1. Add references to the Azure Event Hubs libraries

 

The Java client libraries for Event Hubs and Schema Registry are available in the Maven Central Repository. You can reference them using the following dependency declarations inside your Maven project file:

 

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>EventhubSchema</groupId>
  <artifactId>EventhubSchema</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-messaging-eventhubs</artifactId>
      <version>5.7.0</version>
    </dependency>
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
      <version>1.6.0</version>
    </dependency>
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-data-schemaregistry</artifactId>
      <version>1.0.2</version>
    </dependency>
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-data-schemaregistry-avro</artifactId>
      <version>1.0.0-beta.5</version>
    </dependency>
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-identity</artifactId>
      <version>1.4.4</version>
    </dependency>
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-core</artifactId>
      <version>1.22.0-beta.1</version>
    </dependency>
  </dependencies>
</project>

 

Step 2. Create a test schema class.

 

public class TestSchemaData {
    private int dataInt;
    private String dataString;

    public void setDataInt(int data)
    {
        dataInt = data;
    }

    public void setDataString(String data)
    {
        dataString = data;
    }

    public int getDataInt()
    {
        return dataInt;
    }

    public String getDataString()
    {
        return dataString;
    }
}

 

 

 

Step 3. Create an event producer and an event consumer.

 

According to the workflow above, we need an event producer that serializes events and publishes the data to Event Hubs, and an event consumer that is responsible for reading the data and deserializing it back into events.

 

Add a class named SchemaSenderReceiver, and add the following code to the class:

 

import com.azure.core.credential.TokenCredential;
import com.azure.core.util.serializer.TypeReference;
import com.azure.data.schemaregistry.SchemaRegistryAsyncClient;
import com.azure.data.schemaregistry.SchemaRegistryClientBuilder;
import com.azure.data.schemaregistry.avro.SchemaRegistryAvroSerializer;
import com.azure.data.schemaregistry.avro.SchemaRegistryAvroSerializerBuilder;
import com.azure.identity.ClientSecretCredentialBuilder;
import com.azure.messaging.eventhubs.*;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.messaging.eventhubs.models.*;
import com.azure.storage.blob.*;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class SchemaSenderReceiver {
    private static final String namespaceName = "[event hub namespace].servicebus.windows.net";
    private static final String schemaGroupName = "[schema group name]";
    private static final String eventHubName = "[event hub name]";
    private static final String connectionString = "[Event Hubs namespace connection string]";
    private static final String storageConnectionString = "[storage account connection string]";
    private static final String storageContainerName = "[storage container name]";

    public static void main(String[] args) {
        consumeEvents();
        publishEvents();

        System.out.println("Press enter to stop.");
        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("Exiting process");
    }

 

Add the following methods.

 

    public static void consumeEvents() {
        // Create a blob container client that the event processor will use for checkpointing
        BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .connectionString(storageConnectionString)
            .containerName(storageContainerName)
            .buildAsyncClient();

        // Create a builder object used to build an event processor client to receive and process events and errors
        EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
            .connectionString(connectionString, eventHubName)
            .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
            .processEvent(PARTITION_PROCESSOR)
            .processError(ERROR_HANDLER)
            .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));

        // Use the builder object to create an event processor client
        EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();

        System.out.println("Starting event processor");
        eventProcessorClient.start();
    }

    public static void publishEvents() {
        // create a producer client
        EventHubProducerClient producer = new EventHubClientBuilder()
            .connectionString(connectionString, eventHubName)
            .buildProducerClient();

        // sample events in an array
        List<EventData> allEvents = Arrays.asList(new EventData(GenerateSchemaEvent())); 

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }

 

Add the message-processing callback and the error-handling callback (PARTITION_PROCESSOR and ERROR_HANDLER):

 

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();

        // Deserialize the Avro payload back into a TestSchemaData object
        TestSchemaData dt = DeserializeSchemaEvent(eventData.getBody());

        System.out.printf("Processing event from partition %s with sequence number %d with body: %d, %s%n",
            partitionContext.getPartitionId(), eventData.getSequenceNumber(), dt.getDataInt(), dt.getDataString());

        // Every 10 events received, update the checkpoint stored in Azure Blob Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };

    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };

 

Step 4. Implement serialization and deserialization.

 

We create a SchemaRegistryAsyncClient through the SchemaRegistryClientBuilder in order to interact with Azure Schema Registry. Authentication is done through Azure Active Directory: register a new AAD application and grant it access to the Schema Registry service.
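
As a side note, if you would rather not hard-code a client secret as the sample below does, a DefaultAzureCredential-based variant is possible. This is a minimal sketch, assuming the azure-identity dependency from Step 1 and that AZURE_TENANT_ID, AZURE_CLIENT_ID, and AZURE_CLIENT_SECRET are set in the environment; the namespace parameter is a placeholder:

import com.azure.core.credential.TokenCredential;
import com.azure.data.schemaregistry.SchemaRegistryAsyncClient;
import com.azure.data.schemaregistry.SchemaRegistryClientBuilder;
import com.azure.identity.DefaultAzureCredentialBuilder;

public class SchemaRegistryClientFactory {
    // Builds a Schema Registry client without hard-coded secrets.
    // DefaultAzureCredential resolves credentials from environment variables,
    // a managed identity, or a developer tool login.
    public static SchemaRegistryAsyncClient create(String fullyQualifiedNamespace) {
        TokenCredential credential = new DefaultAzureCredentialBuilder().build();
        return new SchemaRegistryClientBuilder()
            .fullyQualifiedNamespace(fullyQualifiedNamespace) // e.g. "[namespace].servicebus.windows.net"
            .credential(credential)
            .buildAsyncClient();
    }
}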

 

With reference to the schema format, schemaRegistryAvroSerializer.serialize serializes the object into Avro-encoded bytes. Similarly, schemaRegistryAvroSerializer.deserialize deserializes the data back into an object by referring to the same schema. (In the sample below, the credential, client, and serializer are built inside each method to keep the steps self-contained; in real code you would build them once and reuse them.)

 

    public static byte[] GenerateSchemaEvent()
    {
        TokenCredential tokenCredential = new ClientSecretCredentialBuilder()
                .tenantId("myTenantId")
                .clientId("myClientId")
                .clientSecret("myClientSecret")
                .build();

        SchemaRegistryAsyncClient schemaRegistryAsyncClient = new SchemaRegistryClientBuilder()
                .fullyQualifiedNamespace(namespaceName)
                .credential(tokenCredential)
                .buildAsyncClient();

        SchemaRegistryAvroSerializer schemaRegistryAvroSerializer = new SchemaRegistryAvroSerializerBuilder()
                .schemaRegistryAsyncClient(schemaRegistryAsyncClient)
                .schemaGroup(schemaGroupName)
                .avroSpecificReader(true)
                .autoRegisterSchema(true)
                .buildSerializer();

        // Build a sample object and serialize it to Avro bytes
        TestSchemaData ts = new TestSchemaData();
        ts.setDataInt(5);
        ts.setDataString("Hi,mate...");

        ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
        schemaRegistryAvroSerializer.serialize(byteOutStream, ts);
        return byteOutStream.toByteArray();
    }


    public static TestSchemaData DeserializeSchemaEvent(byte[] message)
    {
        TokenCredential tokenCredential = new ClientSecretCredentialBuilder()
                .tenantId("myTenantId")
                .clientId("myClientId")
                .clientSecret("myClientSecret")
                .build();

        SchemaRegistryAsyncClient schemaRegistryAsyncClient = new SchemaRegistryClientBuilder()
                .fullyQualifiedNamespace(namespaceName)
                .credential(tokenCredential)
                .buildAsyncClient();

        SchemaRegistryAvroSerializer schemaRegistryAvroSerializer = new SchemaRegistryAvroSerializerBuilder()
                .schemaRegistryAsyncClient(schemaRegistryAsyncClient)
                .schemaGroup(schemaGroupName)
                .avroSpecificReader(true)
                .autoRegisterSchema(true)
                .buildSerializer();

        // Deserialize the Avro bytes back into a TestSchemaData object using the same schema
        ByteArrayInputStream in = new ByteArrayInputStream(message);
        return schemaRegistryAvroSerializer.deserialize(in, TypeReference.createInstance(TestSchemaData.class));
    }

 

THE END

 

The complete SchemaSenderReceiver code is attached below:

import com.azure.core.credential.TokenCredential;
import com.azure.core.util.serializer.TypeReference;
import com.azure.data.schemaregistry.SchemaRegistryAsyncClient;
import com.azure.data.schemaregistry.SchemaRegistryClientBuilder;
import com.azure.data.schemaregistry.avro.SchemaRegistryAvroSerializer;
import com.azure.data.schemaregistry.avro.SchemaRegistryAvroSerializerBuilder;
import com.azure.identity.ClientSecretCredentialBuilder;
import com.azure.messaging.eventhubs.*;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.messaging.eventhubs.models.*;
import com.azure.storage.blob.*;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
 
public class SchemaSenderReceiver {
    private static final String namespaceName = "[event hub namespace].servicebus.windows.net";
    private static final String schemaGroupName = "[schema group name]";
    private static final String eventHubName = "[event hub name]";
    private static final String connectionString = "[Event Hubs namespace connection string]";
    private static final String storageConnectionString = "[storage account connection string]";
    private static final String storageContainerName = "[storage container name]";

    public static void main(String[] args) {
        consumeEvents();
        publishEvents();

        System.out.println("Press enter to stop.");
        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("Exiting process");
    }

    public static void consumeEvents() {
        // Create a blob container client that you use later to build an event processor client to receive and process events
        BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .connectionString(storageConnectionString)
            .containerName(storageContainerName)
            .buildAsyncClient();

        // Create a builder object that you will use later to build an event processor client to receive and process events and errors.
        EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
            .connectionString(connectionString, eventHubName)
            .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
            .processEvent(PARTITION_PROCESSOR)
            .processError(ERROR_HANDLER)
            .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));

        // Use the builder object to create an event processor client
        EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();

        System.out.println("Starting event processor");
        eventProcessorClient.start();
  
    }
    
    public static void publishEvents() {
        // create a producer client
        EventHubProducerClient producer = new EventHubClientBuilder()
            .connectionString(connectionString, eventHubName)
            .buildProducerClient();

        // sample events in an array
        List<EventData> allEvents = Arrays.asList(new EventData(GenerateSchemaEvent())); 

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }
    
    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();

        // Deserialize the Avro payload back into a TestSchemaData object
        TestSchemaData dt = DeserializeSchemaEvent(eventData.getBody());

        System.out.printf("Processing event from partition %s with sequence number %d with body: %d, %s%n",
            partitionContext.getPartitionId(), eventData.getSequenceNumber(), dt.getDataInt(), dt.getDataString());

        // Every 10 events received, update the checkpoint stored in Azure Blob Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };

    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };    
    
    public static byte[] GenerateSchemaEvent()
    {
        TokenCredential tokenCredential = new ClientSecretCredentialBuilder()
                .tenantId("myTenantId")
                .clientId("myClientId")
                .clientSecret("myClientSecret")
                .build();

        SchemaRegistryAsyncClient schemaRegistryAsyncClient = new SchemaRegistryClientBuilder()
                .fullyQualifiedNamespace(namespaceName)
                .credential(tokenCredential)
                .buildAsyncClient();

        SchemaRegistryAvroSerializer schemaRegistryAvroSerializer = new SchemaRegistryAvroSerializerBuilder()
                .schemaRegistryAsyncClient(schemaRegistryAsyncClient)
                .schemaGroup(schemaGroupName)
                .avroSpecificReader(true)
                .autoRegisterSchema(true)
                .buildSerializer();

        // Build a sample object and serialize it to Avro bytes
        TestSchemaData ts = new TestSchemaData();
        ts.setDataInt(5);
        ts.setDataString("Hi,mate...");

        ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
        schemaRegistryAvroSerializer.serialize(byteOutStream, ts);
        return byteOutStream.toByteArray();
    }
    

    public static TestSchemaData DeserializeSchemaEvent(byte[] message)
    {
        TokenCredential tokenCredential = new ClientSecretCredentialBuilder()
                .tenantId("myTenantId")
                .clientId("myClientId")
                .clientSecret("myClientSecret")
                .build();

        SchemaRegistryAsyncClient schemaRegistryAsyncClient = new SchemaRegistryClientBuilder()
                .fullyQualifiedNamespace(namespaceName)
                .credential(tokenCredential)
                .buildAsyncClient();

        SchemaRegistryAvroSerializer schemaRegistryAvroSerializer = new SchemaRegistryAvroSerializerBuilder()
                .schemaRegistryAsyncClient(schemaRegistryAsyncClient)
                .schemaGroup(schemaGroupName)
                .avroSpecificReader(true)
                .autoRegisterSchema(true)
                .buildSerializer();

        // Deserialize the Avro bytes back into a TestSchemaData object using the same schema
        ByteArrayInputStream in = new ByteArrayInputStream(message);
        return schemaRegistryAvroSerializer.deserialize(in, TypeReference.createInstance(TestSchemaData.class));
    }
    
}
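
One way to run the finished sample is with the exec-maven-plugin (shown as a sketch; it assumes the class sits in the default package, as in this sample):

mvn compile exec:java -Dexec.mainClass="SchemaSenderReceiver"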

//ref: https://docs.microsoft.com/en-us/java/api/overview/azure/data-schemaregistry-readme?view=azure-java-stable
//ref: https://docs.microsoft.com/en-us/answers/questions/251186/sending-and-receiving-avro-formatted-events-to-eve.html
//ref: https://github.com/Azure/azure-sdk-for-java/search?q=fullyQualifiedNamespace