Apache Airflow is a widely used task orchestration framework, which gained its popularity thanks to its Python-based programmatic interface; Python is the language of first choice for data engineers and data ops. The framework allows defining complex pipelines that move data between different components, potentially implemented using different technologies.

The following article shows how to set up a managed instance of Apache Airflow and define a very simple DAG (directed acyclic graph) of tasks that does the following:

Uses an Azure registered application to authenticate with the ADX cluster.
Schedules daily execution of a simple KQL query that calculates HTTP error statistics based on web log records for the last day.

Prerequisites for completing the steps of this tutorial are:

A Microsoft Azure account.
A running Azure Data Explorer (ADX) cluster.

1. Create a new, or use an existing, instance of Azure Data Factory (ADF), which provides managed Airflow as part of its services.

2. Once the ADF instance is up and running, launch Azure Data Factory Studio. Under Manage -> Workflow orchestration manager, click on New and define a new Airflow integration. To enable the ADX plug-in in the Airflow instance, add "apache-airflow-providers-microsoft-azure" under Airflow requirements. At the time of writing this tutorial, it took about 20 minutes for the integration to be created, so please be patient.

3. While the instance of Airflow is being created, you can set up an App registration through which Airflow tasks will be able to run KQL queries on your ADX cluster. First, go to Azure -> App registrations and create a new registration. Generate a new secret under Certificates and secrets. Then, go to your ADX instance and give the newly registered App a Database Viewer permission.

4. After the Airflow integration in ADF is created, open the Airflow interface by clicking on the Monitor icon. Now it's time to define a connection to your ADX cluster in Airflow. Go to Admin -> Connections and fill in the connection details for your cluster and App registration (a code sketch of an equivalent connection definition appears at the end of this post). Note that the values shown in the dialog are placeholders, not real ones.

5. The last step is importing an Airflow DAG file and letting it run. Here's the source code of a simple Airflow DAG that consists of a single task that invokes a KQL query:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.microsoft.azure.operators.adx import AzureDataExplorerQueryOperator

with DAG(
    'my-first-dag',
    start_date=datetime(2023, 7, 1),
    max_active_runs=1,
    schedule='@daily',
    default_args={
        'depends_on_past': False,
        'retries': 0,
    },
    catchup=False
) as dag:
    op_daily_http_errors = AzureDataExplorerQueryOperator(
        task_id='daily-http-errors',
        query='''
            WebLog
            | where Timestamp > ago(1d)
            | where isnotnull(HttpError)
            | summarize count(), take_any(Path) by HttpError
        ''',
        database='Test',
        azure_data_explorer_conn_id='adx'
    )

Upload the file to an Azure storage account, making sure the folder under which the file is uploaded is named dags (a quick way to sanity-check that the file parses before uploading is shown at the end of this post). Click on Import files, link the Azure storage account hosting your DAG file, click on Apply, then select the dags directory and click on Import. Once imported, the DAG should start running automatically.

Next steps:

Read more about Airflow DAG concepts.
Learn about KQL query best practices.
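For reference, the ADX connection from step 4 does not have to be entered through the UI only; Airflow also supports defining connections in code or via environment variables. The sketch below is only an illustration of how the dialog fields could map to a connection object: the conn_type value and the extra keys (tenant, auth_method) are assumptions based on the ADX support in the apache-airflow-providers-microsoft-azure package, so verify them against the provider documentation for your installed version, and replace all placeholder values with your own.

# Minimal sketch of the "adx" connection defined in code rather than the UI.
# The conn_type and extra keys are assumptions; check the azure provider docs
# for the exact names supported by your provider version.
import json
from airflow.models import Connection

adx_conn = Connection(
    conn_id="adx",                                        # matches azure_data_explorer_conn_id in the DAG
    conn_type="azure_data_explorer",                      # assumed connection type for the ADX provider
    host="https://<cluster>.<region>.kusto.windows.net",  # ADX cluster URI (placeholder)
    login="<application-client-id>",                      # App registration client ID (placeholder)
    password="<application-secret>",                      # secret created under Certificates and secrets (placeholder)
    extra=json.dumps({
        "tenant": "<tenant-id>",                          # Azure AD tenant of the App registration (placeholder)
        "auth_method": "AAD_APP",                         # assumed value for app-based authentication
    }),
)

# One way to register the connection without the UI is Airflow's standard
# AIRFLOW_CONN_<CONN_ID> environment variable; the URI below can be used as its value.
print(adx_conn.get_uri())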
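Before uploading the DAG file in step 5, it can also be useful to check locally that it parses without errors. The snippet below is a small sketch using Airflow's standard DagBag; it assumes the DAG file is saved under a local folder named dags and that apache-airflow and the azure provider package are installed in your local environment.

# Quick local sanity check that the DAG file parses before uploading it.
# Assumes the file shown above is saved under a local "dags" folder; adjust the path as needed.
from airflow.models import DagBag

dag_bag = DagBag(dag_folder="dags", include_examples=False)

# Any import or parse errors in the folder show up here as {file: error message}.
assert not dag_bag.import_errors, dag_bag.import_errors

dag = dag_bag.get_dag("my-first-dag")
assert dag is not None
assert [t.task_id for t in dag.tasks] == ["daily-http-errors"]
print("DAG parsed successfully:", dag.dag_id)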