
In this blog post, I am going to show you how to implement a Change Data Capture (CDC) pipeline to continuously move data from SAP sources into Azure Data Lake storage in Delta file format. You will also learn about load parallelization, checkpointing, and several best practices.

 

 

 

Microsoft provides a short walkthrough for this scenario: Transform data from an SAP ODP source with the SAP CDC connector in Azure Data Factory or Azure Synapse Analytics — Azure Data Factory & Azure Synapse | Microsoft Learn. I recommend reading it to get familiar with the fundamental concepts; in this blog I will add details learned from my own experience.

 

 

 

What is Change Data Capture?

 

In a nutshell, CDC is keeping track of the changes that have happened to your data (e.g., update, insert, delete) since the last time you interacted with your data source.

 

 

 

How to implement CDC on Azure?

 

Azure has CDC connectors to the following SAP sources:

 

824x136vv2.png.9572f0004af5663a3b1f6f2a3d7ef122.png

 

 

 

These connectors are available both in Azure Data Factory and Azure Synapse Analytics pipelines. I chose Synapse for this blog because it offers two capabilities out of the box: an integrated storage browser to check the resulting datasets, and a serverless SQL pool to easily query the data in the target Azure Data Lake storage. You can also implement this solution with Azure Data Factory, in combination with Azure Storage Explorer for browsing and an analytics engine (e.g., Synapse, Fabric, Databricks) for querying your sink data.

 

 

 

545x224vv2.png.9017e6bc9c1d4e279160b17c86307c38.png

 

 

As you can see in the above picture, a CDC pipeline in Azure Data Factory or Synapse uses a Self-Hosted Integration Runtime to connect to the SAP source and utilizes SAP’s ODP API. Hence, the capabilities of the connector are limited to the capabilities of the ODP API.

 

 

 

In this blog post, you will:

 

  1. Create a CDC Data Flow
  2. Create a CDC Pipeline
  3. Create Delta files in Azure Data Lake

 

Prerequisites:

  • A Synapse workspace (or Azure Data Factory) with a Self-Hosted Integration Runtime that can reach your SAP source
  • An SAP CDC linked service created on that integration runtime
  • An Azure Data Lake Storage Gen2 account to use for staging and as the sink

1. Create a CDC Data Flow

 

 

If you are new to creating Data Flows in Synapse, please check Data flows — Azure Synapse Analytics | Microsoft Learn.

 

515x331vv2.png.330ca9e5f4fb2bb4a27c1da46dd01d24.png

 

 

 

Our data flow has three blocks: an SAP source, a Derived Column schema modifier, and a sink to Azure Data Lake storage in Delta file format. I also defined 4 parameters that are used in this data flow. If you don't know what ODP Name, Context, and KeyColumns are used for, you can read more about these parameters here: Parameterizing source and run mode.

 

 

a. SAP Source

 

541x473vv2.png.bd91b8e1cb6085aa8d9fdeee56dcf0e7.png

 

 

 

Choose SAP CDC as the inline dataset type and use the Linked Service you created for your SAP source as a prerequisite. It is important to enable the "Allow schema drift" option, which allows all incoming fields from your source to flow through the transformations to the sink.

 

 

 

Source Options and Run Mode

 

510x301vv2.png.1bf917a2c1feee1fa131533454ef081e.png

 

 

 

Pass the defined parameters to the respective source options.

 

 

 

There are three run modes you can choose from (source):

 

  • Full on every run: If you want to load full snapshots on every execution of your mapping data flow.
  • Full on the first run, then incremental: If you want to subscribe to a change feed from the SAP source system including an initial full data snapshot. In this case, the first run of your pipeline performs a delta initialization, which means it creates an ODP delta subscription in the source system and returns a current full data snapshot. Subsequent pipeline runs only return incremental changes since the preceding run.
  • Incremental Changes Only: Creates an ODP delta subscription without returning an initial full data snapshot in the first run. Again, subsequent runs return incremental changes since the preceding run only.

 

I chose the second option for this blog so that I can demo both the full (first) and incremental (subsequent) runs.

 

 

 

Optimization and Partitioning

 

536x339vv2.png.59c01d9ea3243cbebd5ce77183d49251.png

 

 

 

Under the Optimize tab, you can set the partitioning mechanism. Source partitioning is very important, since this is where you tune your performance. As explained here: "This option allows you to specify multiple partition (that is, filter) conditions to chunk a large source data set into multiple smaller portions. For each partition, the SAP CDC connector triggers a separate extraction process in the SAP source system". This is the best way to speed up your extraction process, but be aware that it also consumes more resources on the SAP system. It is crucial to select a partitioning column that distributes the data evenly; on the other hand, too many partitions will overload your SAP source without giving you any additional performance benefit. There are plenty of resources online about how to choose a good partitioning key.

 

 

 

Even though having a nice partitioning mechanism can increase your throughput, you should also closely monitor the utilization of the virtual machine hosting your SHIR and the SAP source system, to make sure that you have sufficient resources at all times.

 

I selected the "SALESDOCUMENTITEMTEXT" column for partitioning and defined two partition conditions, just as an example: one partition holds all the rows equal to "Y120 Bike", and the other holds all the rows equal to "Y200 Bike". There are 6 operators you can choose from to implement your partitioning logic. Note that you need to fill in all the partition values: my pipeline will only process rows that have "Y120 Bike" or "Y200 Bike" in the specified column, and the rest of the rows will be dropped.

 

 

 

If you are going to reuse one data flow to extract data from multiple sources dynamically, you can choose a more generic column, such as a date column. If you go in this direction, I suggest storing the source options (ODP context, name, and key columns) together with their respective partitioning column names and values in a dedicated meta-store (e.g., Cosmos DB) or in a configuration file. This makes your data flow metadata-driven, with these values passed in as parameters, as sketched below.
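
To make this concrete, here is a sketch of what such a metadata entry could look like, written for illustration as a relational control table (e.g., in an Azure SQL database). The Cosmos DB document or configuration file suggested above would carry the same fields; all table and column names below are hypothetical.

-- Hypothetical control table for a metadata-driven CDC data flow (one row per SAP source object).
CREATE TABLE cdc_source_config (
    odp_context      VARCHAR(50)  NOT NULL,  -- e.g. ABAP_CDS
    odp_name         VARCHAR(100) NOT NULL,  -- ODP name of the source object
    key_columns      VARCHAR(500) NOT NULL,  -- comma-separated list of key columns
    partition_column VARCHAR(100),           -- column used for source partitioning
    partition_values VARCHAR(500),           -- partition values or ranges for that column
    checkpoint_id    INT          NOT NULL,  -- incremented to force a new full extraction
    CONSTRAINT pk_cdc_source_config PRIMARY KEY (odp_context, odp_name)
);

A Lookup activity at the start of the pipeline could then read these entries and pass them to the data flow as parameters.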

 

You can read more about source partitioning here: Parametrizing the filter conditions for source partitioning.

 

 

b. Derived Column

 

575x305vv2.png.b2bb12d092373e7c7ea37bf9605de451.png

 

 

 

In this schema modifier, I am adding two columns to the end of the data that is extracted from the SAP source.

 

 

 

The “_operation_type” column is created using the following expression:

 

iif(isDelete(), 'D', iif(isUpdate(), 'U', iif(isUpsert(), 'S', iif(isInsert(), 'I', 'X'))))

 

This means: if a row is deleted it gets a 'D' flag, if updated a 'U', if upserted an 'S', and if newly inserted an 'I'; anything else falls back to 'X'. You can find the list of built-in Data Flow expression functions documented here.

 

 

 

The “_processed_time” column is the timestamp of the CDC process when it runs, using the "currentUTC()" expression function.

 

 

c. Sink

 

558x413vv2.png.39178ea5106c592fd825459d4cf63d4a.png

 

 

 

I prefer to use Delta as the dataset type and enable schema drift. If a new column is added to the source, we will also see this new column appearing in the target delta files, with the old rows having "NULL" values under that column.

 

532x423vv2.png.382a463b37f495fbb298cc5be0273f2b.png

 

 

 

I use only the "Allow insert" update method so that any change (e.g., I, U, S, D) is appended to the target delta files as a new row. I prefer this approach because it makes it easier to merge these changes later in a subsequent pipeline, using the "_processed_time" column we added in the previous step. You can think of this merging step as going from the Bronze layer to the Silver layer, if you are familiar with the medallion architecture.

 

 

 

I am using the “CDSViewName” parameter in the folder path expression so that the delta files are created under the respective folder.

 

For optimizing your Delta sink, you can check out: Delta Sink Optimization Options.

 

 

 

2. Create a CDC Pipeline

 

517x399vv2.png.4e5b619b286f8084335c19e3bfdaa3be.png

 

Create a pipeline for your data flow from the previous step and create 5 parameters. I will shortly explain the “checkpoint id” parameter.

 

 

 

489x444vv2.png.c463e6b724cc38c5cb392d9845025847.png

 

 

 

Under the Data Flow Settings, there are two very important concepts to explain: checkpoint key and staging.

 

 

 

Checkpoint Key

 

As explained here, the “Checkpoint Key is used by the SAP CDC runtime to store status information about the change data capture process. This, for example, allows SAP CDC mapping data flows to automatically recover from error situations, or know whether a change data capture process for a given data flow has already been established. It is therefore important to use a unique Checkpoint Key for each source.”

 

I implement my checkpoint key with the following formula:

 

@base64(concat(pipeline().parameters.p_odp_name,pipeline().parameters.p_checkpoint_id))

 

This means that I am concatenating two parameters (the ODP name and the checkpoint id) and then base64-encoding the concatenated string.

 

The documentation says each source should have a unique key, which means that each ODP name needs a unique key so that the connector can establish a separate CDC process with each source. This is crucial if you are implementing a dynamic pipeline to process multiple sources in one data flow.

 

The second string I used to create the key is the checkpoint id. This is a parameter I have introduced to be able to control the CDC process of each source manually, so that I can trigger full extractions anytime I want, by simply incrementing the id.

 

 

 

 

 

Example: Let’s say there are 100 rows in the source view and the checkpoint_id=1.

When we start the pipeline for the first time, it will do a full extraction and write 100 rows to the sink.

If we keep the id=1 and retrigger the pipeline immediately after, the pipeline will not see any new rows, hence will not do anything (assuming no changes have been made to the view in this time frame).

 

 

Say there are 10 new rows in the source view, sometime later.

- If we keep the id=1 and retrigger the pipeline, the pipeline will see the 10 new rows and append them to the delta files.

- If we increment the id to 2 and retrigger the pipeline, the pipeline will start a new CDC process (full extraction) and will extract 110 rows.

 

 

It is also important to note that if you decrement the id from 2 to 1, it will continue the CDC process where it left off for id=1, instead of running a full extraction.

 

As I mentioned before, you can have a separate meta store to store the {ODP Name, Checkpoint ID} pair to track the history of all the ids used for each source.

 

 

 

TIP: There are two methods to run a pipeline: Debug and Trigger. Please be aware that these two methods create separate CDC processes (subscriptions) even though you use the same checkpoint key. For example, if you debug a process for the first time, you will see a full data extraction. If you trigger the process after the full extraction, you will see a full extraction again, instead of the changes only. This is documented here.

 

 

 

Staging

 

The CDC connector first loads the data from the source into a staging folder of your choice. This staging folder holds all the changes that have happened since the last pipeline run; the connector then deduplicates these changes and writes the result to the sink path.

 

 

 

For example, if a value of “A” is updated to “B” and then to “C”, upon triggering the CDC pipeline, we will only see the value of “C” due to this deduplication process.

 

You have no way of influencing (or disabling) this deduplication process. If you want to capture every individual change, you need to run your pipeline continuously, on a schedule with short intervals.

 

 

 

I suggest not using the data files stored in the staging folder. The reason is that staging is maintained by the connector. These data files can be deleted or changed (e.g., file extensions or compression types) by the connector at any time.

 

 

 

3. Create Delta files in Azure Data Lake

 

 

Now it’s time to run our pipelines. For simplicity, I will Debug my pipeline instead of triggering it. To debug, you need to turn on the Data flow debug cluster.

 

 

 

Initial Run (Full Extraction)

 

After running the pipeline for the first time, I see that my pipeline has finished successfully.

 

595x342vv2.png.011bd9f1091c1cf17aed72490e7bb6ef.png

 

 

 

I can use the integrated data explorer tool to look into my sink folder and see the created delta files.

 

570x238vv2.png.2acbe0af4b7b063ceb79957955e8216e.png

 

 

 

Then I can use Synapse serverless SQL pools to query this data by right-clicking the folder and creating a new SQL script to look at the top 100 rows. This automatically opens a new window with a SQL script, which you can then run. If you look at the second-to-last column, the operation type, you will see that all these new rows have an 'S' flag. This is because the connector currently treats inserts and updates as upserts, which in my opinion is a limitation that needs to be fixed. I would have expected them to have an 'I' flag instead.
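
For reference, the query behind that script boils down to an OPENROWSET over the sink folder. Here is a minimal sketch using the Delta format option; the storage account, container, and folder names are placeholders you would replace with your own:

-- Minimal sketch of a serverless SQL query over the Delta sink folder (placeholder paths).
SELECT TOP 100 *
FROM OPENROWSET(
    BULK 'https://<storageaccount>.dfs.core.windows.net/<container>/<CDSViewName>/',
    FORMAT = 'DELTA'
) AS result;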

 

 

 

561x306vv2.png.520a7c34cbad11971734746a4214931f.png

 

 

 

If I group the results by the “SALESDOCUMENTITEMTEXT” column, I will see that there are only two distinct values, which I have specified previously as the partition values.
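
That check can be run against the same OPENROWSET source; here is a minimal sketch of the grouping query, again with placeholder paths:

-- Verify that only the two configured partition values reached the sink (placeholder paths).
SELECT SALESDOCUMENTITEMTEXT, COUNT(*) AS row_count
FROM OPENROWSET(
    BULK 'https://<storageaccount>.dfs.core.windows.net/<container>/<CDSViewName>/',
    FORMAT = 'DELTA'
) AS result
GROUP BY SALESDOCUMENTITEMTEXT;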

 

 

 

TIP: You can use this source partitioning mechanism as a selective CDC process if you want to get only a specific part of the full data from your SAP source into the data lake.

 

 

 

549x388vv2.png.9bed3578b29b15149d8aa8193a73e868.png

 

 

 

If you go to the Monitor page of Synapse and then look at the diagnostics of your data flow, you can see some useful metrics:

 

526x440vv2.png.76322ded96c5d5ad2d4f49e95f0ac6b4.png

 

 

 

You might, at some point, see that the number of rows copied from SAP to staging ("rows copied", bottom line) is larger than the number of rows written into the sink folder ("rows calculated", top line). This happens because of the deduplication process I mentioned earlier.

 

 

 

Second Run (New Row)

 

Let's update our SAP source and change the document with SALESDOCUMENT=24676. I will create a new item with an order quantity of 1. Even though I only added one item to the document, there are two changes for that document: the existing item and the new item are both considered changed, since "SALESDOCUMENT" is one of the Key Columns defined in my parameters.

 

624x324vv2.png.cc1637b0fb21d23d2994fc53a56811ac.png

 

 

 

If we run the pipeline, we will see that there are 2 rows calculated, which are the changed items under that document:

 

600x329vv2.png.bcf495a42caa84f46f58902ce4bc7ea0.png

 

 

 

You might have realized all the columns are flagged as “New”, which I think is a UI bug that needs to be fixed.

 

 

 

If you check the data lake, you will see a new partition created in the folder.

 

834x193vv2.png.45b2b67959cc49258f5b77d84315dee0.png

 

 

 

If we query the data with a filter on the document, we will see three rows with two different processed timestamps. The top row is from the initial run; the other two are from this second run. We have three rows because I selected only the "Allow insert" method in the sink settings. Note that I used the '*' wildcard in the path to query all the partitions under the folder, since I now have more than one partition.

 

633x284vv2.png.02bc62011721ed1fd277565730a799a0.png

 

 

 

Again, all of these rows have the same ‘S’ flag. I would expect the second row to have an ‘I’ flag since we only inserted it, which I think needs fixing.

 

 

 

Third Run (Update)

 

Let’s change the order quantity of our new item from 1 to 2.

 

692x327vv2.png.3f29460be11d8590c11b5715a5311b9d.png

 

 

 

If we rerun the pipeline, we will again see 2 new rows calculated by the CDC connector. If we query the data, we will see 5 rows in total.

 

610x337vv2.png.6c3954c7f069e701b11df5cb9dff17d0.png

 

 

 

Again, I would expect the fourth row to have a ‘U’ flag since we only updated it. I think this needs fixing as well.

 

 

 

Fourth Run (Delete)

 

This time I deleted the item I created and updated in the previous steps.

 

682x313vv2.png.09590c794878969137bc46c5f7f533aa.png

 

 

 

If we rerun the pipeline, we will again see 2 new rows calculated by the CDC connector. If we query the data, we will have 7 rows in total, where the one that we deleted has a ‘D’ flag.

 

642x332vv2.png.8eee0906857e34857c31c24c831bd770.png

 

 

 

For now, we need to live with having only 'S' and 'D' flags until this is fixed. However, using the "_processed_time" column, one can merge these changes into the Silver layer in a separate pipeline, as sketched below.
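
One way to implement that Bronze-to-Silver merge is a MERGE statement over the Delta tables, for example in Spark SQL on a Synapse Spark pool. This is only a sketch: the engine, the table names, and the item-level key columns (SALESDOCUMENT, SALESDOCUMENTITEM) are my assumptions, and it assumes the Silver table shares the Bronze schema, including the two audit columns.

-- Sketch of a Bronze-to-Silver merge (Spark SQL on Delta tables; all names are illustrative).
-- The USING subquery keeps only the latest change per key; the connector already deduplicates
-- changes within a single run, so the newest "_processed_time" per key is a single row.
MERGE INTO silver.sales_document_items AS tgt
USING (
    SELECT b.*
    FROM bronze.sales_document_items AS b
    INNER JOIN (
        SELECT SALESDOCUMENT, SALESDOCUMENTITEM,
               MAX(_processed_time) AS max_processed_time
        FROM bronze.sales_document_items
        GROUP BY SALESDOCUMENT, SALESDOCUMENTITEM
    ) AS latest
        ON  b.SALESDOCUMENT     = latest.SALESDOCUMENT
        AND b.SALESDOCUMENTITEM = latest.SALESDOCUMENTITEM
        AND b._processed_time   = latest.max_processed_time
) AS src
ON  tgt.SALESDOCUMENT     = src.SALESDOCUMENT
AND tgt.SALESDOCUMENTITEM = src.SALESDOCUMENTITEM
WHEN MATCHED AND src._operation_type = 'D' THEN DELETE         -- deleted in SAP: remove from Silver
WHEN MATCHED THEN UPDATE SET *                                  -- changed or upserted row: overwrite
WHEN NOT MATCHED AND src._operation_type <> 'D' THEN INSERT *;  -- new key: insert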

 

 

 

Wrap up

 

 

I hope you found this blog useful. If you have any remarks please leave a comment. It will motivate me to write a follow-up blog where I can explain how to build a metadata-driven pipeline, by querying the DD03ND table to retrieve Key Column names, and the DDHEADANNO table to create ODP Names dynamically.

 

 

 

Here are some other blogs that are relevant:

 

 
