Wednesday, December 8, 2021

Bridge Building – Mosquitto MQTT to Azure IoT Using Paho MQTT and the Azure IoT Device SDKs

Paho MQTT with the Azure IoT Device SDK = Awesome!

I have been on a journey to explore Azure IoT and push the thousands of events that flow through my local MQTT broker (Mosquitto) into Azure IoT Hub, from direct connection to Azure IoT Hub (via MQTT and SAS tokens) through to Azure IoT Edge running locally with MQTT. I have been able to achieve my goals with varying levels of success, but I have a few concerns about the approaches I have tried thus far.

  • Direct connection to Azure IoT Hub introduces latency to the cloud.
  • Authentication, from SAS tokens to X.509 certificates: it’s not anonymous, and some of my tiny devices (Tasmota) don’t cope well with it.
  • Topic structure: it is defined (devices/{DeviceID}/messages/events/) and not free form. It means reconfiguration, which isn’t hard, but adds a lot of friction.
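
To make the authentication and topic constraints concrete, here is a minimal sketch of what a directly connected device has to produce: a SAS token and the fixed telemetry topic. It follows the token layout Azure documents for IoT Hub (an HMAC-SHA256 signature over the URL-encoded resource URI and an expiry time). The function names, hub name, device ID and key are placeholders for illustration only.

```python
import base64
import hashlib
import hmac
import time
import urllib.parse


def generate_sas_token(resource_uri, device_key_b64, ttl_seconds=3600, now=None):
    """Build a SAS token string for the given resource URI and base64 device key."""
    now = time.time() if now is None else now
    expiry = int(now + ttl_seconds)
    encoded_uri = urllib.parse.quote_plus(resource_uri)
    # The string to sign is the URL-encoded resource URI, a newline, then the expiry.
    to_sign = f"{encoded_uri}\n{expiry}".encode("utf-8")
    key = base64.b64decode(device_key_b64)
    digest = hmac.new(key, to_sign, hashlib.sha256).digest()
    signature = base64.b64encode(digest).decode("ascii")
    fields = {"sr": resource_uri, "sig": signature, "se": str(expiry)}
    return "SharedAccessSignature " + urllib.parse.urlencode(fields)


def telemetry_topic(device_id):
    """Return the fixed device-to-cloud topic a directly connected device must publish to."""
    return f"devices/{device_id}/messages/events/"


if __name__ == "__main__":
    # Placeholder hub, device and key: substitute your own values.
    demo_key = base64.b64encode(b"not-a-real-key").decode("ascii")
    print(generate_sas_token("myhub.azure-devices.net/devices/sonoff01", demo_key))
    print(telemetry_topic("sonoff01"))
```

Every device would need its own key, a clock accurate enough for the expiry, and its publish topic rewritten to the fixed form, which is exactly the friction the list above describes.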

Channeling my inner Steve Ballmer (developers, developers, developers), it’s time to build. My goals for building a solution:

  1. No reconfiguration of any of my MQTT devices (Home Assistant, PLC, Arduino Mega 2560, ~75 Tasmota devices).
  2. Bridge my existing MQTT broker (Mosquitto) into Azure IoT Hub.
  3. Must run on aarch64 architecture, a Raspberry Pi.

Pretty lofty goals. You may even say I am being lazy, but the reality is I want a low-friction way to derive operational intelligence from the many thousands of events each day (read below, it’s over 10K per day!). And for that we need to get our hands dirty, write some code and use SDKs.

 

What we are going to build

 

Using Paho + Azure IoT SDK we will listen to Mosquitto

To overcome the limitations described above, we are going to build an application in Python using the Python MQTT library Paho and the Azure IoT SDK for Python. Let’s quickly talk about both of these.

 

Paho MQTT

 

Paho is a Python client class that enables applications to connect to an MQTT broker to publish messages, subscribe to topics and receive published messages. It also provides some helper functions to make publishing one-off messages to an MQTT server very straightforward. This is what we will be using to listen to messages on our Mosquitto broker. For examples and more information about the Paho MQTT module, see the pypi.org website.

To install Paho, you can use pip.

pip install paho-mqtt

You can then import Paho with

import paho.mqtt.client as mqtt

 

Azure IoT SDK for Python

 

Once the messages have been read by Paho from Mosquitto, we need to get them into Azure IoT. The Azure IoT SDK for Python enables us to do away with hand-crafted MQTT and speak directly to the service in Python. The SDK takes care of ‘Authentication’, ‘Send device-to-cloud message’, ‘Receive cloud-to-device messages’, ‘Device twins’, ‘Direct methods’, ‘Connection status and error reporting’, ‘Retry policies’ and ‘Upload file to blob’.

A lot of the heavy lifting I need is performed by this SDK. For code examples and more information about this device module, see the pypi.org website. To install the Azure IoT SDK for Python, you can use pip.

pip install azure-iot-device

You can then import the Azure IoT SDK for Python with

from azure.iot.device import IoTHubDeviceClient

Let’s write some code.

 

Code Summary

 

See the steps below as I tease out this solution, or my GitHub repo for the full Python script. To give you a better understanding of how this works, I will break it down into the logical steps required to receive messages from Mosquitto over MQTT using Paho and then re-publish them into Azure IoT Hub using the Azure IoT SDK for Python.

 

Step 1 – Import Modules

We need to import a few modules, mainly Paho and the Azure IoT SDK, to provide additional functionality.

import paho.mqtt.client as mqtt
import os
import asyncio
import uuid
from azure.iot.device.aio import IoTHubDeviceClient
from azure.iot.device import Message
from datetime import datetime

 

Step 2 – Connect To Mosquitto & Subscribe To MQTT Topics

After importing our modules we need to connect to our MQTT broker. We will handle this with a callback function (on_connect), which must be defined before it is assigned to the client.

def on_connect(client, userdata, flags, rc):  # The callback for when the client connects to the broker
    print(str(datetime.now()) + " | Connecting to MQTT Broker : " + MQTTBrokerIP)
    print(str(datetime.now()) + " | Connected with result code {0}".format(str(rc)))
    print(str(datetime.now()) + " | We are connected!")
    print()
    print(str(datetime.now()) + " | Subscribing to MQTT Topics")
    print(str(datetime.now()) + " | Subscribing to " + MQTTTopicSubscribe)
    client.subscribe(MQTTTopicSubscribe)
    print()

client = mqtt.Client(MQTTClientName)  # paho-mqtt 1.x constructor; 2.x also expects a callback_api_version argument
client.on_connect = on_connect
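
The callback above prints "We are connected!" whatever the result code is. As a small sketch of how the code could be interpreted, the table below maps the MQTT 3.1.1 CONNACK return codes to text; the helper names describe_rc and should_subscribe are made up for this example and are not part of Paho.

```python
# MQTT 3.1.1 CONNACK return codes, as delivered in the rc argument of on_connect.
CONNACK_CODES = {
    0: "Connection accepted",
    1: "Refused: unacceptable protocol version",
    2: "Refused: identifier rejected",
    3: "Refused: server unavailable",
    4: "Refused: bad user name or password",
    5: "Refused: not authorised",
}


def describe_rc(rc):
    """Translate a CONNACK result code into a readable message."""
    return CONNACK_CODES.get(rc, f"Unknown result code {rc}")


def should_subscribe(rc):
    """Only a result code of 0 means the broker accepted the connection."""
    return rc == 0


if __name__ == "__main__":
    for code in (0, 4, 5):
        print(code, describe_rc(code), should_subscribe(code))
```

Inside on_connect, a check such as should_subscribe(rc) before calling client.subscribe() would stop the log from claiming a connection that the broker refused. Paho 1.x also ships a similar helper, mqtt.connack_string(rc).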

 

After connecting, we need to tell our MQTT broker which topics we want to subscribe to. This way we can be more precise about what we want to replicate into Azure. We can use MQTT topic filters to do this. Initially I started with a # but decided to use the single-level wildcard +.

MQTTTopicSubscribe = "stat/+/POWER" #MQTT Topic Filter

Plus sign (+): It is a single level wildcard that matches any name for a specific topic level. We can use this wildcard instead of specifying a name for any topic level in the topic filter.

Hash (#): It is a multi level wildcard that we can use only at the end of the topic filter, as the last level and matches any topic whose first levels are the same as the topic levels specified at the left-hand side of the # symbol.

Understanding wildcards | MQTT Essentials – A Lightweight IoT Protocol (packtpub.com)
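
The wildcard rules quoted above can be sketched as a small matcher. This is an illustration of the rules only, not the code the broker runs; the function name topic_matches is made up for the example, and special cases such as topics starting with $ are ignored.

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic matches a filter using + and # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: only valid as the last level, matches everything below.
            return index == len(filter_levels) - 1
        if index >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[index]:
            # A literal level must match exactly; + accepts any single level.
            return False
    # Without a trailing #, both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


if __name__ == "__main__":
    print(topic_matches("stat/+/POWER", "stat/kitchen/POWER"))
    print(topic_matches("stat/+/POWER", "stat/kitchen/RESULT"))
    print(topic_matches("#", "tele/kitchen/SENSOR"))
```

With the filter stat/+/POWER only power state changes are forwarded, whereas # would replicate every message on the broker.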

 

Step 3 – Listen For Messages

We have now subscribed to MQTT topics and we need to listen for and act on incoming messages. I am taking the MQTT topic and MQTT payload and passing these into my Python azure() function (not an Azure Function ;)), which will push the payload into Azure. As with on_connect, the callback is defined first and then assigned to the client.

def on_message(client, userdata, msg):  # The callback for when a PUBLISH message is received from the server.
    global mqtt_topic
    mqtt_topic = msg.topic
    global mqtt_payload
    mqtt_payload = str(msg.payload)
    print(str(datetime.now()) + " | Message received")
    print(str(datetime.now()) + " | MQTT Topic and Payload: " + msg.topic + " " + str(msg.payload)[2:][:-1])  # Print a received msg
    asyncio.run(azure())

client.on_message = on_message
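
A note on the [2:][:-1] slice used above: Paho delivers msg.payload as bytes, and calling str() on bytes yields the Python literal form (b'ON'), so the slice removes the leading b' and the trailing quote. A minimal sketch of an alternative is to decode the bytes directly; the helper name payload_to_text is an assumption for this example, and it presumes the payloads are UTF-8 text.

```python
def payload_to_text(payload):
    """Decode an MQTT payload (bytes) to text, replacing any invalid UTF-8 bytes."""
    return payload.decode("utf-8", errors="replace")


if __name__ == "__main__":
    raw = b"ON"
    print(str(raw))              # the literal form, including the b prefix and quotes
    print(str(raw)[2:][:-1])     # the slicing approach
    print(payload_to_text(raw))  # the decoding approach

    # The two approaches differ once the payload is not plain ASCII.
    accented = "café".encode("utf-8")
    print(str(accented)[2:][:-1])
    print(payload_to_text(accented))
```

For simple ASCII payloads such as ON and OFF both approaches give the same text; decoding only matters when a payload contains quotes, backslashes or non-ASCII characters.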

 

Step 4 – Send Messages To Azure

With the MQTT topic and payload we can now push these messages into Azure. I am sending these as a JSON object. I have had to massage the MQTT payload as my PLC is adding a few extra values I don’t need.

async def azure():
    # Create instance of the device client using the connection string
    device_client = IoTHubDeviceClient.create_from_connection_string(AzureIOTHub_conn_str)
     
    # Connect the device client.
    await device_client.connect()
    print(str(datetime.now()) + " | Async connection established to Azure IOT")

    # Send a single message
    print(str(datetime.now()) + " | Sending message to Azure IOT Hub")
    msg = Message("{ \"DateTime\": \"" + str(datetime.now()) + "\", \"MQTT Topic\": \"" + mqtt_topic + "\", \"Payload\": \"" + mqtt_payload[2:][:-1] + "\" }")
    msg.message_id = uuid.uuid4()
    msg.content_encoding = "utf-8"
    msg.content_type = "application/json"

    
    await device_client.send_message(msg)
    print(str(datetime.now()) + " | Message sent, tearing down Azure IOT Hub connection")
    print()

    # Finally, shut down the client
    await device_client.shutdown()
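
The message body above is assembled by string concatenation, which produces invalid JSON if a topic or payload ever contains a double quote or a backslash. As a hedged alternative, the sketch below builds the same three fields with json.dumps; the helper name build_body is made up for this example, and the key names are copied from the code above.

```python
import json
from datetime import datetime


def build_body(topic, payload, now=None):
    """Serialise the timestamp, MQTT topic and payload as a JSON object string."""
    now = datetime.now() if now is None else now
    return json.dumps(
        {
            "DateTime": str(now),
            "MQTT Topic": topic,
            "Payload": payload,
        }
    )


if __name__ == "__main__":
    # A payload containing quotes is escaped and still parses as JSON.
    body = build_body("stat/kitchen/POWER", 'say "ON"')
    print(body)
    print(json.loads(body)["Payload"])
```

The returned string could be passed to Message(...) in place of the concatenated one; the content type and encoding properties would stay the same.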

 

Pulling It All Together

 

Here is a complete copy of the above, plus a bit more. This assumes you have installed Paho and the Azure IoT SDK via pip. You can cut and paste the code below or clone my GitHub repository.

import paho.mqtt.client as mqtt
import os
import asyncio
import uuid
from azure.iot.device.aio import IoTHubDeviceClient
from azure.iot.device import Message
from datetime import datetime

# -----------------------------------------------------------------------
# EDIT BELOW THIS LINE

ScriptVersion = "1.0"
ModifiedDate = "Monday 15, November 2021"
MQTTBrokerIP = "10.0.0.200" #IP Address of your MQTT Broker
MQTTTopicSubscribe = "stat/+/POWER" #MQTT Topic Filter
MQTTClientName = "RaspiPI4" #Used to identify the device to your MQTT Broker
AzureIOTHub_conn_str = "********************************************************" #Azure IOT Hub Connection String

# EDIT ABOVE THIS LINE
# -----------------------------------------------------------------------

def on_connect(client, userdata, flags, rc):  # The callback for when the client connects to the broker
    print(str(datetime.now()) + " | Connecting to MQTT Broker : " + MQTTBrokerIP)
    print(str(datetime.now()) + " | Connected with result code {0}".format(str(rc))) 
    print(str(datetime.now()) + " | We are connected!")
    print()
    print(str(datetime.now()) + " | Subscribing to MQTT Topics")
    print(str(datetime.now()) + " | Subscribing to " + MQTTTopicSubscribe)
    client.subscribe(MQTTTopicSubscribe)
    print()

def on_message(client, userdata, msg):  # The callback for when a PUBLISH message is received from the server.
    global mqtt_topic
    mqtt_topic = msg.topic
    global mqtt_payload
    mqtt_payload = str(msg.payload)
    print(str(datetime.now()) + " | Message received")
    print(str(datetime.now()) + " | MQTT Topic and Payload: " + msg.topic + " " + str(msg.payload)[2:][:-1])  # Print a received msg
    try:
        asyncio.run(azure())
    except Exception as error:  # continue on errors - used to ride out internet connectivity issues
        print(str(datetime.now()) + " | Failed to send message to Azure IOT Hub: " + str(error))

async def azure():
    # Create instance of the device client using the connection string
    device_client = IoTHubDeviceClient.create_from_connection_string(AzureIOTHub_conn_str)
     
    # Connect the device client.
    await device_client.connect()
    print(str(datetime.now()) + " | Async connection established to Azure IOT")

    # Send a single message
    print(str(datetime.now()) + " | Sending message to Azure IOT Hub")
    msg = Message("{ \"DateTime\": \"" + str(datetime.now()) + "\", \"MQTT Topic\": \"" + mqtt_topic + "\", \"Payload\": \"" + mqtt_payload[2:][:-1] + "\" }")
    msg.message_id = uuid.uuid4()
    msg.content_encoding = "utf-8"
    msg.content_type = "application/json"

    
    await device_client.send_message(msg)
    print(str(datetime.now()) + " | Message sent, tearing down Azure IOT Hub connection")
    print()

    # Finally, shut down the client
    await device_client.shutdown()

print("*********************************************************************")
print("*                                                                   *")
print("*                                                                   *")
print("*               MQTT --> Azure IOT Bridge                           *")
print("*                                                                   *")
print("*                                                                   *")
print("*                                                                   *")
print("* shane@baldacchino.net                                             *")
print(f"* Version : {ScriptVersion}                                                     *")
print(f"* Modified Date : {ModifiedDate}                          *")
print("*                                                                   *")
print("*********************************************************************")


client = mqtt.Client(MQTTClientName)  # paho-mqtt 1.x constructor; 2.x also expects a callback_api_version argument
client.on_connect = on_connect
print(str(datetime.now()) + " | Listening for messages")
print()
client.on_message = on_message
client.connect(MQTTBrokerIP, 1883, 60)  # Connect to (broker, port, keepalive-time)
# loop_forever() blocks here, processing network traffic and dispatching the callbacks.
# Code placed after it does not run while the bridge is listening, so errors raised
# while sending to Azure need to be handled inside on_message.
client.loop_forever()
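
The script above passes the topic and payload through global variables and sends to Azure from inside the MQTT callback. One possible alternative, shown here only as a sketch, is to hand each message to a worker thread through a queue so the callback returns immediately. The Forwarder class and its send argument are hypothetical; in the bridge, send would be a function that wraps the Azure IoT call.

```python
import queue
import threading


class Forwarder:
    """Pass (topic, payload) pairs from the MQTT callback to a background sender."""

    def __init__(self, send, maxsize=1000):
        self._send = send  # callable taking (topic, payload); hypothetical stand-in
        self._queue = queue.Queue(maxsize=maxsize)
        self._thread = threading.Thread(target=self._run, daemon=True)
        self.dropped = 0

    def start(self):
        self._thread.start()

    def submit(self, topic, payload):
        """Called from on_message; never blocks the MQTT network loop."""
        try:
            self._queue.put_nowait((topic, payload))
        except queue.Full:
            self.dropped += 1  # the queue is full, so count and discard

    def stop(self):
        """Ask the worker to finish the queued messages and wait for it to exit."""
        self._queue.put(None)
        self._thread.join()

    def _run(self):
        while True:
            item = self._queue.get()
            if item is None:
                break
            try:
                self._send(*item)
            except Exception as error:
                # Keep the worker alive if a single send fails.
                print(f"Send failed, message dropped: {error}")


if __name__ == "__main__":
    forwarder = Forwarder(lambda topic, payload: print("would send", topic, payload))
    forwarder.start()
    forwarder.submit("stat/kitchen/POWER", "ON")
    forwarder.stop()
```

In this arrangement on_message would only call forwarder.submit(msg.topic, text), and a slow or failed upload would no longer hold up the MQTT loop. Whether that matters at roughly 10K messages a day is a judgment call.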

 

Seeing This In Action

 

Let’s drop to a video to see this working end-to-end. To validate that messages are flowing into Azure IoT Hub, I can use the Azure CLI (az) to monitor the output.

az iot hub monitor-events --device-id devicename --hub-name hubname --output json

 

For the purpose of this demo, I have left a handful of messages at QoS level 2 and set LWT (Last Will and Testament) to true.

 

 

After 24 hours of running, we can see I have published 10.52K messages into Azure IoT Hub, and there are certain ebbs and flows that occur in my house. You can even tell my kids have screen time late in the afternoon, as the number of messages drops considerably. There are still temperature, humidity and other signals, but movement (captured by passive infrared sensors) almost vanishes from the event stream.

 

Can you see where my children have screen time?

 

Conclusion

 

There are many ways to skin this code cat. My requirement was to publish messages into Azure, and we have been able to achieve this in different ways (I am sure there are more). Automation is a journey; which path will you take?

We illustrated a transparent side-car approach that listens to an existing broker, on the topics you choose, and pushes these into Azure IoT, all without making any configuration changes (the most important thing for my implementation).

Are there any drawbacks? Sure there are. Right now this is one-way (simplex): it allows me to push messages into Azure IoT Hub but not receive messages back. All of my local devices are represented as a single Azure IoT device, which is less than ideal.

Azure IoT Edge or direct MQTT publishing to Azure IoT Hub would give duplex communication, with each local device represented as a unique device in Azure IoT Hub.

The Azure IoT SDK for Python is capable of duplex communication (Receive cloud-to-device message) but I have yet to implement it. Will I? I am unsure, but it’s nice to know I can. Today I am using this approach to pass in telemetry and am able to distinguish my devices based on the MQTT topic structure. In a future post my data will be massaged within Azure, removing the constraints of MQTT. There are many ways to do this, from a pure code approach (I mentioned above that the Azure IoT SDK for Python has a lot more smarts) through to Device Twins and Module Twins. Personally, I like the SDK approach: it’s my code and my choices on what I do, but I do understand this is not for everyone. We now have my messages, my events, in Azure, and now it’s time to make some friends and learn how to derive operational intelligence, from visualisations through to machine learning and beyond.
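
As a sketch of how devices can be told apart by topic when everything arrives as a single Azure IoT device, the helper below pulls the device name out of a topic. It assumes the stat/<device>/POWER layout implied by the filter used in this post; the function name device_from_topic is made up for the example.

```python
def device_from_topic(topic):
    """Return the device name from a stat/<device>/POWER topic, or None if it does not fit."""
    levels = topic.split("/")
    if len(levels) == 3 and levels[0] == "stat" and levels[2] == "POWER" and levels[1]:
        return levels[1]
    return None


if __name__ == "__main__":
    print(device_from_topic("stat/kitchen_light/POWER"))
    print(device_from_topic("tele/kitchen_light/SENSOR"))
```

The name could be added as another field in the JSON body, or set as an entry in the message's custom_properties dictionary in the Azure IoT SDK for Python, so that downstream queries can filter on it without parsing the topic again.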

Think big and happy building!

Shane

Posted at https://sl.advdat.com/3DzU4Vm