Thursday, April 14, 2022

Real-time data ingestion in Synapse SQL pool at scale

This article reviews a common pattern of streaming data (i.e. real-time message ingestion) in Synapse dedicated pool. It opens a discussion on the simple standard way to implement this, as well as the challenges and drawbacks. It then presents an alternate solution which enables optimal performance and greatly reduces maintenance tasks when using clustered column store indexes. This is aimed at developers, DBAs, architects, and anyone who works with streams of data that are captured in real-time.

 

Scenario description

Description of this scenario is as follow: we have messages produced by sensors (simulated by Azure container instances) sent to an Event Hub, then processed by an ASA job which redirects its output into a Synapse table named “rawdata”. At this stage there are two options:

  • query directly “rawdata” table.
  • use “rawdata” as a staging table, then build a pipeline that process the data every ten minutes and store it in a final fact table. It consists of an UPSERT statement to ensure existing records are updated (in case they are resent), otherwise inserts them into a final fact table named “rawdata_fact”.

 

General solution Design Architecture

Picture2.png

 

Building blocks of the current solution can be found on the following link

streaming-at-scale/eventhubs-streamanalytics-azuresql at main · Azure-Samples/streaming-at-scale (github.com)

This sample deploys within one resource group:

  • Container instances simulating IoT devices sending their temperature and amount of CO2.
  • Event Hub ingesting messages in real time.
  • Stream Analytics job preprocessing messages and redirect them into Synapse SQL pool.

How to handle the data ingested

In case ASA job loads data directly in the fact table and assuming it has a clustered column store index, what happens is:

  • no delta store compression can take place as the table is constantly inserted
  • simultaneous COPY INTO statements issued by ASA job can potentially trigger new delta stores creation, hence reduce the performance and/or require index maintenance tasks.

 

See illustration below which shows this:

Picture3.png

 

In case ASA job loads the telemetry data into a staging table named "rawdata" then upsert the fact table, we can see that what happens is the following:

  • there is one delta store per distribution where the insertions land, and no closed rowgroup hence all others are compressed rowgroups.
  • the tuple mover at each compute node ensures that once the delta stores get filled up they are converted in compressed rowgroups.

See illustration below which shows this:

Picture4.png

This task is constituted of:

  • A simple pipeline with a stored procedure activity.

 

 

{
    "name": "processrawdata",
    "type": "SqlPoolStoredProcedure",
    "dependsOn": [],
    "policy": {
        "timeout": "7.00:00:00",
        "retry": 0,
        "retryIntervalInSeconds": 30,
        "secureOutput": false,
        "secureInput": false
    },
    "userProperties": [],
    "sqlPool": {
        "referenceName": "sqlpool2",
        "type": "SqlPoolReference"
    },
    "typeProperties": {
        "storedProcedureName": "[dbo].[processrawdata]"
    }
}

 

 

  • A tumbling windows trigger that runs the pipeline every 10 minutes.

 

 

{
    "name": "tumblingwindow10minutes",
    "properties": {
        "annotations": [],
        "runtimeState": "Stopped",
        "pipeline": {
            "pipelineReference": {
                "referenceName": "processrawdata",
                "type": "PipelineReference"
            }
        },
        "type": "TumblingWindowTrigger",
        "typeProperties": {
            "frequency": "Minute",
            "interval": 10,
            "startTime": "2022-01-14T12:58:00Z",
            "delay": "00:00:00",
            "maxConcurrency": 50,
            "retryPolicy": {
                "intervalInSeconds": 30
            },
            "dependsOn": []
        }
    }
}

 

 

  • a stored procedure scheduled to update the fact from the staging:

 

 

ALTER PROC [dbo].[processrawdata] AS
BEGIN 
begin TRANSACTION
	--We ensure the staging data is locked before its data gets either insert or update the fact table to avoid inconsistencies
    UPDATE rawdata SET partitionid=0 WHERE 1=0 
	--Now the fact update can take place as well as the stating data when processed
    MERGE dbo.rawdata_fact AS macible 
    USING dbo.rawdata AS masource
    ON (macible.eventId = masource.eventId
	and macible.[Type]=masource.[Type] 
	and macible.DeviceId=masource.DeviceId 
	and macible.DeviceSequenceNumber=masource.DeviceSequenceNumber 
	and macible.CreatedAt=masource.CreatedAt) 
    WHEN MATCHED
    THEN UPDATE SET
        macible.EventId=masource.EventId,
        macible.[Type]=masource.[Type],
        macible.DeviceId=masource.DeviceId,
        macible.DeviceSequenceNumber=masource.DeviceSequenceNumber,
        macible.CreatedAt=masource.CreatedAt,
        macible.[Value]=masource.[Value],
        macible.ComplexData=masource.ComplexData,
        macible.EnqueuedAt=masource.EnqueuedAt,
        macible.ProcessedAt=masource.ProcessedAt,
        macible.StoredAt=masource.StoredAt
        WHEN NOT MATCHED BY TARGET THEN  
    INSERT ([EventId]
            ,[Type]
            ,[DeviceId]
            ,[DeviceSequenceNumber]
            ,[CreatedAt]
            ,[Value]
            ,[ComplexData]
            ,[EnqueuedAt]
            ,[ProcessedAt]
            ,[StoredAt]
            ,[PartitionId]) 
    VALUES
            (masource.[EventId]
            ,masource.[Type]
            ,masource.[DeviceId]
            ,masource.[DeviceSequenceNumber]
            ,masource.[CreatedAt]
            ,masource.[Value]
            ,masource.[ComplexData]
            ,masource.[EnqueuedAt]
            ,masource.[ProcessedAt]
            ,masource.[StoredAt]
            ,masource.[PartitionId]);
    delete from rawdata;
commit TRAN;
END

 

 

Then we can a simple view on to of both tables so we have the latest data as well as the processed one:

 

 

CREATE  VIEW [dbo].[v_rawdata]
AS select * from rawdata 
union ALL
select * from rawdata_fact;

 

 

And monitor in near real time the average temperature and CO2 values:

 

 

select type, avg(value) as average
from v_rawdata 
group by type

 

 

Takeaways

We can stream data directly from Stream Analytics into a Synapse dedicated pool table. We can avoid the clustered store index maintenance by using an intermediate table and a scheduled pipeline.

Posted at https://sl.advdat.com/3OcMhDmhttps://sl.advdat.com/3OcMhDm