Friday, March 4, 2022

Implementing Spark posexplode() equivalent in the serverless SQL pool

Spark enables us to work with nested data in Parquet files. If you have a Parquet file with a complex type column (array, struct, map), Spark enables you to “unpack” the values from this column and join each element with the row it belongs to.


Why do we need to unpack array elements?

Having a denormalized data representation with the data stored as array elements might be very convenient from the data modeling and storage perspective, but it complicates analytics. In many cases you need a flattened structure where every array element is represented as a scalar value.

The following picture shows a typical transformation in which a dataset with complex array cells is transformed into a scalar structure.


The array cells are pivoted and returned as simple scalar columns. Now you can simply use WHERE or GROUP BY clauses to filter or summarize information by array element values. Another very useful piece of information is the index of every element (generated as the pos column).

Spark enables you to use the posexplode() function on every array cell. The posexplode() function transforms a single array cell into a set of rows, where each row contains one value from the array and the index of that value. As a result, one row with an array containing three elements is transformed into three rows containing scalar cells. This flattened/normalized representation is much easier to analyze.

Once the array is flattened and normalized, you can easily analyze the data and find how many people know SQL or Java.
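The flattening and the follow-up count can be sketched in plain Python, without a Spark cluster. This is only an illustration of the semantics, not Spark code: the sample rows and the `posexplode_rows` helper are hypothetical names introduced here.

```python
from collections import Counter

# Hypothetical sample rows: (name, languages). The values are illustrative only.
people = [
    ("Ana", ["SQL", "Java", "Python"]),
    ("Ben", ["Java"]),
    ("Cleo", ["SQL", "Scala"]),
]


def posexplode_rows(rows):
    """Yield one (name, pos, language) tuple per array element, with a 0-based pos."""
    for name, languages in rows:
        # Like Spark's posexplode(), a missing or empty array produces no rows.
        for pos, language in enumerate(languages or []):
            yield (name, pos, language)


flat = list(posexplode_rows(people))

# With scalar rows, filtering and grouping become simple operations.
people_per_language = Counter(language for _, _, language in flat)
print(people_per_language["SQL"], people_per_language["Java"])
```

Each nested row becomes as many flat rows as its array has elements, so the count per language is a plain group-by over the flattened result.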


Using posexplode() function in Spark

In the following example (taken from the SparkByExamples site where you can find more information about these functions), we can see how to load a Parquet file containing the name of a person and a list of programming languages they know:



Every row in the dataframe contains the name of a person and an array of languages that the person knows.

In many scenarios we don’t want to keep composite array elements in the column. Instead, we would like to “join” the row containing the name of the person with every individual language in the array.

The Spark engine provides the explode() function that “pivots” the array elements and “joins” them with the main row. The CROSS/OUTER APPLY operator in T-SQL combined with the OPENJSON function is a very similar construct.

There is another interesting Spark function called posexplode() that unpacks the array and returns the position of each element together with the element value. The following example shows how to return the rows from the dataframe, expose the name column, and join the main row with the languages from the array column:



Note that the name is repeated for every name-language pair. Michael is repeated three times because this name is “joined” with three language values (Spark, Java, and null). An additional pos column represents the index of each language element in the array; the posexplode() function returns this position value with every element from the array. Position information is useful if, for example, you need to get the first element of every array.
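To make the role of the pos column concrete, here is a minimal plain-Python sketch that emulates the posexplode() output and then keeps only the first element of every array. Michael's row follows the example described above; the second row is a hypothetical addition for illustration.

```python
# Michael's row follows the example described above; the other row is hypothetical.
rows = [
    ("Michael", ["Spark", "Java", None]),
    ("James", ["Java", "Scala"]),
]

# Emulate posexplode(): pair every element with its 0-based position.
# Null elements inside the array are kept and still get a position.
exploded = [
    (name, pos, language)
    for name, languages in rows
    for pos, language in enumerate(languages)
]

# Keep only the first element of each array by filtering on pos.
first_languages = [(name, language) for name, pos, language in exploded if pos == 0]
print(first_languages)
```

Without the position value, the flattened rows would not tell you which language was listed first for each person.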


Working with array types in T-SQL

The serverless SQL pools in Synapse Analytics enable us to read Parquet files that contain nested types. The array types are formatted as JSON arrays and returned as a single column.



The serverless SQL pools transform the Parquet array object into an array cell formatted as a JSON array. The OPENJSON function in the serverless SQL pool allows you to parse nested arrays and return one row for each JSON array element, with the element as a separate cell. Therefore, you can translate Spark queries that use the explode() function into a CROSS APPLY OPENJSON() construct in T-SQL.

However, converting posexplode() and returning the position of the element might be a challenge. In T-SQL we might need to use window functions (such as ROW_NUMBER) to get the index of an array element, which complicates the query.

If you are writing the T-SQL queries in the serverless SQL pools, you don’t need to use the ROW_NUMBER function to identify the index of each array element. The OPENJSON function can return a special column that represents the index (position) of the returned element in the array.

The T-SQL query in the serverless SQL pool that is equivalent to the posexplode() example in the previous code sample is shown in the following picture:



Note the pos column in the WITH clause with the expression $.sql:identity() after the column type definition. This column returns the identity of each array element in the nested array column. In this case, every array element gets a 0-based index that represents its position in the array.
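The resulting row shape can be illustrated with a small plain-Python sketch. It only emulates the behavior described here (an array delivered as JSON text, expanded into one row per element with a 0-based index); it is not T-SQL and does not call the serverless SQL pool. The sample rows, the exact JSON formatting, and the `open_json_with_pos` helper are assumptions made for this illustration.

```python
import json

# Hypothetical rows as a client might receive them: the array column arrives
# as JSON text (the exact formatting is an assumption).
rows = [
    ("Michael", '["Spark", "Java", null]'),
    ("James", '["Java", "Scala"]'),
]


def open_json_with_pos(name, languages_json):
    """Return (name, pos, language) rows; pos is the 0-based index in the JSON array."""
    return [(name, pos, value) for pos, value in enumerate(json.loads(languages_json))]


# Analogous to applying the expansion to every row and joining it with the main row.
result = [row for name, text in rows for row in open_json_with_pos(name, text)]
for row in result:
    print(row)
```

The index comes from the element's position in the JSON array itself, which is why no separate numbering step over the result set is needed.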

With this column you can have the equivalent of the posexplode() function in Spark and easily get the position of each element returned by the OPENJSON function.



The $.sql:identity() path is a new feature of the OPENJSON function that can be used in the serverless SQL pools in Azure Synapse Analytics. It enables you to easily translate Spark queries that process nested arrays. Learn more about this feature in the OPENJSON documentation page.
