This article reviews a common pattern of streaming data (i.e. real-time message ingestion) in Synapse dedicated pool. It opens a discussion on the simple standard way to implement this, as well as the challenges and drawbacks. It then presents an alternate solution which enables optimal performance and greatly reduces maintenance tasks when using clustered column store indexes. This is aimed at developers, DBAs, architects, and anyone who works with streams of data that are captured in real-time.
Scenario description
Description of this scenario is as follow: we have messages produced by sensors (simulated by Azure container instances) sent to an Event Hub, then processed by an ASA job which redirects its output into a Synapse table named “rawdata”. At this stage there are two options:
- query directly “rawdata” table.
- use “rawdata” as a staging table, then build a pipeline that process the data every ten minutes and store it in a final fact table. It consists of an UPSERT statement to ensure existing records are updated (in case they are resent), otherwise inserts them into a final fact table named “rawdata_fact”.
General solution Design Architecture
Building blocks of the current solution can be found on the following link
This sample deploys within one resource group:
- Container instances simulating IoT devices sending their temperature and amount of CO2.
- Event Hub ingesting messages in real time.
- Stream Analytics job preprocessing messages and redirect them into Synapse SQL pool.
How to handle the data ingested
In case ASA job loads data directly in the fact table and assuming it has a clustered column store index, what happens is:
- no delta store compression can take place as the table is constantly inserted
- simultaneous COPY INTO statements issued by ASA job can potentially trigger new delta stores creation, hence reduce the performance and/or require index maintenance tasks.
See illustration below which shows this:
In case ASA job loads the telemetry data into a staging table named "rawdata" then upsert the fact table, we can see that what happens is the following:
- there is one delta store per distribution where the insertions land, and no closed rowgroup hence all others are compressed rowgroups.
- the tuple mover at each compute node ensures that once the delta stores get filled up they are converted in compressed rowgroups.
See illustration below which shows this:
This task is constituted of:
- A simple pipeline with a stored procedure activity.
{
"name": "processrawdata",
"type": "SqlPoolStoredProcedure",
"dependsOn": [],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"sqlPool": {
"referenceName": "sqlpool2",
"type": "SqlPoolReference"
},
"typeProperties": {
"storedProcedureName": "[dbo].[processrawdata]"
}
}
- A tumbling windows trigger that runs the pipeline every 10 minutes.
{
"name": "tumblingwindow10minutes",
"properties": {
"annotations": [],
"runtimeState": "Stopped",
"pipeline": {
"pipelineReference": {
"referenceName": "processrawdata",
"type": "PipelineReference"
}
},
"type": "TumblingWindowTrigger",
"typeProperties": {
"frequency": "Minute",
"interval": 10,
"startTime": "2022-01-14T12:58:00Z",
"delay": "00:00:00",
"maxConcurrency": 50,
"retryPolicy": {
"intervalInSeconds": 30
},
"dependsOn": []
}
}
}
- a stored procedure scheduled to update the fact from the staging:
ALTER PROC [dbo].[processrawdata] AS
BEGIN
begin TRANSACTION
--We ensure the staging data is locked before its data gets either insert or update the fact table to avoid inconsistencies
UPDATE rawdata SET partitionid=0 WHERE 1=0
--Now the fact update can take place as well as the stating data when processed
MERGE dbo.rawdata_fact AS macible
USING dbo.rawdata AS masource
ON (macible.eventId = masource.eventId
and macible.[Type]=masource.[Type]
and macible.DeviceId=masource.DeviceId
and macible.DeviceSequenceNumber=masource.DeviceSequenceNumber
and macible.CreatedAt=masource.CreatedAt)
WHEN MATCHED
THEN UPDATE SET
macible.EventId=masource.EventId,
macible.[Type]=masource.[Type],
macible.DeviceId=masource.DeviceId,
macible.DeviceSequenceNumber=masource.DeviceSequenceNumber,
macible.CreatedAt=masource.CreatedAt,
macible.[Value]=masource.[Value],
macible.ComplexData=masource.ComplexData,
macible.EnqueuedAt=masource.EnqueuedAt,
macible.ProcessedAt=masource.ProcessedAt,
macible.StoredAt=masource.StoredAt
WHEN NOT MATCHED BY TARGET THEN
INSERT ([EventId]
,[Type]
,[DeviceId]
,[DeviceSequenceNumber]
,[CreatedAt]
,[Value]
,[ComplexData]
,[EnqueuedAt]
,[ProcessedAt]
,[StoredAt]
,[PartitionId])
VALUES
(masource.[EventId]
,masource.[Type]
,masource.[DeviceId]
,masource.[DeviceSequenceNumber]
,masource.[CreatedAt]
,masource.[Value]
,masource.[ComplexData]
,masource.[EnqueuedAt]
,masource.[ProcessedAt]
,masource.[StoredAt]
,masource.[PartitionId]);
delete from rawdata;
commit TRAN;
END
Then we can a simple view on to of both tables so we have the latest data as well as the processed one:
CREATE VIEW [dbo].[v_rawdata]
AS select * from rawdata
union ALL
select * from rawdata_fact;
And monitor in near real time the average temperature and CO2 values:
select type, avg(value) as average
from v_rawdata
group by type
Takeaways
We can stream data directly from Stream Analytics into a Synapse dedicated pool table. We can avoid the clustered store index maintenance by using an intermediate table and a scheduled pipeline.
Posted at https://sl.advdat.com/3OcMhDmhttps://sl.advdat.com/3OcMhDm