Monday, July 19, 2021

Partition Stream Analytics output and query it with serverless Synapse SQL

The use case is as follows: I have water meter telemetry that I would like to run analytics on.
Events are ingested from water meters and collected into a data lake in Parquet format. The data is partitioned by year, month and day based on the timestamp contained in the events themselves, not on the time at which Azure Stream Analytics (ASA) processes them, as this is a frequent requirement.

Events are sent from the on-premises SCADA systems to Azure Event Hubs and then processed by Stream Analytics, which can easily:

  1. Convert events sent in JSON format into partitioned Parquet files.
  2. Partition them by year/month/day.
  3. Take the date used for partitioning from within the event itself.
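
The third point matters for late or delayed events. The following minimal Python sketch is only an illustration, not part of the ASA job: it assigns a batch of events to day folders using each event's own createdAt timestamp rather than the time the batch was processed. The sample events are hypothetical, and createdAt is assumed to be an ISO 8601 UTC string, consistent with the substring positions used in the ASA query below.

```python
from datetime import datetime, timezone


def day_folder(ts: datetime) -> str:
    # Hive-style, zero-padded folder names, like the ASA output
    return "year=%04d/month=%02d/day=%02d" % (ts.year, ts.month, ts.day)


def parse_iso_utc(value: str) -> datetime:
    # fromisoformat() only accepts a trailing "Z" from Python 3.11, so normalize it first
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def folders_by_event_time(events):
    """Group events by the day contained in their own createdAt field."""
    folders = {}
    for event in events:
        folder = day_folder(parse_iso_utc(event["createdAt"]))
        folders.setdefault(folder, []).append(event)
    return folders


# Hypothetical batch processed shortly after midnight on June 24, 2021
processed_at = datetime(2021, 6, 24, 0, 5, tzinfo=timezone.utc)
batch = [
    {"deviceId": "meter-1", "createdAt": "2021-06-23T23:58:10Z"},  # late event from the previous day
    {"deviceId": "meter-2", "createdAt": "2021-06-24T00:01:30Z"},
]

print("processing-time folder:", day_folder(processed_at))
for folder, events in sorted(folders_by_event_time(batch).items()):
    print("event-time folder:", folder, len(events))
```

Partitioning on processing time would put both events under day=24, whereas event-time partitioning keeps the late reading in the day=23 folder it belongs to.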

The result can be queried immediately with the Synapse serverless SQL pool.

Input Stream

My ASA input, named inputEventHub, is connected to an event hub and uses JSON serialization.
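
For reference, an input event might look like the one built below. This is a hypothetical payload: the field names come from the columns selected in the ASA query further down, while the values, their types and the shape of complexData are assumptions. What matters for the partitioning logic is that createdAt is a string starting with YYYY-MM-DD.

```python
import json

# Hypothetical water meter event: field names mirror the ASA query, values are made up
event = {
    "eventId": "3f1c2a9e-0001",
    "type": "reading",
    "deviceId": "meter-0042",
    "deviceSequenceNumber": 1817,
    "createdAt": "2021-06-23T08:15:00Z",  # event time used for partitioning
    "Value": 12.7,
    "complexData": {"unit": "m3"},  # shape unknown; a nested object is assumed
}

# JSON text as it would be sent to the event hub
payload = json.dumps(event)
print(payload)
```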

Output Stream

The output is the interesting part, because it defines the partitioning scheme:

[Screenshot: ASA output configuration]

Its path pattern is based on a pseudo column named "time_struct", and all the partitioning logic lives in how this pseudo column is built.

Let's have a look at the ASA query:

[Screenshots: the ASA query]

We can now see that the pseudo column time_struct contains the path: ASA takes the value literally, including the "/" separators, so each segment becomes a folder level.

Here is the query code:

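select
    -- Build the partition path year=YYYY/month=MM/day=DD from the event's own createdAt
    -- (assumes createdAt is an ISO 8601 string such as 2021-06-23T08:15:00Z)
    concat('year=',substring(createdAt,1,4),'/month=',substring(createdAt,6,2),'/day=',substring(createdAt,9,2)) as time_struct,
    eventId,
    [type],
    deviceId,
    deviceSequenceNumber,
    createdAt,
    Value,
    complexData,
    EventEnqueuedUtcTime AS enqueuedAt,
    EventProcessedUtcTime AS processedAt,
    -- UDF.GetCurrentDateTime is a user-defined function defined in the ASA job (not shown here)
    cast(UDF.GetCurrentDateTime('') as datetime) AS storedAt,
    PartitionId
into
    [lionelpdl]
from
    [inputEventHub]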
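
Because ASA's SUBSTRING uses 1-based positions, substring(createdAt, 6, 2) returns the 6th and 7th characters, i.e. the month in an ISO 8601 timestamp. The following Python sketch reproduces the time_struct expression outside ASA so that the generated path can be checked against sample timestamps; the helper names and the sample value are hypothetical.

```python
def asa_substring(s: str, start: int, length: int) -> str:
    # ASA SUBSTRING uses 1-based positions; Python slices are 0-based
    return s[start - 1:start - 1 + length]


def time_struct(created_at: str) -> str:
    # Same concatenation as the ASA query: year=YYYY/month=MM/day=DD
    return ("year=" + asa_substring(created_at, 1, 4)
            + "/month=" + asa_substring(created_at, 6, 2)
            + "/day=" + asa_substring(created_at, 9, 2))


print(time_struct("2021-06-23T08:15:00Z"))  # year=2021/month=06/day=23
```

Since the expression relies on fixed character positions, a createdAt value in any other format (for example without zero-padding) would yield wrong folder names, so the format is worth validating upstream.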

After a few days of processing, the output folder looks like this:

[Screenshots: the output folder after a few days of processing]

Query results with serverless SQL and take advantage of partitioning

I can now query the output directly with serverless SQL:

[Screenshot: querying the output with serverless SQL]

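The metadata functions also work without any additional effort. For example, I can run the following query using the filepath() function, which returns the part of the path matched by a given wildcard (1 for year, 2 for month, 3 for day), so that filters on it restrict which folders are read: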

SELECT TOP 100
    [result].filepath(1) AS [year]
    ,[result].filepath(2) AS [month]
    ,[result].filepath(3) AS [day]
    ,*
FROM
    OPENROWSET(
        BULK 'https://lionelpdl.dfs.core.windows.net/parquetzone/deplasa1/year=*/month=*/day=*/*.parquet',
        FORMAT='PARQUET'
    ) AS [result]
-- filepath() returns strings such as '06', so compare with string literals to avoid implicit conversions
WHERE [result].filepath(2) = '06'
  AND [result].filepath(3) = '23'
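
filepath(n) returns, as a string, the part of the path matched by the n-th wildcard of the BULK pattern. The Python sketch below emulates that behavior with a regular expression to show what the WHERE clause compares against. It only illustrates the semantics and is not how serverless SQL is implemented; the file names are hypothetical, and each "*" is assumed to match within a single folder level.

```python
import re

BULK = "parquetzone/deplasa1/year=*/month=*/day=*/*.parquet"


def wildcard_regex(pattern: str):
    # Each "*" becomes a capture group that stops at the next "/"
    parts = [re.escape(p) for p in pattern.split("*")]
    return re.compile("^" + "([^/]*)".join(parts) + "$")


def filepath(path: str, n: int, pattern: str = BULK) -> str:
    match = wildcard_regex(pattern).match(path)
    if match is None:
        raise ValueError("path does not match the BULK pattern")
    return match.group(n)  # 1-based, like filepath(1), filepath(2), ...


files = [
    "parquetzone/deplasa1/year=2021/month=06/day=22/part-0.parquet",
    "parquetzone/deplasa1/year=2021/month=06/day=23/part-0.parquet",
    "parquetzone/deplasa1/year=2021/month=06/day=23/part-1.parquet",
]

# Keep only files whose folders match, mirroring WHERE filepath(2) = '06' AND filepath(3) = '23'
selected = [f for f in files if filepath(f, 2) == "06" and filepath(f, 3) == "23"]
print(selected)
```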

Spark post-processing

Finally, to optimize query performance, I can schedule a daily Spark job that reads all the events from the previous day and compacts them into fewer, larger Parquet files.

As an example, I've chosen to rebuild the partitions with files of up to 2 million rows each.
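
Since coalesce(1) leaves a single partition, one task writes the whole day and, with maxRecordsPerFile, rolls over to a new file each time the current one reaches the limit. A day holding N rows should therefore end up in ceil(N / 2,000,000) files. The plain-Python sketch below computes the expected file sizes; the row count is hypothetical.

```python
def planned_file_sizes(row_count: int, max_records: int = 2_000_000) -> list:
    """Row count of each output file when a single task rolls over every max_records rows."""
    full, remainder = divmod(row_count, max_records)
    sizes = [max_records] * full
    if remainder:
        sizes.append(remainder)
    return sizes


# Hypothetical day with 4.5 million readings
print(planned_file_sizes(4_500_000))  # [2000000, 2000000, 500000]
```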

Here are two versions of the same code:

PySpark notebook (for interactive testing, for instance)

from pyspark.sql import SparkSession
import datetime

# In a Synapse notebook `spark` already exists; getOrCreate() returns that session
spark = SparkSession.builder.getOrCreate()

account_name = "storage_account_name"
container_name = "container_name"
source_root = "source_directory_name"
target_root = "target_directory_name"
days_backwards = 4  # number of days back from today; as a daily job it would typically be 1
adls_path = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, source_root)
adls_result = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, target_root)

# Partition folder of the day to compact, e.g. /year=2021/month=06/day=23/
date_to_process = datetime.date.today() - datetime.timedelta(days=days_backwards)
day_to_process = '/year=%04d/month=%02d/day=%02d/' % (date_to_process.year, date_to_process.month, date_to_process.day)
file_pattern = '*.parquet'

# Read all the small Parquet files written by ASA for that day
print(adls_path + day_to_process + file_pattern)
df = spark.read.parquet(adls_path + day_to_process + file_pattern)

# Rewrite the day as a single partition, rolling over to a new file every 2 million rows
print(adls_result + day_to_process)
df.coalesce(1).write \
    .mode("overwrite") \
    .option("maxRecordsPerFile", 2000000) \
    .parquet(adls_result + day_to_process)
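
Because day_to_process is derived from the current date, the notebook is hard to test for a specific day. A small variant that takes the reference date as a parameter (a hypothetical refactoring, not part of the original code) makes the behavior easy to check, including month and year boundaries, which timedelta handles:

```python
import datetime
from typing import Optional


def day_to_process(days_backwards: int, today: Optional[datetime.date] = None) -> str:
    # Same format string as the notebook; the reference date defaults to today
    if today is None:
        today = datetime.date.today()
    day = today - datetime.timedelta(days=days_backwards)
    return '/year=%04d/month=%02d/day=%02d/' % (day.year, day.month, day.day)


print(day_to_process(1, datetime.date(2021, 3, 1)))  # /year=2021/month=02/day=28/
print(day_to_process(1, datetime.date(2021, 1, 1)))  # /year=2020/month=12/day=31/
```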

Spark job (with input parameters, scheduled daily)

[Screenshot: Spark job configuration with input parameters]

import sys
import datetime
from pyspark import SparkConf
from pyspark.sql import SparkSession

if __name__ == "__main__":

    # Create the Spark session with the necessary configuration
    conf = SparkConf().setAppName("dailyconversion").set("spark.hadoop.validateOutputSpecs", "false")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()

    # Positional job arguments
    account_name = sys.argv[1]         # storage account name
    container_name = sys.argv[2]       # container name
    source_root = sys.argv[3]          # source directory name
    target_root = sys.argv[4]          # target directory name
    days_backwards = int(sys.argv[5])  # number of days back to reprocess, typically 1

    # Partition folder of the day to compact, e.g. /year=2021/month=06/day=23/
    date_to_process = datetime.date.today() - datetime.timedelta(days=days_backwards)
    day_to_process = '/year=%04d/month=%02d/day=%02d/' % (date_to_process.year, date_to_process.month, date_to_process.day)
    file_pattern = '*.parquet'

    adls_path = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, source_root)
    adls_result = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, target_root)

    # Read all the small Parquet files written by ASA for that day
    print(adls_path + day_to_process + file_pattern)
    df = spark.read.parquet(adls_path + day_to_process + file_pattern)

    # Rewrite the day as a single partition, rolling over to a new file every 2 million rows
    print(adls_result + day_to_process)
    df.coalesce(1).write \
        .mode("overwrite") \
        .option("maxRecordsPerFile", 2000000) \
        .parquet(adls_result + day_to_process)
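
The job reads its five arguments positionally from sys.argv, so a missing or misordered argument only fails at runtime with an IndexError or ValueError. One alternative, swapping in Python's standard argparse module (which the original job does not use), is to parse and validate them up front. The argument order matches the job above; the sample values are hypothetical, and making days_backwards optional with a default of 1 is a design choice of this sketch.

```python
import argparse


def parse_job_args(argv=None):
    # argv=None makes argparse read sys.argv[1:], as a submitted Spark job would
    parser = argparse.ArgumentParser(description="Compact one day of ASA Parquet output")
    parser.add_argument("account_name", help="storage account name")
    parser.add_argument("container_name", help="container name")
    parser.add_argument("source_root", help="source directory name")
    parser.add_argument("target_root", help="target directory name")
    parser.add_argument("days_backwards", type=int, nargs="?", default=1,
                        help="number of days back to reprocess (default: 1)")
    args = parser.parse_args(argv)
    if args.days_backwards < 0:
        parser.error("days_backwards must be zero or positive")
    return args


# Hypothetical values for illustration only
args = parse_job_args(["mystorage", "parquetzone", "deplasa1", "deplasa1_compacted", "1"])
print(args.container_name, args.days_backwards)  # parquetzone 1
```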

Conclusion

In this article we have covered:

  • How to use Stream Analytics to write output as partitioned Parquet files.
  • How to query the Stream Analytics output with the Synapse serverless SQL pool.
  • How to reduce the number of Parquet files with a Synapse Spark pool.

Additional resources:

Posted at https://sl.advdat.com/3zaI9LR