Airflow DAGs

July 28, 2020 | Updated July 9, 2024

Now that you have read about how the different components of Airflow work and how to run Apache Airflow locally, it's time to start writing our first workflow, or DAG (Directed Acyclic Graph). As you may recall, workflows are referred to as DAGs in Airflow. If you haven't already read our previous blogs on the different components of Airflow and on installing and running Airflow, please do so first.

But before writing a DAG, it is important to learn about the tools and components Apache Airflow provides to easily build pipelines, schedule them, and monitor their runs. Here we will list some of the important concepts along with examples and use cases. If you wish to read the complete documentation of these concepts, it is available on the Airflow Documentation site.

Airflow DAGs

A DAG is a collection of tasks in which connected tasks are joined by directed edges. Traversing the graph, starting from any task, it is never possible to reach the same task again; hence the acyclic nature of these workflows (or DAGs).

In Airflow, DAGs are defined using Python code. Here is one of the example DAGs from Apache Airflow's GitHub repository. We have shown only the part that defines the DAG; the rest of the objects will be covered later in this blog.

from datetime import timedelta

from airflow import DAG

dag = DAG(
    dag_id='example_bash_operator',
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60),
    tags=['example'],
)

The above example shows how a DAG object is created. A DAG consists of multiple tasks that are executed in order. In Airflow, tasks can be Operators, Sensors, or SubDAGs, which we will cover in later sections of this blog. Using these operators and sensors, one can define a complete DAG that executes its tasks in the desired order. Here is an image showing the tasks of the above example DAG laid out in order:

DAG Graph View on the Webserver
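To make this concrete, here is a minimal, self-contained sketch of a DAG with two tasks wired to run in order. It previews the BashOperator and the bitshift syntax covered later in this blog; the import path, dag_id, task names, and start_date below are illustrative assumptions and may vary with your Airflow version.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # import path may differ in Airflow 2.x

dag = DAG(
    dag_id='example_two_task_dag',        # hypothetical dag id
    schedule_interval='0 0 * * *',
    start_date=datetime(2020, 7, 1),      # assumed start date
    dagrun_timeout=timedelta(minutes=60),
    tags=['example'],
)

# two simple tasks that just echo to the task logs
extract = BashOperator(task_id='extract', bash_command='echo "extracting"', dag=dag)
load = BashOperator(task_id='load', bash_command='echo "loading"', dag=dag)

# run 'extract' before 'load'
extract >> load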

DAGs are stored in the DAGs directory of your Airflow installation. Airflow's Scheduler scans this directory at regular intervals, looking for files that contain the strings dag or airflow, parses the DAGs it finds, and keeps the metadata database updated with any changes.

The Trigger DAG button on the Webserver

A DAG Run is simply the metadata recorded each time a DAG is run. Whenever a DAG is triggered, a new entry is created in the dag_run table with the dag id and the execution date, which together uniquely identify each run of the DAG. DAG Runs can also be viewed on the webserver under the Browse section.

List of DAG Runs on the Webserver

DAG Operators and Sensors

DAGs are composed of multiple tasks. In Airflow, we use Operators and Sensors (which are a special kind of Operator) to define tasks. Once an operator is instantiated within a given DAG, it is referred to as a task of that DAG.

DAG Operators

An Operator usually provides integration with some other service, such as the MySqlOperator, SlackOperator, or PrestoOperator, giving you a way to access these services from Airflow. Qubole provides the QuboleOperator, which allows users to run Presto, Hive, Hadoop, Spark, Zeppelin Notebooks, Jupyter Notebooks, and Data Import/Export jobs on their Qubole account.

Some common operators available in Airflow are listed below (a short PythonOperator example follows the list):

  • BashOperator – used to execute bash commands on the machine it runs on
  • PythonOperator – takes any Python callable as input and calls it (the callable must accept whatever arguments the operator passes to it)
  • EmailOperator – sends emails using an SMTP server configured
  • SimpleHttpOperator – makes an HTTP request that can be used to trigger actions on a remote system.
  • MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, JdbcOperator, etc. – used to run SQL commands
  • QuboleOperator – allows users to run and get results from Presto, Hive, Hadoop, Spark commands, Zeppelin Notebooks, Jupyter Notebooks, and Data Import/Export jobs on the configured Qubole account.
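As an illustration, here is a short, hedged sketch of a PythonOperator task; the import path shown is for Airflow 1.10 and may differ in newer releases, and the callable and task id are hypothetical.

from airflow.operators.python_operator import PythonOperator  # import path may differ in Airflow 2.x

def print_greeting():
    # hypothetical callable; any Python function can be used
    print('Hello from Airflow!')

greet_task = PythonOperator(
    task_id='print_greeting',
    python_callable=print_greeting,
    dag=dag,  # the DAG object defined earlier
)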

DAG Sensors

Sensors are special types of operators whose purpose is to wait on some external or internal trigger. They are commonly used to hold back some or all of a DAG until an external event occurs. Some common types of sensors are listed below (a short example follows the list):

  • ExternalTaskSensor: waits on another task (in a different DAG) to complete execution.
  • HivePartitionSensor: waits for a specific partition of a Hive table to be created
  • S3KeySensor: waits for a specific file or directory (key) to become available in an S3 bucket
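Here is a hedged sketch of an S3KeySensor waiting for a file to land in a bucket; the import path varies across Airflow versions, and the bucket and key names are hypothetical.

from airflow.sensors.s3_key_sensor import S3KeySensor  # in Airflow 2.x the sensor lives in the Amazon provider package

wait_for_file = S3KeySensor(
    task_id='wait_for_input_file',
    bucket_name='my-data-bucket',    # hypothetical bucket
    bucket_key='incoming/data.csv',  # hypothetical key
    poke_interval=60,                # check every 60 seconds
    timeout=60 * 60,                 # give up after an hour
    dag=dag,
)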

DAG Hooks

Hooks are interfaces to services external to the Airflow Cluster. While Operators provide a way to create tasks that may or may not communicate with some external service, hooks provide a uniform interface to access external services like S3, MySQL, Hive, Qubole, etc. Hooks are the building blocks for operators to interact with external services.
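For example, here is a hedged sketch of a MySqlHook used inside a Python callable to fetch rows; the import path, connection id, and query are assumptions for illustration.

from airflow.hooks.mysql_hook import MySqlHook  # in Airflow 2.x the hook lives in the MySQL provider package

def fetch_recent_orders():
    # 'my_mysql_conn' is a hypothetical connection id configured in Airflow
    hook = MySqlHook(mysql_conn_id='my_mysql_conn')
    # hypothetical query; get_records returns the result set as a list of rows
    return hook.get_records('SELECT * FROM orders LIMIT 10')

A callable like this can then be handed to a PythonOperator as its python_callable, with the hook doing the work of connecting to the external service.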

Tasks, Task Instances, and XCom

As mentioned in the previous section, any instance of the operator is referred to as a task. Each task is represented as a node in the DAG. Here’s an example of an operator being used to create a task.

from airflow.operators.bash_operator import BashOperator  # import path may differ in Airflow 2.x

run_this = BashOperator(
    task_id='run_after_loop',
    bash_command='echo 1',
    dag=dag,
)

The above example uses the BashOperator, which takes a bash command as an argument. When the task executes, it runs the command, and its output can be found in the task logs.

Link DAG Tasks

There are multiple ways to link tasks in a DAG to each other. Here is a list of all the methods; among these, we recommend using bitshift composition and the relationship builders, as they keep the code clean and readable.

Set Upstream and Downstream

# this means task2 will execute after task1
task1.set_downstream(task2)
# this means task1 will execute after task2 (task2 runs first)
task1.set_upstream(task2)

Bitshift Composition

task1 >> task2
# the above is the same as
task1.set_downstream(task2)

task2 << task1
# which is the same as
task2.set_upstream(task1)

# bitshift operators can also be chained
# the line below runs task2 after task1 and task3 after task2; task4 must also execute before task3
task1 >> task2 >> task3 << task4

# bitshift can also be used with lists
[task1, task2, task3] >> task4
[task1, task2, task3] >> task5
[task1, task2, task3] >> task6

Relationship Builders

The chain and cross_downstream functions provide easier ways to set relationships between operators in specific situations. Here are a few examples:

# chain and cross_downstream are importable from airflow.utils.helpers (Airflow 1.10) or airflow.models.baseoperator (Airflow 2.x)
from airflow.utils.helpers import chain, cross_downstream

# the last example demonstrated in bitshift composition can be replaced with
cross_downstream([task1, task2, task3], [task4, task5, task6])
# and a chain like task1 >> task2 >> task3 >> task4 >> task5 can be replaced with
chain(task1, task2, task3, task4, task5)

DAG Task Instance

A Task Instance is a single run of a task. While tasks are defined in a DAG, a DAG Run is created each time the DAG is executed, identified by the dag id and the execution date. Each DAG Run consists of multiple tasks, and every run of one of these tasks is referred to as a task instance.

A task instance goes through multiple states when running, and the complete lifecycle can be found on the Airflow docs page. The happy path consists of the following stages:

  • No status (scheduler created empty task instance)
  • Scheduled (scheduler determined task instance needs to run)
  • Queued (scheduler sent the task to the queue – to be run)
  • Running (worker picked up a task and is now executing it)
  • Success (task completed)

Here are all the states and the transitions of the task lifecycle.

Task Lifecycle

DAG XCom

XCom (cross-communication) provides a way of sharing messages or data between different tasks. At its core, XCom is a table that stores key-value pairs while keeping track of which task and DAG provided each pair. This is the XCom table schema:

cid | name           | type        | notnull | dflt_value | pk
0   | id             | INTEGER     | 1       |            | 1
1   | key            | VARCHAR(51) | 0       |            | 0
2   | value          | BLOB        | 0       |            | 0
3   | timestamp      | DATETIME    | 1       |            | 0
4   | execution_date | DATETIME    | 1       |            | 0
5   | task_id        | VARCHAR(51) | 1       |            | 0
6   | dag_id         | VARCHAR(51) | 1       |            | 0

As you can observe, the XCom table stores key-value pairs while keeping track of which task and which run of the DAG added each pair (using the execution date, task_id, and dag_id). XCom provides two methods, xcom_push and xcom_pull, to add entries to and retrieve entries from the XCom table. Here is an example of functions that can be called from two different PythonOperators to communicate with each other.

# inside a PythonOperator called 'pushing_task'
# the PythonOperator adds the return value of the callable to XCom
def push_function():
    return value  # 'value' stands for whatever object you want to share

# alternatively, push the value explicitly with xcom_push
# (requires provide_context=True on Airflow 1.x)
def push_function(**context):
    context['task_instance'].xcom_push(key='test', value=value)

# inside another PythonOperator where provide_context=True
def pull_function(**context):
    value = context['task_instance'].xcom_pull(task_ids='pushing_task', key='test')
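To complete the picture, here is a hedged sketch of how these callables might be wired into PythonOperator tasks. Note that provide_context=True is an Airflow 1.x requirement (Airflow 2.x passes the context automatically), and the pulling task id is hypothetical.

push_task = PythonOperator(
    task_id='pushing_task',
    python_callable=push_function,
    provide_context=True,  # Airflow 1.x only; not needed in 2.x
    dag=dag,
)

pull_task = PythonOperator(
    task_id='pulling_task',  # hypothetical task id
    python_callable=pull_function,
    provide_context=True,
    dag=dag,
)

# the pull must run after the push so the value already exists in XCom
push_task >> pull_task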

XCom can also be used to populate dynamic content in templated fields:

SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }}
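Such a template can be placed in any templated operator field. As an illustrative, hedged sketch, the query below is passed to a PostgresOperator (whose sql field is templated); the operator choice, connection id, and the table name produced by task 'foo' are assumptions.

from airflow.operators.postgres_operator import PostgresOperator  # import path may differ in Airflow 2.x

dynamic_query = PostgresOperator(
    task_id='query_dynamic_table',
    postgres_conn_id='my_postgres_conn',  # hypothetical connection id
    sql="SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }}",
    dag=dag,
)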

That's a lot of theoretical content for one blog. All these concepts will help you write DAGs in your day-to-day job. In our next blog, we will cover some advanced concepts around schedules, triggers, and templating. Please sign up for a Qubole free trial to experience Qubole Airflow, which is tightly integrated with the Qubole Open Data Lake Platform.
