Friday, July 9, 2021

Using Change Data Capture and Azure Data Factory to Incrementally Load Change Data

This blog is part of the Change Data Capture in Azure SQL Databases Blog Series, which began with the announcement of the release of CDC in Azure SQL Databases in early June 2021. You can view the release announcement here: https://aka.ms/CDCAzureSQLDB

 

In this tutorial, you will create an Azure Data Factory pipeline that copies change data from Change Data Capture tables in an Azure SQL database to Azure Blob Storage. 

 

Change Data Capture

Change Data Capture (CDC) is currently available in SQL Server (all supported versions), Azure SQL Managed Instance, and Azure SQL Database (Preview). CDC records insert, update, and delete activity that applies to a table. This makes the details of the changes available in an easily consumed relational format. Column information and the metadata required to apply the changes to a target environment are captured for the modified rows and stored in change tables that mirror the column structure of the tracked source tables. Table-valued functions are provided to allow systematic access to the change data. Learn more about CDC here.
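
For example, a consumer typically reads changes between two log sequence numbers (LSNs) through the generated table-valued function rather than from the change table itself. Here is a minimal sketch, assuming a hypothetical tracked table whose capture instance is named dbo_MyTable (a placeholder, not an object from this tutorial):

-- Minimal sketch: read all captured changes for a hypothetical tracked table
-- whose capture instance is 'dbo_MyTable' (placeholder name).
DECLARE @from_lsn binary(10), @to_lsn binary(10);
SET @from_lsn = sys.fn_cdc_get_min_lsn('dbo_MyTable'); -- earliest LSN still retained
SET @to_lsn = sys.fn_cdc_get_max_lsn();                -- most recent captured LSN
SELECT * FROM cdc.fn_cdc_get_all_changes_dbo_MyTable(@from_lsn, @to_lsn, 'all');
-- Each returned row includes metadata columns such as __$operation:
-- 1 = delete, 2 = insert, 3 = update (before image), 4 = update (after image).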

 

Azure Data Factory

Azure Data Factory is a cloud-based data integration service that orchestrates and automates the movement and transformation of data. With the Data Factory service, you can create data integration solutions that ingest data from various data stores, transform/process the data, and publish the resulting data back to data stores. Learn more about Azure Data Factory here.

 

Azure Blob Storage

Azure Blob storage is Microsoft’s object storage solution for the cloud. Blob storage is optimized for storing massive amounts of unstructured data. Unstructured data is data that doesn’t adhere to a particular data model or definition, such as text or binary data. Learn more about Azure Blob Storage here.

 

Using Azure Data Factory to send Change Data Capture data from an Azure SQL Database to Azure Blob Storage

Prerequisites:

  • An Azure subscription.
  • An Azure SQL database, and SQL Server Management Studio to connect to it.
  • An Azure Blob Storage account with a container named raw.

Steps:

  • Create a data source table in Azure SQL:
  1. Launch SQL Server Management Studio and connect to your Azure SQL database.
  2. In Object Explorer, right-click your database and choose New Query.
  3. Run the following SQL command against your Azure SQL database to create a table named “customers” as the data source store.

create table customers
(
    customer_id int,
    first_name varchar(50),
    last_name varchar(50),
    email varchar(100),
    city varchar(50),
    CONSTRAINT "PK_Customers" PRIMARY KEY CLUSTERED ("customer_id")
);

  4. Enable Change Data Capture on your database and source table.

EXEC sys.sp_cdc_enable_db;

EXEC sys.sp_cdc_enable_table
    @source_schema = 'dbo',
    @source_name = 'customers',
    @role_name = 'null',
    @supports_net_changes = 1;
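
To confirm that both commands took effect, you can optionally check the CDC flags in the system catalog views (a quick sanity check, not required for the tutorial):

-- Optional sanity check: verify CDC is enabled at the database and table level.
SELECT name, is_cdc_enabled FROM sys.databases WHERE name = DB_NAME();
SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'customers';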

  5. Insert some data into the customers table.

INSERT INTO customers (customer_id, first_name, last_name, email, city)
VALUES
    (1, 'Chevy', 'Leward', 'cleward0@mapy.cz', 'Reading'),
    (2, 'Sayre', 'Ateggart', 'sateggart1@nih.gov', 'Portsmouth'),
    (3, 'Nathalia', 'Seckom', 'nseckom2@blogger.com', 'Portsmouth');
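
Optionally, you can also generate update and delete activity so the change table contains every operation type, and read the captured rows back before wiring up Data Factory. The statements below are a sketch with arbitrary sample values; note that the capture process runs asynchronously, so changes may take a few seconds to appear:

-- Optional: produce one update and one delete so all operation types are captured.
UPDATE customers SET city = 'Oxford' WHERE customer_id = 1;
DELETE FROM customers WHERE customer_id = 3;

-- Read back everything captured so far through the CDC function.
DECLARE @from_lsn binary(10), @to_lsn binary(10);
SET @from_lsn = sys.fn_cdc_get_min_lsn('dbo_customers');
SET @to_lsn = sys.fn_cdc_get_max_lsn();
SELECT * FROM cdc.fn_cdc_get_all_changes_dbo_customers(@from_lsn, @to_lsn, 'all');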

 

  • Create an Azure Data Factory pipeline
  1. Launch the Azure Portal. In the left menu, go to Create a resource -> Data + Analytics -> Data Factory.
  2. Select your Azure subscription in which you want to create the data factory.
  3. For the Resource Group, do one of the following steps:
    • Select Use existing and select an existing resource group from the drop-down list.
    • Select Create new and enter the name of a resource group.
  4. Select the region for the data factory. Only locations that are supported are displayed in the drop-down list. The data stores (Azure Storage, Azure SQL Database, etc.) and computes (HDInsight, etc.) used by data factory can be in other regions.
  5. Enter ADFCDCTutorial for the name. Note that this name must be globally unique.
  6. Select V2 for version.
  7. Click Review + Create.
  8. Once the deployment is complete, click on Go to resource.
  9. Click the Author & Monitor tile to launch the Azure Data Factory user interface (UI) in a separate tab.
  10. On the Get Started page, switch to the Author tab in the left panel.

 

  • Create an Azure Storage linked service
  1. In Manage, click Linked services and click + New.
  2. In the New Linked Service window, select Azure Blob Storage, and click Continue.
  3. In the New Linked Service window, do the following steps:
    1. Enter AzureStorageLinkedService for Name.
    2. Select your Azure Storage account for Storage account name.
    3. Click Create.

 

  • Create an Azure SQL Database linked service
  1. Click Linked services and click + New.
  2. In the New Linked Service window, select Azure SQL Database, and click Continue.
  3. In the New Linked Service window, do the following steps:
    1. Enter AzureSqlDB for the Name field.
    2. Select your SQL server for the Server name field.
    3. Select your SQL database for the Database name field.
    4. Enter the name of the user for the User name field.
    5. Enter the password for the user for the Password field.
    6. Click Test connection to test the connection.
    7. Click Create to save the linked service.

 

  • Create a dataset to represent source data
  1. In Author, click + (plus), and click Dataset.
  2. Select Azure SQL Database, and click Continue.
  3. In the Set properties tab, set the dataset name and connection information:
    1. Enter AzureSQLCDCCustomers for Name.
    2. Select AzureSqlDB for Linked service.
    3. Select [dbo].[dbo_customers_CT] for Table name. Note: this table was automatically created when CDC was enabled on the customers table. Changed data is never queried from this table directly but is instead extracted through the CDC functions.
    4. Click OK.

 

  • Create a dataset to represent data copied to sink data store
  1. In the treeview, click + (plus), and click Dataset.
  2. Select Azure Blob Storage, and click Continue.
  3. Select DelimitedText, and click Continue.
  4. In the Set Properties tab, set the dataset name and connection information:
    1. Select AzureStorageLinkedService for Linked service.
    2. Enter raw for the container part of the file path.
    3. Enable First row as header.
    4. Click OK.

 

  • Create a pipeline to copy the change data
  1. In the treeview, click + (plus), and click Pipeline.
  2. Change the pipeline name to IncrementalCopyPipeline.
  3. Expand General in the Activities toolbox, and drag-drop the Lookup activity to the pipeline designer surface. Set the name of the activity to GetChangeCount. This activity gets the number of records in the change table for a given time window.
  4. Switch to the Settings tab in the Properties window:
    1. Specify the SQL DB dataset name for the Source Dataset field.
    2. Select the Query option and enter the following into the query box:

DECLARE @from_lsn binary(10), @to_lsn binary(10);
SET @from_lsn = sys.fn_cdc_get_min_lsn('dbo_customers');
SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE());
SELECT count(1) changecount FROM cdc.fn_cdc_get_all_changes_dbo_customers(@from_lsn, @to_lsn, 'all')

    3. Enable First row only.

  5. Click the Preview data button to ensure a valid output is obtained by the lookup activity.
  6. Expand Iteration & conditionals in the Activities toolbox, and drag-drop the If Condition activity to the pipeline designer surface. Set the name of the activity to HasChangedRows.
  7. Switch to the Activities tab in the Properties window:
    1. Enter the following Expression: @greater(int(activity('GetChangeCount').output.firstRow.changecount),0)
    2. Click on the pencil icon to edit the True condition.
    3. Expand General in the Activities toolbox and drag-drop a Wait activity to the pipeline designer surface. This is a temporary activity used to debug the If condition and will be replaced later in the tutorial.
    4. Click on the IncrementalCopyPipeline breadcrumb to return to the main pipeline.
  8. Run the pipeline in Debug mode to verify that it executes successfully.
  9. Next, return to the True condition step and delete the Wait activity. In the Activities toolbox, expand Move & transform, and drag-drop a Copy activity to the pipeline designer surface. Set the name of the activity to IncrementalCopyActivity.
  10. Switch to the Source tab in the Properties window, and do the following steps:
    1. Specify the SQL dataset name for the Source Dataset field.
    2. Select Query for Use Query.
    3. Enter the following for Query.

DECLARE @from_lsn binary(10), @to_lsn binary(10);
SET @from_lsn = sys.fn_cdc_get_min_lsn('dbo_customers');
SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE());
SELECT * FROM cdc.fn_cdc_get_all_changes_dbo_customers(@from_lsn, @to_lsn, 'all')

    4. Click preview to verify that the query returns the changed rows correctly.
  11. Switch to the Sink tab, and specify the Azure Storage dataset for the Sink Dataset field.
  12. Click back to the main pipeline canvas and connect the Lookup activity to the If Condition activity: drag the green button attached to the Lookup activity onto the If Condition activity.
  13. Click Validate on the toolbar. Confirm that there are no validation errors. Close the Pipeline Validation Report window by clicking >>.
  14. Click Debug to test the pipeline and verify that a file is generated in the storage location.
  15. Publish entities (linked services, datasets, and pipelines) to the Data Factory service by clicking the Publish all button. Wait until you see the Publishing succeeded message.
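
Note that the queries in this walkthrough always read from the minimum retained LSN, so each run copies every change still held in the change table. For a truly incremental load you would bound both ends of the extraction window by mapping times to LSNs. The sketch below shows the idea; the @WindowStart and @WindowEnd values are placeholders that would normally come from pipeline parameters such as tumbling window trigger times, as in the Docs tutorial linked below:

-- Sketch: extract only the changes that fall inside a specific time window.
-- @WindowStart/@WindowEnd are placeholder values; supply them from pipeline parameters.
DECLARE @from_lsn binary(10), @to_lsn binary(10);
DECLARE @WindowStart datetime = '2021-07-09 00:00:00';
DECLARE @WindowEnd datetime = '2021-07-09 01:00:00';
SET @from_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @WindowStart);
SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', @WindowEnd);
-- Either mapping returns NULL when no captured LSN matches the boundary,
-- so production queries usually guard against NULL LSNs before calling the function.
SELECT * FROM cdc.fn_cdc_get_all_changes_dbo_customers(@from_lsn, @to_lsn, 'all');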

 

This process is very similar to moving change data from Azure SQL MI to Blob Storage, which is documented here: Incrementally copy data using Change Data Capture - Azure Data Factory | Microsoft Docs

 

 

Blog Series for Change Data Capture in Azure SQL Databases

We are happy to continue the bi-weekly blog series for customers who’d like to learn more about enabling CDC in their Azure SQL Databases! This series explores different features/services that can be integrated with CDC to enhance change data functionality.

 
