DAGs
A DAG (Directed Acyclic Graph) specifies the dependencies between Tasks, and the order in which to execute them and run retries; the Tasks themselves describe what to do, be it fetching data, running analysis, triggering other systems, or more.
Note
The DAG itself doesn’t care about what is happening inside the tasks.
It is merely concerned with how to execute them: the order to run them in, how many times to retry them, if they have timeouts, and so on.
Declaring a DAG
There are three ways to declare a DAG:
- Context manager, which adds the DAG to anything declared inside it implicitly
- Standard constructor, passing the DAG into any operators you use
- @dag decorator, which turns a function into a DAG generator, as in this example:

@dag(
    schedule_interval=None,
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    catchup=False,
    tags=['example'],
)
def example_dag_decorator(email: str = 'example@example.com'):
    """
    DAG to send server IP to email.

    :param email: Email to send IP to. Defaults to example@example.com.
    :type email: str
    """
    get_ip = GetRequestOperator(
        task_id='get_ip',
        url="http://httpbin.org/get",
    )

    @task(multiple_outputs=True)
    def prepare_email(raw_json: Dict[str, Any]) -> Dict[str, str]:
        external_ip = raw_json['origin']
        return {
            'subject': f'Server connected from {external_ip}',
            'body': f'Seems like today your server executing Airflow is connected from IP {external_ip}<br>',
        }

    email_info = prepare_email(get_ip.output)

    EmailOperator(
        task_id='send_email',
        to=email,
        subject=email_info['subject'],
        html_content=email_info['body'],
    )

dag = example_dag_decorator()
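For reference, the first two declaration styles might look like this; a minimal sketch in which the DAG names, dates, and DummyOperator tasks are only illustrative:

import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator

# Context manager: everything declared inside the block is attached to the DAG.
with DAG(
    "my_dag_name",
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    op = DummyOperator(task_id="task")

# Standard constructor: pass the DAG explicitly into every operator.
my_dag = DAG(
    "my_other_dag_name",
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    schedule_interval="@daily",
    catchup=False,
)
op = DummyOperator(task_id="task", dag=my_dag)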
Note
Airflow will only load DAGs that appear at the top level of a DAG file. This means you cannot just declare a function with the @dag decorator - you must also call it at least once in your DAG file and assign the result to a top-level object, as the example_dag_decorator example above does.
Task Dependencies
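Tasks declare the order they run in by setting dependencies on each other, most commonly with the >> and << bitshift operators (set_downstream and set_upstream are the method equivalents). A minimal sketch with three illustrative DummyOperator tasks:

from airflow.operators.dummy import DummyOperator

first_task = DummyOperator(task_id="first_task")
second_task = DummyOperator(task_id="second_task")
third_task = DummyOperator(task_id="third_task")

# first_task runs before second_task, which runs before third_task
first_task >> second_task >> third_task

# the first dependency could equivalently be written with the method form:
# first_task.set_downstream(second_task)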
Loading DAGs
dag_1 = DAG('this_dag_will_be_discovered')
def my_function():
dag_2 = DAG('but_this_dag_will_not')
my_function()
While both DAG constructors get called when the file is parsed, only dag_1 is at the top level (in the globals()), and so only it is added to Airflow; dag_2 is never loaded.
Note
When searching for DAGs inside the DAG_FOLDER, Airflow only considers Python files that contain the strings "airflow" and "dag" (case-insensitively) as an optimization.
To consider all Python files instead, disable the DAG_DISCOVERY_SAFE_MODE configuration flag.
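If you do want every Python file scanned, the safe-mode flag can be switched off in airflow.cfg or via the corresponding environment variable; a minimal sketch, assuming the Airflow 2 option name under the [core] section:

[core]
dag_discovery_safe_mode = False

# or, equivalently, via the environment:
# AIRFLOW__CORE__DAG_DISCOVERY_SAFE_MODE=False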
You can also provide an .airflowignore file inside your DAG_FOLDER, or any of its subfolders, which describes files for the loader to ignore. It covers the directory it is in plus all subfolders underneath it, and should contain one regular expression per line, with # indicating comments.
Running DAGs
DAGs will run in one of two ways:
- When they are triggered either manually or via the API
- On a schedule (schedule_interval), which is defined as part of the DAG
Every time you run a DAG, you are creating a new instance of that DAG which Airflow calls a DAG Run. DAG Runs can run in parallel for the same DAG, and each has a defined data interval, which identifies the period of data the tasks should operate on.
Tasks specified inside a DAG are also instantiated into Task Instances along with it.
Note
Logical date - the start of the data interval for a scheduled run, or the moment of triggering for a manually triggered run.
Run start - when the run actually begins executing; for scheduled runs this is at the end of the data interval, i.e. roughly logical date + schedule interval.
DAG Assignment
Note
Every single Operator/Task must be assigned to a DAG in order to run.
Airflow has several ways of calculating the DAG without you passing it explicitly:
- If you declare your Operator inside a with DAG block.
- If you declare your Operator inside a @dag decorator.
- If you put your Operator upstream or downstream of an Operator that has a DAG.
Otherwise, you must pass it into each Operator with dag=.
Default Arguments
Pass default_args
to the DAG when you create it, and it will auto-apply them to any operator tied to it:
import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='my_dag',
    start_date=pendulum.datetime(2016, 1, 1, tz="UTC"),
    schedule_interval='@daily',
    catchup=False,
    default_args={'retries': 2},
) as dag:
    op = BashOperator(task_id='dummy', bash_command='echo "Hello World!"')
    print(op.retries)  # 2
Control Flow
By default, a DAG will only run a Task when all the Tasks it depends on are successful. There are several ways of modifying this, however:
- Branching, where you can select which Task to move onto based on a condition - see BranchPythonOperator and the sketch after this list.
- Latest Only, a special form of branching that only runs on DAG runs happening against the present.
- Depends On Past, where tasks can depend on themselves from a previous run.
- Trigger Rules, which let you set the conditions under which a DAG will run a task.
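As an illustration of branching, here is a minimal sketch; the branch names and the (random) choice logic are invented for the example, and the tasks would be declared inside a DAG as in the earlier snippets:

import random

from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator

def choose_branch():
    # Return the task_id (or a list of task_ids) to follow; everything else is skipped.
    return random.choice(["branch_a", "branch_b"])

branch = BranchPythonOperator(task_id="branch", python_callable=choose_branch)
branch_a = DummyOperator(task_id="branch_a")
branch_b = DummyOperator(task_id="branch_b")

branch >> [branch_a, branch_b]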
Trigger Rules
By default, Airflow waits for all upstream tasks to succeed before running a task, but this can be changed with the trigger_rule argument, which every Task accepts.
The options for trigger_rule are:
- all_success (default): All upstream tasks have succeeded
- all_failed: All upstream tasks are in a failed or upstream_failed state
- all_done: All upstream tasks are done with their execution
- one_failed: At least one upstream task has failed (does not wait for all upstream tasks to be done)
- one_success: At least one upstream task has succeeded (does not wait for all upstream tasks to be done)
- none_failed: No upstream task has failed or is in an upstream_failed state - that is, all upstream tasks have succeeded or been skipped
- none_failed_min_one_success: No upstream task has failed or is in an upstream_failed state, and at least one upstream task has succeeded
- none_skipped: No upstream task is in a skipped state - that is, all upstream tasks are in a success, failed, or upstream_failed state
- always: No dependencies at all; run this task at any time
Note
When trigger rules are combined with branching (for example, BranchPythonOperator), tasks downstream of the join should normally use none_failed_min_one_success, so that skipped branches do not cause the joining task to be skipped as well. See the Airflow documentation on trigger rules and skipped tasks for details.
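Continuing the branching sketch above, the task that joins the branches back together could be declared like this (a sketch; the task name is made up):

from airflow.operators.dummy import DummyOperator

# Runs as long as no upstream task failed and at least one of the branches succeeded,
# so a skipped branch does not skip the join.
join = DummyOperator(
    task_id="join",
    trigger_rule="none_failed_min_one_success",
)
[branch_a, branch_b] >> join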
Dynamic DAGs
You are free to use loops, functions, and more to define your DAG.
with DAG("loop_example") as dag:
    first = DummyOperator(task_id="first")
    last = DummyOperator(task_id="last")

    options = ["branch_a", "branch_b", "branch_c", "branch_d"]
    for option in options:
        t = DummyOperator(task_id=option)
        first >> t >> last
DAG Visualization
TaskGroups
A TaskGroup can be used to organize tasks into hierarchical groups in Graph view. It is useful for creating repeating patterns and cutting down visual clutter.
Unlike SubDAGs, TaskGroups are purely a UI grouping concept. Tasks in TaskGroups live on the same original DAG, and honor all the DAG settings and pool configurations.
from airflow.utils.task_group import TaskGroup

with TaskGroup("group1") as group1:
    task1 = DummyOperator(task_id="task1")
    task2 = DummyOperator(task_id="task2")

task3 = DummyOperator(task_id="task3")

group1 >> task3
Edge Labels
You can also label the dependency edges between different tasks in the Graph view - this can be especially useful for branching areas of your DAG, so you can label the conditions under which certain branches might run.
import pendulum

from airflow.utils.edgemodifier import Label

with DAG(
    "example_branch_labels",
    schedule_interval="@daily",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    ingest = DummyOperator(task_id="ingest")
    analyse = DummyOperator(task_id="analyse")
    check = DummyOperator(task_id="check_integrity")
    describe = DummyOperator(task_id="describe_integrity")
    error = DummyOperator(task_id="email_error")
    save = DummyOperator(task_id="save")
    report = DummyOperator(task_id="report")

    ingest >> analyse >> check
    check >> Label("No errors") >> save >> report
    check >> Label("Errors found") >> describe >> error >> report
DAG & Task Documentation
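You can attach documentation to your DAG and Task objects that the Airflow web interface will display; the doc_md attribute accepts Markdown, as the example below shows by reusing the module docstring.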
"""
### My great DAG
"""
import pendulum
dag = DAG(
    "my_dag",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule_interval="@daily",
    catchup=False,
)
dag.doc_md = __doc__  # the module docstring above, rendered as Markdown

t = BashOperator(task_id="foo", bash_command="echo foo", dag=dag)
t.doc_md = """\
# Title
Here's a [url](www.airbnb.com)
"""
SubDAGs (deprecated)
Attention
SubDAGs are deprecated; TaskGroups are always the preferred choice.
Packaging DAGs
You can package the DAG and all of its Python files up as a single zip file. For instance, you could ship two dags along with a dependency they need as a zip file with the following contents:
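my_dag1.py
my_dag2.py
package1/__init__.py
package1/functions.py

(The file and package names above are illustrative.)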
.airflowignore
A .airflowignore
file specifies the directories or files in DAG_FOLDER
or PLUGINS_FOLDER
that Airflow should intentionally ignore.
Each line in .airflowignore specifies a regular expression pattern. If a directory's name matches any of the patterns, that directory and all of its subfolders will not be scanned by Airflow at all. This improves the efficiency of DAG discovery.
The .airflowignore file should be put in your DAG_FOLDER. The scope of an .airflowignore file is the directory it is in plus all of its subfolders. You can also prepare an .airflowignore file for a subfolder in DAG_FOLDER, and it would then only apply to that subfolder.
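For example, an .airflowignore containing the two patterns below (invented for illustration) would make Airflow skip any file whose relative path contains project_a, as well as files and folders such as tenant_1.py or tenant_2/:

project_a
tenant_[\d]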