SamPanda
Background
In this blog post, we will explore the use of the SAP CDC connector within Azure Data Factory/Azure Synapse Analytics to extract data from different SAP Frameworks by leveraging metadata tables. In the following example, we will extract data from the S4 HANA Finance system and save it as a delta table in ADLS Gen2. To read and report on the delta table, we will explore the capabilities of Microsoft Fabric.
Table of Contents
Introduction to SAP CDC Connector
Different SAP Frameworks and SAP CDC connector support
Important Concepts
Demo: Metadata-driven pipeline
Introduction to SAP CDC Connector
The SAP CDC connector connects SAP applications to various Azure services. It is part of Azure Data Factory and Azure Synapse Analytics.
While working with customers on the SAP data extraction connectors already available in ADF/Synapse, we repeatedly heard that there should be an easy way to extract only the incremental (recently changed) data from SAP sources into Azure data services, without extra maintenance and overhead.
You can use a manual, limited workaround to extract mostly new or updated records. This process, called watermarking, relies on a timestamp column or another column with monotonically increasing values, and continuously tracks the highest value seen since the last extraction. But some tables don't have a column that you can use for watermarking, and the process doesn't identify a deleted record as a change in the dataset.
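To make the limitation concrete, here is a minimal Python sketch of watermark-based extraction. The row layout and the `modified_at` column are assumptions made for illustration; they are not part of any SAP or ADF API.

```python
from datetime import datetime


def extract_since(rows, last_watermark):
    """Return the rows modified after last_watermark, plus the new watermark."""
    changed = [r for r in rows if r["modified_at"] > last_watermark]
    # If nothing changed, the watermark stays where it was.
    new_watermark = max((r["modified_at"] for r in changed), default=last_watermark)
    return changed, new_watermark


# First run: both rows are newer than the initial watermark.
source = [
    {"id": 1, "modified_at": datetime(2024, 1, 1)},
    {"id": 2, "modified_at": datetime(2024, 1, 2)},
]
changed, watermark = extract_since(source, datetime(2023, 12, 31))

# Row 1 is then deleted and row 2 is updated in the source.
source = [{"id": 2, "modified_at": datetime(2024, 1, 3)}]
changed, watermark = extract_since(source, watermark)
# Only the update to row 2 is picked up; the deletion of row 1 leaves no trace.
```

The second run returns only row 2, which is exactly the gap described above: a deleted record never shows up as a change.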
The SAP CDC connector addresses these challenges: it maintains the connectivity to our SAP sources and makes extracting the delta changes much easier.
Different SAP Frameworks and SAP CDC connector support
The SAP CDC connector is built on the ODP (Operational Data Provisioning) framework, and it supports all SAP systems (ECC, S/4HANA, BW, BW/4HANA, HANA, etc.), irrespective of where they are hosted (on-premises, multi-cloud, RISE, etc.).
Change data capture (CDC) | SAP Help Portal
The CDC process of data extraction uses the ODP framework to replicate the delta in an SAP source dataset.
Data extraction via ODP requires a properly configured user on SAP systems. The user must be authorized for ODP API invocations over Remote Function Call (RFC) modules. The user configuration is the same configuration that's required for data extractions via ODP from SAP source systems into BW or BW/4HANA.
Important Concepts
Checkpoint
Delta tracking (CDC) for SAP CDC pipeline runs is governed solely by the checkpoint key. The checkpoint key is therefore critical for incremental extractions and for tracking the history of data extracted for each ODP object. Choose it carefully, and use a unique checkpoint key for each ODP object. In this demo, we parameterize the checkpoint key for each ODP object.
SAP CDC advanced topics - Azure Data Factory | Microsoft Learn
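As a rough illustration of "one unique checkpoint key per ODP object", the Python sketch below derives a key from the ODP context and object name and checks a list of keys for duplicates. The naming convention and both helper functions are hypothetical. The connector does not prescribe a key format.

```python
from collections import Counter


def checkpoint_key(sap_context, sap_object_name, suffix="1"):
    """Build a checkpoint key from context and object name (hypothetical convention)."""
    return f"CheckPointFor_{sap_context}_{sap_object_name}-{suffix}"


def duplicate_keys(keys):
    """Return the checkpoint keys that occur more than once, sorted."""
    return sorted(k for k, n in Counter(keys).items() if n > 1)


objects = [("ABAP_CDS", "CMMPOITMDX$F"), ("ABAP_CDS", "CSDSLSDOCITMDX1$F")]
keys = [checkpoint_key(ctx, name) for ctx, name in objects]
duplicates = duplicate_keys(keys)  # empty when every object has its own key
```

A check like this could run against the metadata table before the pipeline is triggered, so that two ODP objects never share delta-tracking state by accident.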
Change type indicator
The change type indicator identifies the type of operation that happened on the data: insert, update, upsert, or delete. To track the type of operation that happened on our source data, we will use a derived column transformation in our data flow and look at the results.
Staging location
When creating a pipeline for the SAP CDC extraction process, you have to specify a staging location. This can be your ADLS Gen2 storage account; it serves as an intermediate layer where data extracted from the SAP source is held for any optimizations or transformations. Do not use this staging location for any other purpose, such as copy activities or other data flows.
Run Mode
The CDC connector supports three types of run modes:
- Full on the first run, then incremental
- Full on every run
- Incremental changes only
Choose the run mode that best fits your business needs.
Read more about this here: SAP CDC advanced topics - Azure Data Factory | Microsoft Learn
In this demo, we will showcase two of these behaviors: ‘Full on the first run, then incremental’ and ‘Full on every run’.
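The three run modes can be summarized with a small decision function. This is a simplified mental model in Python, not connector code. The value `fullAndIncrementalLoad` appears in the metadata table used later in this post, whereas `fullLoad` and `incrementalLoad` are assumed names for the other two modes.

```python
def extraction_kind(run_mode, checkpoint_has_state):
    """Return 'full' or 'delta' for a run, given the mode and prior checkpoint state."""
    if run_mode == "fullLoad":  # assumed name: full on every run
        return "full"
    if run_mode == "fullAndIncrementalLoad":  # full on the first run, then incremental
        return "delta" if checkpoint_has_state else "full"
    if run_mode == "incrementalLoad":  # assumed name: incremental changes only
        return "delta"
    raise ValueError(f"unknown run mode: {run_mode}")


first_run = extraction_kind("fullAndIncrementalLoad", checkpoint_has_state=False)
later_run = extraction_kind("fullAndIncrementalLoad", checkpoint_has_state=True)
```

Here `first_run` is a full extraction and `later_run` is a delta, which is the behavior the demo relies on for the sales document object.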
Demo: Metadata-driven pipeline
Objective
In this example, we are going to explore the following.
- Loading data from multiple SAP Objects from a single pipeline using the metadata table.
- Dynamically selecting columns and rows on the SAP objects.
- Parameterizing all the configuration values in the Dataflow.
- Dynamic column mapping using the metadata table.
- Dynamic filter condition while extracting the data from the SAP system.
- Capturing the change events from the SAP source system (such as INSERT, UPDATE, and DELETE).
Prerequisites
We have detailed documentation (here) regarding the prerequisites that we need before using the SAP CDC connector. Here are some high-level prerequisites.
- Setting up the Self-hosted Integration runtime.
- Set up the operational data provisioning (ODP) framework.
SAP Pre-requisites:
Minimum SAP system requirements:
For ODP-based extraction scenarios:
- The SAP system should be on SAP NetWeaver 7.0 SPS 24 or above.
For ABAP CDS-based extraction scenarios:
- The SAP system should run on SAP NetWeaver 7.4 SPS08 or above. To support delta extractions based on ABAP CDS, the SAP system must run on SAP NetWeaver 7.5 SPS05 or above.
For SLT-based scenarios:
- Ensure your SLT system runs on SAP NetWeaver 7.4 SPS08 or above, and that DMIS 2011 SP 05 or above is installed as an add-on on the SLT server and the source system (if a standalone deployment is used).
Configure Integration User:
To connect the SAP system (source system) to ADF, we need a user on the SAP system that can be used in the linked service configuration.
Data extractions via ODP require a properly configured user on SAP systems. The user must be authorized for ODP API invocations over Remote Function Call (RFC) modules. The user configuration is the same configuration that's required for data extractions via ODP from SAP source systems into BW or BW/4HANA.
- Create a user as per the customer/partner naming convention. Ensure the user type is set to “SYSTEM”.
- Assign the SAP standard profiles S_BI-WHM_RFC, S_BI-WX_RFC for the user.
- If security restrictions prevent customers/partners from assigning SAP standard profiles to users, follow the SAP Notes below to create custom roles and assign them instead.
- 2855052 - To authorize ODP API usage
- 460089 - To authorize ODP RFC invocations
Data Flow Diagram
In this demo, we use the SAP S/4HANA 2021 CAL version as the source system and load the data in delta format into an ADLS Gen2 storage account. We then extend this to Microsoft Fabric using the OneLake shortcut functionality and build a report on the extracted data.
Resource Provisioning
- Active S4 HANA system for source data.
- Azure Data Factory instance or Azure Synapse Analytics workspace. The same pipeline can be implemented in either; here we have implemented it using Azure Synapse Analytics.
- Self-hosted integration runtime.
- ADLS Gen2
- Azure SQL DB – for storing the metadata tables.
- Microsoft Fabric workspace with active Fabric capacity.
- Access to SAP application to test the delta extraction.
- Access to the ODQMON UI to check the delta tracking.
Set up the linked service.
Step-by-step guidance is mentioned here.
The subscriber name must be unique, because the SAP system identifies the connector by this name. A unique name greatly simplifies monitoring and troubleshooting of the connector within the SAP environment, particularly in the Operational Delta Queue monitor (ODQMON).
In this setup, the SAP servers are hosted on Azure VMs inside an Azure Virtual Network, so we provide the private IP address as the Server Name.
Creating the metadata tables.
In this case, we need 2 metadata tables.
- tbl_metadata: this table contains all the configuration values that are required in the mapping data flow for the dynamic configuration.
IF OBJECT_ID(N'[dbo].[tbl_metadata]', 'U') IS NOT NULL
DROP TABLE [dbo].[tbl_metadata];
GO
IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[tbl_metadata]') AND type in (N'U'))
CREATE TABLE [dbo].[tbl_metadata] (
checkPointKey varchar(64) NOT NULL,
sapContext varchar(64) NOT NULL,
sapObjectName varchar(64) NOT NULL,
sapRunMode varchar(255) NOT NULL,
sapKeyColumns varchar(255) NOT NULL,
sapPartitions varchar(255) NULL,
destdeltaContainer varchar(50) NOT NULL,
deltaKeyColumns varchar(255) NOT NULL,
destdeltaFolder varchar(50) NOT NULL,
deltaPartition varchar(250) NULL,
stagingStorageFolder varchar(255),
columnSelectionAndMappingIdentifier varchar(50)
)
Truncate table dbo.tbl_metadata
GO
INSERT INTO dbo.tbl_metadata
SELECT
'CheckPointFor_CMMPOITMDX$F-1' AS checkPointKey,
'ABAP_CDS' AS sapContext,
'CMMPOITMDX$F' AS sapObjectName,
'fullAndIncrementalLoad' AS sapRunMode,
' ["PURCHASEORDER", "PURCHASEORDERITEM"]' AS sapKeyColumns,
'[ [{ "fieldName": "PURCHASEORDER", "sign": "I", "option": "BT", "low": "4500000001" , "high": "4500001000" }]]' AS sapPartitions,
'datafs' AS destdeltaContainer,
'["PURCHASEORDER", "PURCHASEORDERITEM"]' AS deltaKeyColumns,
'dest/delta_tables/CMMPOITMDX_F_sel' AS destdeltaFolder,
'PURCHASEORDER' AS deltaPartition,
'adf_staging/CMMPOITMDX_F_sel' AS stagingStorageFolder,
'CMMPOITMDX_F_sel' AS columnSelectionAndMappingIdentifier
GO
INSERT INTO dbo.tbl_metadata
SELECT
'CheckPointFor_CSDSLSDOCITMDX1$' AS checkPointKey,
'ABAP_CDS' AS sapContext,
'CSDSLSDOCITMDX1$F' AS sapObjectName,
'fullAndIncrementalLoad' AS sapRunMode,
' [ "SALESDOCUMENT", "SALESDOCUMENTITEM"]' AS sapKeyColumns,
'' AS sapPartitions,
'datafs' AS destdeltaContainer,
' [ "SALESDOCUMENT", "SALESDOCUMENTITEM"]' AS deltaKeyColumns,
'dest/delta_tables/CSDSLSDOCITMDX1_F_sel' AS destdeltaFolder,
'' AS deltaPartition,
'adf_staging/CSDSLSDOCITMDX1_F_sel' AS stagingStorageFolder,
'CSDSLSDOCITMDX1_F_sel' AS columnSelectionAndMappingIdentifier
- tbl_columnSelectionAndMapping: this table holds the column mapping between the source SAP objects and the Delta table. The mapping table serves two purposes:
- Dynamic Column renaming while storing the data in the destination delta table.
- Dynamic column selection from the source side.
IF OBJECT_ID(N'[dbo].[tbl_columnSelectionAndMapping]', 'U') IS NOT NULL
DROP TABLE [dbo].[tbl_columnSelectionAndMapping];
GO
IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[tbl_columnSelectionAndMapping]') AND type in (N'U'))
CREATE TABLE tbl_columnSelectionAndMapping(
columnSelectionAndMappingIdentifier varchar(50),
prevColumnName varchar(50),
newColumnName varchar(50),
PRIMARY KEY (columnSelectionAndMappingIdentifier, prevColumnName)
)
/*
inserting the columns names and mapping.
*/
TRUNCATE TABLE tbl_columnSelectionAndMapping
GO
INSERT INTO tbl_columnSelectionAndMapping
Select 'CSDSLSDOCITMDX1_F_sel' ,'SALESDOCUMENT' ,'SALESDOCUMENT'
UNION ALL
Select 'CSDSLSDOCITMDX1_F_sel' ,'SALESDOCUMENTITEM' ,'SALESDOCUMENTITEM'
UNION ALL
Select 'CSDSLSDOCITMDX1_F_sel' ,'SDDOCUMENTCATEGORY' ,'CATEGORY'
GO
INSERT INTO tbl_columnSelectionAndMapping
Select 'CMMPOITMDX_F_sel' ,'PURCHASEORDER' ,'PURCHASEORDER'
UNION ALL
Select 'CMMPOITMDX_F_sel' ,'PURCHASEORDERITEM' ,'PURCHASEORDERITEM'
UNION ALL
Select 'CMMPOITMDX_F_sel' ,'PURCHASINGORGANIZATION' ,'ORG'
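Because sapKeyColumns and sapPartitions are stored as JSON text in varchar columns, a typo in the metadata only surfaces when the data flow runs. The Python sketch below shows one way such a row could be checked up front. The function name and the set of required fields are assumptions based on the sample rows above, not on a documented schema.

```python
import json

REQUIRED_CONDITION_FIELDS = {"fieldName", "sign", "option", "low"}


def parse_metadata_row(row):
    """Parse the JSON columns of a tbl_metadata row; raise ValueError on bad input."""
    key_columns = json.loads(row["sapKeyColumns"])
    if not isinstance(key_columns, list) or not all(isinstance(c, str) for c in key_columns):
        raise ValueError("sapKeyColumns must be a JSON array of column names")

    # An empty string or NULL means "no partitioning".
    raw_partitions = (row.get("sapPartitions") or "").strip()
    partitions = json.loads(raw_partitions) if raw_partitions else []

    # Expected shape: a list of partitions, each a list of condition objects.
    for partition in partitions:
        for condition in partition:
            missing = REQUIRED_CONDITION_FIELDS - condition.keys()
            if missing:
                raise ValueError(f"partition condition is missing {sorted(missing)}")
    return key_columns, partitions


row = {
    "sapKeyColumns": ' ["PURCHASEORDER", "PURCHASEORDERITEM"]',
    "sapPartitions": '[ [{ "fieldName": "PURCHASEORDER", "sign": "I", "option": "BT", '
                     '"low": "4500000001" , "high": "4500001000" }]]',
}
key_columns, partitions = parse_metadata_row(row)
```

The leading space in the stored key-column string is harmless, since JSON parsers ignore surrounding whitespace.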
Creation of Metadata-driven Pipeline
Here is the pipeline that we are going to create for this use case:
Lookup Activity:
The Lookup activity reads the data from tbl_metadata and supplies the value to the ForEach Activity.
ForEach Activity:
This activity traverses the metadata table rows and calls the data flow activity for each row of configuration values.
- In the Settings -> Items section, put the dynamic value: @activity('Lookup the metadata from SQL table').output.value
- Uncheck the Sequential checkbox to make the multiple SAP Objects data transfer happen in parallel.
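Conceptually, the Lookup and ForEach pair behaves like the Python sketch below: one data flow run per metadata row, executed either one after another or in parallel. The `run_data_flow` stand-in and the `batch_count` parameter are illustrative assumptions; in the real pipeline the service schedules the runs.

```python
from concurrent.futures import ThreadPoolExecutor


def run_data_flow(item):
    """Stand-in for the data flow activity: report what would be extracted."""
    return f"{item['sapObjectName']} -> {item['destdeltaFolder']}"


def for_each(items, activity, sequential=False, batch_count=4):
    """Run the activity once per item, in order or in parallel."""
    if sequential:
        return [activity(item) for item in items]
    with ThreadPoolExecutor(max_workers=batch_count) as pool:
        # map() returns results in input order even though the runs overlap.
        return list(pool.map(activity, items))


lookup_output = [
    {"sapObjectName": "CMMPOITMDX$F", "destdeltaFolder": "dest/delta_tables/CMMPOITMDX_F_sel"},
    {"sapObjectName": "CSDSLSDOCITMDX1$F", "destdeltaFolder": "dest/delta_tables/CSDSLSDOCITMDX1_F_sel"},
]
results = for_each(lookup_output, run_data_flow)
```

Running in parallel only works cleanly because each row carries its own checkpoint key, staging folder, and target folder, so the runs do not share state.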
DataFlow Activity
Here are the DataFlow top-level Settings from the pipeline view.
The parameters can be declared from the DataFlow pane, and the values can be assigned from the pipeline view. The parameter values are assigned from the ForEach item values.
- Staging storage folder: concat(item().destdeltaContainer, '/', item().stagingStorageFolder)
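The parameter assignment can be pictured as a plain mapping from one metadata row to the data flow parameters. The Python sketch below mirrors the concat expression for the staging folder; passing the remaining columns through unchanged is an assumption about how the other parameters are wired.

```python
def data_flow_parameters(item):
    """Map one tbl_metadata row (a ForEach item) to data flow parameter values."""
    params = dict(item)  # assumed: most columns are passed through as-is
    # Mirrors concat(item().destdeltaContainer, '/', item().stagingStorageFolder).
    params["stagingStorageFolder"] = (
        f"{item['destdeltaContainer']}/{item['stagingStorageFolder']}"
    )
    return params


item = {
    "checkPointKey": "CheckPointFor_CMMPOITMDX$F-1",
    "sapObjectName": "CMMPOITMDX$F",
    "destdeltaContainer": "datafs",
    "stagingStorageFolder": "adf_staging/CMMPOITMDX_F_sel",
}
params = data_flow_parameters(item)
```

With the sample row, the staging path resolves to a folder inside the `datafs` container, separate from the folder that holds the target Delta table.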
Here is the component view of the data flow.
Please note that in this data flow, we have 2 sinks.
- We will configure a sink for a Delta table where we will perform UPSERT operations on the data.
- Another sink option is available to capture raw data and change events from the SAP system, saving them as delimited (CSV) files in the file system. This sink is entirely optional and may not be necessary for your specific use case. However, if you wish to maintain a historical record of the data merged with the target table, you can choose to persist with the flat files in storage.
ColumnMapping source
This source is used to read the data from the other metadata table that we have for the column mapping and selection.
- Use an inline Azure SQL DB dataset to read the data from the table tbl_columnSelectionAndMapping.
- Provide the Query in the expression builder, so that it only fetches the rows which are required for the current SAP object. concat("Select * from tbl_columnSelectionAndMapping where columnSelectionAndMappingIdentifier = '", $columnSelectionAndMappingIdentifier, "'")
- The fields from the table.
cachedSinkColumnSelection sink
We are caching the output from the mapping table, so that we can use this in the main dataflow activities.
Source1
We are reading the source data from the SAP Finance system here. More detail about the parametrization of the fields can be found here.
- The Inline dataset enables us to access the data from the SAP system via the SAP CDC connector.
- We parameterize various configuration values.
- By using the partition condition, we can filter the data selected from the source and speed up the extraction, because the SAP CDC connector launches a separate extraction process in the SAP source system for every partition.
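To show what a partition condition such as the one stored in sapPartitions selects, here is a simplified Python model of a single condition. It assumes the usual SAP selection semantics (sign `I` includes, `E` excludes; option `EQ` or `BT`) and compares values as strings. How the connector combines several conditions within one partition is not covered here.

```python
def matches(condition, record):
    """Evaluate one selection condition against a record (simplified model)."""
    value = record[condition["fieldName"]]
    option = condition["option"]
    if option == "BT":  # between low and high, inclusive
        hit = condition["low"] <= value <= condition["high"]
    elif option == "EQ":  # equal to low
        hit = value == condition["low"]
    else:
        raise ValueError(f"option not modeled in this sketch: {option}")
    # Sign "I" keeps matching records; "E" keeps the non-matching ones.
    return hit if condition["sign"] == "I" else not hit


condition = {
    "fieldName": "PURCHASEORDER",
    "sign": "I",
    "option": "BT",
    "low": "4500000001",
    "high": "4500001000",
}
inside = matches(condition, {"PURCHASEORDER": "4500000500"})
outside = matches(condition, {"PURCHASEORDER": "4500002000"})
```

String comparison works here because the purchase order numbers all have the same length; values of varying length would need padding or numeric conversion first.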
Select1
Using the select transformation, we select the required columns from the source data, and perform the column renaming / mapping before saving the data into sink.
In the expression builder, enter the expressions below:
Matching condition: !isNull(cachedSinkColumnSelection#lookup(name).prevColumnName)
Output column name expression: cachedSinkColumnSelection#lookup($$).newColumnName
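The effect of this rule-based mapping can be sketched in a few lines of Python: a column is kept only if the cached mapping has an entry for it, and it is written under its new name. The dictionary stands in for the cached sink; the extra `PLANT` column in the example is made up to show a column being dropped.

```python
def select_and_rename(row, mapping):
    """Keep only the mapped columns and rename them (prevColumnName -> newColumnName)."""
    return {mapping[name]: value for name, value in row.items() if name in mapping}


# Rows of tbl_columnSelectionAndMapping for 'CMMPOITMDX_F_sel'.
mapping = {
    "PURCHASEORDER": "PURCHASEORDER",
    "PURCHASEORDERITEM": "PURCHASEORDERITEM",
    "PURCHASINGORGANIZATION": "ORG",
}
source_row = {
    "PURCHASEORDER": "4500000001",
    "PURCHASEORDERITEM": "00010",
    "PURCHASINGORGANIZATION": "1710",
    "PLANT": "1710",  # hypothetical column with no mapping entry, so it is dropped
}
selected = select_and_rename(source_row, mapping)
```

Adding or renaming a column then becomes a metadata change in the mapping table rather than a change to the data flow itself.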
Sink1
The data from the select1 is merged (insert/update) with the target delta table.
- Inline Dataset as Delta table.
- Define the folder path where the Delta table is saved. Also, check all the options required for UPSERT and DELETE on the target table, so that the changed data fetched by the SAP CDC connector from the SAP system can be applied.
- Select the Auto mapping.
- Use the current partitioning.
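What the sink does with the incoming change rows can be pictured as a keyed merge. The Python sketch below applies inserts, updates, and deletes to an in-memory dictionary keyed by the key columns. It is a toy model of the merge semantics, not of how Delta Lake implements them, and the `event`/`row` layout is an assumption.

```python
def apply_changes(target, changes, key_columns):
    """Apply change rows to target, a dict keyed by the tuple of key column values."""
    for change in changes:
        key = tuple(change["row"][column] for column in key_columns)
        if change["event"] == "D":
            target.pop(key, None)  # deleting a missing key is a no-op
        else:
            target[key] = change["row"]  # insert and update both overwrite by key
    return target


keys = ["SALESDOCUMENT", "SALESDOCUMENTITEM"]
changes = [
    {"event": "I", "row": {"SALESDOCUMENT": "1", "SALESDOCUMENTITEM": "10", "CATEGORY": "C"}},
    {"event": "I", "row": {"SALESDOCUMENT": "2", "SALESDOCUMENTITEM": "10", "CATEGORY": "C"}},
    {"event": "U", "row": {"SALESDOCUMENT": "1", "SALESDOCUMENTITEM": "10", "CATEGORY": "B"}},
    {"event": "D", "row": {"SALESDOCUMENT": "2", "SALESDOCUMENTITEM": "10", "CATEGORY": "C"}},
]
table = apply_changes({}, changes, keys)
```

After the four changes, only document 1 remains, carrying its updated category. This is why the key columns in the metadata must identify a row uniquely.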
(Optional) - derivedColumn1ChangeEvent
In some situations, we want to build custom transformation logic on top of the change events (insert, update, delete). For instance, to implement a Slowly Changing Dimension Type 2 (SCD Type 2) in our data model, the standard merge in the ADF mapping data flow may not suffice, and we need our own logic. While developing the ingestion process, it is also important to understand how the data changed in the SAP system, so that the same changes can be replicated on the Azure side. This is where the changeEvent derived column comes in.
Here is the expression for the changeEvent derived column: case(isInsert(), 'I', isDelete(), 'D', isUpdate(), 'U', isUpsert(), 'U')
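For readers less familiar with the data flow expression language, the case() expression behaves like the first-match chain below. This Python function is only an analogy: the boolean arguments stand in for the row-marker functions isInsert(), isDelete(), isUpdate(), and isUpsert().

```python
def change_event(is_insert=False, is_delete=False, is_update=False, is_upsert=False):
    """Return the change-event code for a row; the first matching condition wins."""
    if is_insert:
        return "I"
    if is_delete:
        return "D"
    if is_update:
        return "U"
    if is_upsert:
        return "U"  # upserts are recorded as updates
    return None  # no marker set: no default branch, so no value


event_for_delete = change_event(is_delete=True)
event_for_upsert = change_event(is_upsert=True)
```

Note that updates and upserts both map to `U`, so the flat files distinguish three event types, not four.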
(Optional) – sinkFlatfiles
File name: concat($sapObjectName, '-', toString(currentTimestamp(), 'yyyy-MM-ddHHmmss'), '.csv')
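The file name expression produces one timestamped CSV name per object and run. An equivalent in Python is sketched below, assuming that the pattern 'yyyy-MM-ddHHmmss' corresponds to the strftime format '%Y-%m-%d%H%M%S'; the function name is hypothetical.

```python
from datetime import datetime


def flat_file_name(sap_object_name, now=None):
    """Build '<object>-<timestamp>.csv' like the data flow file name expression."""
    now = now or datetime.now()
    return f"{sap_object_name}-{now.strftime('%Y-%m-%d%H%M%S')}.csv"


name = flat_file_name("CMMPOITMDX$F", datetime(2024, 1, 2, 3, 4, 5))
# name == "CMMPOITMDX$F-2024-01-02030405.csv"
```

Because the timestamp has one-second resolution, two runs for the same object within the same second would produce the same file name.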
Execution of the Pipeline
Note: When you run the pipeline, it's good practice to trigger it rather than run it with the debug option. With the debug option, the subscriber process depends on the UI session. The checkpoint key used for extraction in debug mode changes when the UI session switches, which resets delta tracking for the extraction. You can read more about it here.
Full Extraction.
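We will now execute the pipeline for the first time. For this example, we are retrieving data from 2 SAP objects. The first run is going to be a full extract.
For this run, the run mode in the metadata table (sapRunMode) is 'Full on the first run, then incremental' for CSDSLSDOCITMDX1$F and 'Full on every run' for CMMPOITMDX$F. Note that the sample INSERT script above sets fullAndIncrementalLoad for both objects, so change sapRunMode for the CMMPOITMDX$F row to reproduce this behavior.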
Delta Queues Monitoring (ODQMON):
- You can also monitor the replication by logging on to the SAP system and executing transaction code ODQMON, the Delta Queue Monitor.
- On the initial screen, you see all delta queues available in the system.
- You can use the filters to narrow the list. By default, the list includes only entries where delta tracking is enabled.
- To include also requests for full extraction (without delta initialization), change the Request Select field to “All” or “Full”.
- In SAP, we use the transaction code ODQMON to monitor the change records from the CDS views. Details of ODQMON.
Here we can see the first load, shown as the “Initial Load”. The count of 6499 matches the count that we got in the data flow in the previous screenshot.
Incremental/Delta Extraction.
Next, we change a record in the sales order table. The data can also be changed from the SAP front end (GUI or WebGUI) by altering the transaction data.
Once we run the ADF pipeline, we can see that for the item table it's a full load again (as per the metadata table), and for the sales table, we have the incremental load.
From the ODQMON, let’s look at the last transaction.
So, if we look at the flat file, we will be able to see the same changes with the change event.
Visualization of the Data in Microsoft Fabric
Since the data is in delta format in ADLS Gen2, we just need to create a shortcut to the storage in a Lakehouse in Microsoft Fabric. Once this is done, we can explore the data using the Microsoft Fabric Spark engine or SQL engine.
Attaching the Delta tables into Microsoft Fabric Lakehouse.
How to create a shortcut to a Delta table.
Note that for the data to refresh automatically once Azure Data Factory finishes writing, you need to create a table shortcut rather than use a managed table built from a file shortcut.
Conclusion
In this comprehensive demonstration, we illustrated the process of extracting data from SAP through a metadata-driven pipeline. While we didn't delve deeply into data transformation here, you have the flexibility to use DataFlow transformation activities or a Spark notebook to implement complex transformation logic before loading the data into a Delta table. If your use case aligns with the Extraction (E) – Load (L) – Transformation (T) workflow, you can certainly follow the aforementioned tutorial and then use a Spark notebook to create your business logic on top of the Delta table, thereby preparing it for the serving layer or presentation layer. We trust that this blog post will serve as a valuable starting point for constructing your pipeline to import SAP data into Azure.