The use case is as follows: I have water meter telemetry that I would like to run analytics on.
Events are ingested from water meters and collected into a data lake in parquet format. The data is partitioned by year, month and day. The partitions are based on the timestamp contained in the events themselves, not on the time the events are processed by Azure Stream Analytics (ASA), because this is a frequent requirement.
Events are sent from the on-premises SCADA systems to Event Hub, then processed by Stream Analytics, which can easily:
- Convert events sent in JSON format into partitioned parquet files.
- Partition the output by year/month/day.
- Take the date used for partitioning from the event itself.
The result can be queried immediately with a serverless Synapse SQL pool.
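Why partition on the event's own timestamp? An event can cross a day boundary between the moment the meter creates it and the moment ASA processes it. The minimal sketch below uses made-up timestamps. It shows that the same event lands in different day folders depending on which clock drives the partitioning. The partition_path helper is hypothetical and only mimics the year=/month=/day= folder naming used in this article.

```python
from datetime import datetime, timezone


def partition_path(ts: datetime) -> str:
    """Build a Hive-style year=/month=/day= folder path from a timestamp (illustrative helper)."""
    return f"year={ts.year:04d}/month={ts.month:02d}/day={ts.day:02d}"


# Hypothetical event: created by the meter just before midnight (UTC)...
created_at = datetime(2021, 6, 22, 23, 59, 30, tzinfo=timezone.utc)
# ...but processed by the streaming job a couple of minutes later, after midnight.
processed_at = datetime(2021, 6, 23, 0, 1, 10, tzinfo=timezone.utc)

print(partition_path(created_at))    # year=2021/month=06/day=22
print(partition_path(processed_at))  # year=2021/month=06/day=23
```

With event-time partitioning, the reading stays in the folder for the day it was actually measured, even if it arrives late.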
Input Stream
My ASA input, named inputEventHub, reads JSON events from an Event Hub.
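For reference, the following is a JSON event whose fields match the columns selected by the ASA query. This is a hypothetical payload. Only the field names come from the query, while the values and the structure of complexData are invented. EventEnqueuedUtcTime, EventProcessedUtcTime and PartitionId are not part of the payload, because ASA adds them for Event Hub inputs.

```python
import json

# Hypothetical event: field names match the ASA query, values are made up.
sample_event = {
    "eventId": "3f2b8c1e-0000-0000-0000-000000000001",
    "type": "reading",
    "deviceId": "meter-0042",
    "deviceSequenceNumber": 1187,
    "createdAt": "2021-06-23T08:15:42.000Z",
    "Value": 12.7,
    "complexData": {"unit": "m3", "pressure": 3.1},  # invented nested structure
}

payload = json.dumps(sample_event)
print(payload)

# The partitioning relies on createdAt being a string that starts with YYYY-MM-DD,
# so check that the dashes sit where the substring positions expect them.
decoded = json.loads(payload)
assert decoded["createdAt"][4] == "-" and decoded["createdAt"][7] == "-"
```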
Output Stream
The output is the interesting part, because it defines the partitioning scheme.
Its path pattern references a pseudo column named "time_struct", and all of the partitioning logic lives in how this pseudo column is built.
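The substitution idea can be sketched in a few lines of Python. ASA replaces a {column} token in the output path pattern with that column's value for each row. The sketch below emulates only that behaviour, not ASA itself. The pattern deplasa1/{time_struct} is an assumption inferred from the storage path queried later in this article, and the real pattern is whatever is configured on the ASA output.

```python
import re


def expand_path_pattern(pattern: str, row: dict) -> str:
    """Emulate {column} substitution in an output path pattern (illustration only).

    Each {name} token is replaced by the row's value for that column; a '/'
    inside the value is kept, so it becomes an extra folder level.
    Built-in tokens such as {date} or {time} are not handled here.
    """
    return re.sub(r"\{(\w+)\}", lambda m: str(row[m.group(1)]), pattern)


# Assumed path pattern; the actual one is set on the ASA output.
pattern = "deplasa1/{time_struct}"
row = {"time_struct": "year=2021/month=06/day=23", "deviceId": "meter-0042"}

print(expand_path_pattern(pattern, row))  # deplasa1/year=2021/month=06/day=23
```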
Let's have a look at the ASA query.
The time_struct pseudo column contains the folder path itself. ASA uses its value literally, including the "/" characters, so each "/" creates a new folder level.
Here is the query code:
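select
    -- Build the partition folder path from the event's own timestamp (createdAt),
    -- assumed to be an ISO 8601 string such as 2021-06-23T08:15:42Z
    concat('year=', substring(createdAt, 1, 4),
           '/month=', substring(createdAt, 6, 2),
           '/day=', substring(createdAt, 9, 2)) as time_struct,
    eventId,
    [type],
    deviceId,
    deviceSequenceNumber,
    createdAt,
    Value,
    complexData,
    EventEnqueuedUtcTime AS enqueuedAt,
    EventProcessedUtcTime AS processedAt,
    -- UDF.GetCurrentDateTime is a user-defined function of this ASA job (its code is not shown here)
    cast(UDF.GetCurrentDateTime('') as datetime) AS storedAt,
    PartitionId
into
    [lionelpdl]
from
    [inputEventHub]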
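The time_struct expression works on the text of createdAt, using 1-based substring positions. As a result, it relies on createdAt starting with YYYY-MM-DD, and no time-zone conversion takes place. The day comes from whatever offset the sender wrote. The following Python mirror of the expression is a sketch for checking that logic offline. The sample timestamps are hypothetical.

```python
def sql_substring(s: str, start: int, length: int) -> str:
    """Emulate 1-based SQL SUBSTRING(expression, start, length)."""
    return s[start - 1:start - 1 + length]


def time_struct(created_at: str) -> str:
    """Python mirror of the concat(...) expression in the ASA query."""
    return ("year=" + sql_substring(created_at, 1, 4)
            + "/month=" + sql_substring(created_at, 6, 2)
            + "/day=" + sql_substring(created_at, 9, 2))


print(time_struct("2021-06-23T08:15:42.000Z"))   # year=2021/month=06/day=23
# With a +02:00 offset, the folder uses the local date (June 23),
# even though the instant is 2021-06-22T23:30:00 in UTC.
print(time_struct("2021-06-23T01:30:00+02:00"))  # year=2021/month=06/day=23
```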
After a few days of processing, the output folder is organized into year=YYYY/month=MM/day=DD subfolders, one per day of event data.
Query results with serverless SQL and take advantage of partitioning
Now I can query the ASA output directly with serverless SQL.
The metadata functions also work without any additional configuration. For example, the following query uses the filepath() metadata function to expose the partition values as columns and to filter on them:
SELECT TOP 100
    [result].filepath(1) AS [year]
    ,[result].filepath(2) AS [month]
    ,[result].filepath(3) AS [day]
    ,*
FROM
    OPENROWSET(
        BULK 'https://lionelpdl.dfs.core.windows.net/parquetzone/deplasa1/year=*/month=*/day=*/*.parquet',
        FORMAT = 'PARQUET'
    ) AS [result]
-- filepath(n) returns the text matched by the nth wildcard, so compare with zero-padded strings
WHERE [result].filepath(2) = '06'
  AND [result].filepath(3) = '23'
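To see what filepath(n) returns, the following sketch loosely emulates it in Python. filepath(1) corresponds to the first * in the BULK path, filepath(2) to the second, and so on. The emulation assumes that a single * does not cross a "/" separator. The file name is hypothetical, because the names ASA generates are not shown in this article.

```python
import re
from typing import List, Optional


def wildcard_captures(bulk_pattern: str, path: str) -> Optional[List[str]]:
    """Return the text matched by each '*' in bulk_pattern, in order (loose filepath() emulation)."""
    parts = [re.escape(p) for p in bulk_pattern.split("*")]
    regex = "([^/]*)".join(parts)  # each '*' matches within one path segment
    m = re.fullmatch(regex, path)
    return list(m.groups()) if m else None


pattern = ("https://lionelpdl.dfs.core.windows.net/parquetzone/deplasa1/"
           "year=*/month=*/day=*/*.parquet")
# Hypothetical file name, for illustration only.
path = ("https://lionelpdl.dfs.core.windows.net/parquetzone/deplasa1/"
        "year=2021/month=06/day=23/part-0001.parquet")

captures = wildcard_captures(pattern, path)
print(captures)  # ['2021', '06', '23', 'part-0001']
# captures[1] plays the role of filepath(2): text such as '06', with its leading zero.
```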
Spark post-processing
Finally, to optimize query performance, I can schedule a daily Spark job that reads all events from the previous day and compacts them into fewer, larger parquet files. Streaming output tends to produce many small files, and serverless SQL performs better when it reads fewer, larger ones.
As an example, I've decided to rebuild the partitions with files of at most 2 million rows each.
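Because maxRecordsPerFile is an upper bound, the last file of each day usually holds fewer rows than the limit. When the data is first coalesced into one partition, one task writes the whole day and starts a new file every 2 million rows, so the file count is the ceiling of rows / 2,000,000. The daily volumes below are hypothetical.

```python
MAX_RECORDS_PER_FILE = 2_000_000  # value passed to maxRecordsPerFile in the Spark code


def expected_file_count(row_count: int, max_records: int = MAX_RECORDS_PER_FILE) -> int:
    """Files written for one day when a single task writes it (assumes row_count > 0).

    Integer ceiling division: a new file is started each time max_records rows
    have been written.
    """
    return -(-row_count // max_records)


# Hypothetical daily volumes.
for rows in (750_000, 2_000_000, 4_500_000):
    print(rows, "rows ->", expected_file_count(rows), "file(s)")
```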
Here are two versions of the same code:
PySpark notebook (for interactive testing, for instance)
from pyspark.sql import SparkSession
import datetime

# In a Synapse notebook `spark` already exists; getOrCreate() simply returns it
spark = SparkSession.builder.getOrCreate()

account_name = "storage_account_name"
container_name = "container_name"
source_root = "source_directory_name"
target_root = "target_directory_name"  # must differ from source_root: the write below overwrites the target folder
days_backwards = 4  # number of days back from today; as a daily job it is typically set to 1

adls_path = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, source_root)
hier = datetime.date.today() - datetime.timedelta(days=days_backwards)  # "hier" is French for "yesterday"
# Must match the zero-padded year=/month=/day= folder names written by ASA
day_to_process = '/year=%04d/month=%02d/day=%02d/' % (hier.year, hier.month, hier.day)
file_pattern = '*.parquet'

print(adls_path + day_to_process + file_pattern)
df = spark.read.parquet(adls_path + day_to_process + file_pattern)

adls_result = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, target_root)
print(adls_result + day_to_process)

# coalesce(1): a single task writes the day; maxRecordsPerFile caps each file at 2 million rows
df.coalesce(1).write \
    .mode("overwrite") \
    .option("maxRecordsPerFile", 2000000) \
    .parquet(adls_result + day_to_process)
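The compaction code only finds the data if day_to_process matches the folder name that ASA produced character for character, including the zero padding. The following quick consistency check, written in plain Python without Spark, compares the two formats for a hypothetical date with a single-digit month and day.

```python
import datetime


def asa_time_struct(created_at: str) -> str:
    """Python mirror of the ASA concat/substring expression (slices = 1-based positions minus one)."""
    return ("year=" + created_at[0:4]
            + "/month=" + created_at[5:7]
            + "/day=" + created_at[8:10])


def spark_day_to_process(day: datetime.date) -> str:
    """Same format string as day_to_process in the Spark code, without the surrounding slashes."""
    return "year=%04d/month=%02d/day=%02d" % (day.year, day.month, day.day)


day = datetime.date(2021, 6, 3)              # hypothetical day
created_at = day.isoformat() + "T10:00:00Z"  # '2021-06-03T10:00:00Z'

assert asa_time_struct(created_at) == spark_day_to_process(day)
print(spark_day_to_process(day))  # year=2021/month=06/day=03
```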
Spark job (with input parameters, scheduled daily)
import sys
import datetime
from pyspark import SparkConf
from pyspark.sql import SparkSession

if __name__ == "__main__":
    if len(sys.argv) != 6:
        sys.exit("usage: dailyconversion <account_name> <container_name> <source_root> <target_root> <days_backwards>")

    # Create the Spark session with the necessary configuration
    conf = SparkConf().setAppName("dailyconversion").set("spark.hadoop.validateOutputSpecs", "false")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()

    account_name = sys.argv[1]    # e.g. 'storage_account_name'
    container_name = sys.argv[2]  # e.g. 'container_name'
    source_root = sys.argv[3]     # e.g. 'source_directory_name'
    target_root = sys.argv[4]     # e.g. 'target_directory_name', must differ from source_root
    days_backwards = int(sys.argv[5])  # number of days back to reprocess, typically 1

    hier = datetime.date.today() - datetime.timedelta(days=days_backwards)
    day_to_process = '/year=%04d/month=%02d/day=%02d/' % (hier.year, hier.month, hier.day)
    file_pattern = '*.parquet'

    adls_path = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, source_root)
    print(adls_path + day_to_process + file_pattern)
    df = spark.read.parquet(adls_path + day_to_process + file_pattern)

    adls_result = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, target_root)
    print(adls_result + day_to_process)

    # One task writes the day; each file holds at most 2 million rows
    df.coalesce(1).write \
        .mode("overwrite") \
        .option("maxRecordsPerFile", 2000000) \
        .parquet(adls_result + day_to_process)
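One possible variation is to parse the job's five positional arguments with the standard argparse module instead of indexing sys.argv directly. This gives a usage message and an integer check for days_backwards. The sketch below keeps the same argument order. Making days_backwards optional with a default of 1, and rejecting identical source and target folders, are design choices added for illustration and are not part of the original job.

```python
import argparse


def parse_job_args(argv=None):
    """Parse the Spark job's positional arguments (same order as sys.argv[1..5])."""
    parser = argparse.ArgumentParser(prog="dailyconversion")
    parser.add_argument("account_name")
    parser.add_argument("container_name")
    parser.add_argument("source_root")
    parser.add_argument("target_root")
    # Assumption: default to reprocessing yesterday when the argument is omitted
    parser.add_argument("days_backwards", type=int, nargs="?", default=1)
    args = parser.parse_args(argv)
    if args.source_root == args.target_root:
        # Overwriting the folder being read would destroy the input data
        parser.error("target_root must differ from source_root")
    return args


args = parse_job_args(["storage_account_name", "container_name",
                       "source_directory_name", "target_directory_name", "1"])
print(vars(args))
```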
Conclusion
In this article we have covered:
- How to easily use Stream Analytics to write an output with partitioned parquet files.
- How to use a serverless Synapse SQL pool to query Stream Analytics output.
- How to reduce the number of parquet files using a Synapse Spark pool.
Additional resources:
- Partitioning of the output stream is based on: Azure Stream Analytics custom blob output partitioning (Microsoft Docs)
- Querying the data using file metadata with serverless SQL is based on: Using file metadata in queries - Azure Synapse Analytics (Microsoft Docs)
- The Apache Spark post-processing of parquet files is based on: Tutorial: Create Apache Spark job definition in Synapse Studio - Azure Synapse Analytics (Microsoft Docs)
Posted at https://sl.advdat.com/3zaI9LR