Friday, October 1, 2021

How to use Logic Apps to handle large amounts of data from a Log Analytics workspace

If your Log Analytics workspace holds a huge amount of data, say billions of records per day, and you need to process this data or save it somewhere else, you cannot achieve that with the Azure Monitor Logs connector directly because of the query time and response size limits on a single query action. This article shows an alternative way to use a logic app to process a large amount of data.

 

Below are our goals in this sample:

  1. Pick up all the data from the Log Analytics workspace and store it in a Storage Account.
  2. (Optional) Keep the stored files roughly the same size (e.g., 15,000 records per file).
  3. Execute the job as fast as possible.

 

Here is how we implement the logic app:

  1. First, pre-process the entire data set. The goal is to build a 2D array that splits the whole time range into small, continuous time slices. For example:

The original time period of the data runs from Time A (2021-01-01T00:00:00Z) to Time Z (2021-06-01T00:00:00Z):

[A,Z]: [2021-01-01T00:00:00Z, 2021-06-01T00:00:00Z]


We could split the time range into a 2D array like this: [[A,B],[B,C],...,[Y,Z]]
[A,B]: [2021-01-01T00:00:00Z, 2021-01-01T01:00:00Z]
[B,C]: [2021-01-01T01:00:00Z, 2021-01-01T02:00:00Z]
...
[Y,Z]: [2021-05-31T23:00:00Z, 2021-06-01T00:00:00Z]

Yanbo_Deng_4-1632963997813.png

 

  2. Then, to implement the time split, we have two choices: 1) split at a fixed time interval, or 2) split at a fixed number of records per interval. Splitting at a fixed time interval is much simpler, but the file sizes will not be consistent across intervals, and the queries may still hit the API limits (query time or response size) if the logs are not evenly distributed over time. Splitting at a fixed number of records requires much more complex pre-processing, but the blob sizes will be almost identical, which helps avoid hitting the API limits.

Example: splitting Time A to Time Z gives [A,B), [B,C), [C,D), [D,E), …, [Y,Z)

1) Splitting at a fixed time interval means B-A = C-B = D-C = … = Z-Y.

2) Splitting at a fixed number of records means the number of records between A and B, B and C, …, Y and Z is the same, e.g., 15,000 records in each time frame.

For option 2, see Reference 1 at the end of this article; a simpler sketch for option 1 follows below.
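For option 1 (a fixed time interval), the split can be generated with a much simpler query. The following is a minimal sketch, not part of the original sample, assuming a one-hour interval and reusing the <Start_Time> and <End_Time> placeholders from Reference 1; it returns the same StartTime/EndTime pairs that the "For each" loop below consumes:

// Sketch: generate [StartTime, EndTime) pairs at a fixed interval (1h is an assumed value)
range StartTime from datetime('<Start_Time>') to datetime('<End_Time>') step 1h
| extend EndTime = StartTime + 1h
| where EndTime <= datetime('<End_Time>')
| project StartTime, EndTime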

 

  3. Once the time splits are ready, we can use a "For each" loop to run the queries for each time frame concurrently. If "Concurrency Control" stays disabled in the "For each" action settings, the loop runs with the default concurrency degree of 20; alternatively, enable "Concurrency Control" and set a suitable degree manually (e.g., 40). A code-view sketch follows the screenshots below.

Yanbo_Deng_1-1632963526340.png

 

Yanbo_Deng_2-1632963526345.png
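In the logic app's code view, the concurrency degree is the "repetitions" value under the "For each" action's runtimeConfiguration. Below is a minimal sketch of that action; the action and output names are assumed for illustration (the Azure Monitor Logs connector returns the query rows under the body's "value" property):

"For_each_time_slot": {
    "type": "Foreach",
    "foreach": "@body('Run_query_and_list_results')?['value']",
    "actions": {},
    "runtimeConfiguration": {
        "concurrency": { "repetitions": 40 }
    }
}

The per-time-slot "Run query and list results" and "Create blob" actions go inside "actions"; when runtimeConfiguration is omitted, the loop runs at the default degree of 20.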

 

  4. However, if the volume of logs is so large that even the pre-processing takes too long to execute and hits the API limit, we can implement an "outer" logic app that splits the entire query time range beforehand. For example, to query one year of logs, you could split the range into 10-day chunks and query each chunk in parallel.

Yanbo_Deng_0-1633073573166.png

 

Note:

The HTTP action in the workflow above is a POST request to the "inner" logic app, with the split times in the body. We use the HTTP action instead of the "Azure Logic Apps" connector because we don't need to wait for the inner logic app run to finish.
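The exact body schema depends on the request trigger of the inner logic app. Below is a hedged example, assuming the body carries the three values the inner logic app reads from its trigger body (see the notes in Reference 1) and that "TempStartTime"/"TempEndTime" are workflow variables; the property names are chosen for illustration:

{
    "StartTime": "@{variables('TempStartTime')}",
    "EndTime": "@{variables('TempEndTime')}",
    "NumberOfRecordsInOneQueryResult": 15000
}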

1) Initially, “TempStartTime” = “StartOfTime” and “TempEndTime” = “StartOfTime” + “DaysInterval”.

2) During each iteration, we set “TempStartTime” = “TempEndTime” and “TempEndTime” = “TempEndTime” + “DaysInterval”.

3) If “TempStartTime” >= “EndDateTime”, we exit the “Until” loop.

4) As an exception, if “TempEndTime” > “EndDateTime”, we set “TempEndTime” = “EndDateTime”.
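In code view, steps 1) and 2) map to "Set variable" actions inside the "Until" loop. Below is a hedged sketch; the action names and the "DaysInterval" parameter reference are assumptions, and steps 3) and 4) would be handled by the loop's exit condition and a "Condition" action:

"Set_TempStartTime": {
    "type": "SetVariable",
    "inputs": {
        "name": "TempStartTime",
        "value": "@variables('TempEndTime')"
    }
},
"Set_TempEndTime": {
    "type": "SetVariable",
    "inputs": {
        "name": "TempEndTime",
        "value": "@addDays(variables('TempEndTime'), parameters('DaysInterval'))"
    },
    "runAfter": { "Set_TempStartTime": [ "Succeeded" ] }
}

The "runAfter" dependency matters: "TempStartTime" must pick up the old "TempEndTime" before "TempEndTime" is advanced.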

 

Reference 1: How to split at a fixed number of records

We use one “Run query and list results” action to pre-process the entire log data. The query looks like this:

 

// innerTable: every record's TimeGenerated, keyed by its row number (as a string)
let innerTable = (<QueryTable>
    | where TimeGenerated between(startofday(datetime('<Start_Time>')) .. endofday(datetime('<End_Time>')))
    | order by TimeGenerated asc
    | extend CurRowNum= tostring(row_number())
    | summarize make_list (TimeGenerated) by CurRowNum);
// source1: take every <NumberOfRecordsInOneQueryResult>-th row number and look up its timestamp,
// so each timestamp marks the start of one chunk of records
let source1 = ( <QueryTable>
    | where TimeGenerated between(startofday(datetime('<Start_Time>')) .. endofday(datetime('<End_Time>')))
    | summarize Count=count()
    | project CurRowNum=range(1, Count, <NumberOfRecordsInOneQueryResult>)
    | mv-expand CurRowNum
    | extend CurRowNum = tostring(parse_json(CurRowNum))
    | join (innerTable) 
        on CurRowNum
    | mv-expand list_TimeGenerated
    | project TimeGenerated = todatetime(list_TimeGenerated));
// Append one timestamp after the end of the range so the last chunk also gets an end time
let ExtendedTimeSeries = source1 | union (datatable (TimeGenerated:datetime) [datetime(@{addDays(<End_Time>,1)})]);
// Helper: number the rows, optionally shifted, so consecutive boundaries can be joined pairwise
let add_rownumber=(T:(TimeGenerated:datetime),shift:int64) {
    T | order by TimeGenerated asc | extend RN=row_number()+shift
};
let StartTimeSeries = ExtendedTimeSeries | invoke add_rownumber(0);
let EndTimeSeries = ExtendedTimeSeries | invoke add_rownumber(-1);
// Join boundary k with boundary k+1 to produce the [StartTime, EndTime) pairs
StartTimeSeries | join EndTimeSeries on RN | project StartTime = TimeGenerated, EndTime = TimeGenerated1

 

Note:

<QueryTable> is the table you are querying in the Log Analytics workspace.

<Start_Time> is the query start time. If you are using an "outer" Logic App, you will need to get this value from the trigger body.

<End_Time> is the query end time. If you are using an "outer" Logic App, you will need to get this value from the trigger body.

<NumberOfRecordsInOneQueryResult> is the number of records you would like to query each time. You can customize this value based on your needs: a larger value means larger query results, so make sure you don't hit the API limit. If you are using an "outer" Logic App, you will need to get this value from the trigger body.

 

Its output is a JSON array. Here is a sample:

 

[
    {
        "EndTime": "2021-01-01T01:00:00Z",
        "StartTime": "2021-01-01T00:00:00Z"
    },
    {
        "EndTime": "2021-01-01T02:00:00Z",
        "StartTime": "2021-01-01T01:00:00Z"
    },
    ...
]

 

 

Reference 2:

Sample logic app templates in GitHub


There are two logic apps ("inner" and "outer") and two API connections (Azure Blob Storage and Azure Monitor Logs).

 

Parameters in templates:

connections_azureblob_name: blob API connection name
connections_azuremonitorlogs_name: Azure Monitor Log API connection name
workflows_outer_name: "outer" logic app name
workflows_inner_name: "inner" logic app name
location: Azure region of the resources
storage_account_name: storage account name
storage_account_accessKey: storage account access key 
log_analytics_workspace_resourceGroup: Log Analytics Workspace resource group
log_analytics_workspace_name: Log Analytics Workspace name
endDateTime: Total query end time
numberOfRec: Number of records per file
startDateTime: Total query start time
tableName: The table we are querying from
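If you deploy the templates with your own tooling, these values can be supplied through a standard ARM parameters file. Below is a partial sketch with placeholder values (the remaining parameters, such as the connection and workflow names, follow the same pattern):

{
    "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentParameters.json#",
    "contentVersion": "1.0.0.0",
    "parameters": {
        "startDateTime": { "value": "2021-01-01T00:00:00Z" },
        "endDateTime": { "value": "2021-06-01T00:00:00Z" },
        "numberOfRec": { "value": 15000 },
        "tableName": { "value": "<QueryTable>" },
        "storage_account_name": { "value": "<storage account name>" },
        "log_analytics_workspace_name": { "value": "<workspace name>" },
        "log_analytics_workspace_resourceGroup": { "value": "<workspace resource group>" }
    }
}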
Posted at https://sl.advdat.com/3mhKZd0