
DAGs

Documentation

A DAG (Directed Acyclic Graph) specifies the dependencies between Tasks, and the order in which to execute them and run retries; the Tasks themselves describe what to do, be it fetching data, running analysis, triggering other systems, or more.

Note

The DAG itself doesn’t care about what is happening inside the tasks.

It is merely concerned with how to execute them: the order to run them in, how many times to retry them, if they have timeouts, and so on.

Declaring a DAG

  1. Context manager:

    import pendulum

    from airflow import DAG
    from airflow.operators.dummy import DummyOperator

    with DAG(
        "my_dag_name",
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        schedule_interval="@daily",
        catchup=False,
    ) as dag:
        op = DummyOperator(task_id="task")
    

  2. Standard constructor:

    my_dag = DAG(
        "my_dag_name",
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        schedule_interval="@daily",
        catchup=False,
    )
    op = DummyOperator(task_id="task", dag=my_dag)
    

  3. @dag decorator:

    @dag(
        schedule_interval=None,
        start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
        catchup=False,
        tags=['example'],
    )
    def example_dag_decorator(email: str = 'example@example.com'):
        """
        DAG to send server IP to email.
    
        :param email: Email to send IP to. Defaults to example@example.com.
        :type email: str
        """
        get_ip = GetRequestOperator(
            task_id='get_ip',
            url="http://httpbin.org/get",
        )
    
        @task(multiple_outputs=True)
        def prepare_email(raw_json: Dict[str, Any]) -> Dict[str, str]:
            external_ip = raw_json['origin']
            return {
                'subject': f'Server connected from {external_ip}',
                'body': f'Seems like today your server executing Airflow is connected from IP {external_ip}<br>',
            }
    
        email_info = prepare_email(get_ip.output)
    
        EmailOperator(
            task_id='send_email',
            to=email,
            subject=email_info['subject'],
            html_content=email_info['body'],
        )
    
    dag = example_dag_decorator()
    

Note

Airflow will only load DAGs that appear at the top level of a DAG file. This means you cannot just declare a function with @dag - you must also call it at least once in your DAG file and assign it to a top-level object, as you can see in the example above.

Task Dependencies

first_task >> [second_task, third_task]
third_task << fourth_task
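
The bitshift operators are shorthand for set_downstream / set_upstream; for larger fan-out or fan-in patterns Airflow also ships the chain and cross_downstream helpers. A minimal sketch, assuming t1 through t4 are tasks already declared in the same DAG:

from airflow.models.baseoperator import chain, cross_downstream

# Equivalent to the bitshift syntax above
first_task.set_downstream([second_task, third_task])
third_task.set_upstream(fourth_task)

# chain() wires tasks in sequence: t1 >> t2 >> t3 >> t4
chain(t1, t2, t3, t4)

# cross_downstream() makes every task on the left upstream of every task on the right
cross_downstream([t1, t2], [t3, t4])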

Loading DAGs

dag_1 = DAG('this_dag_will_be_discovered')

def my_function():
    dag_2 = DAG('but_this_dag_will_not')

my_function()

While both DAG constructors get called when the file is parsed, only dag_1 is at the top level (in globals()), and so only it is added to Airflow; dag_2 is not loaded.

Note

When searching for DAGs inside the DAG_FOLDER, Airflow only considers Python files that contain the strings airflow and dag (case-insensitively) as an optimization.

To consider all Python files instead, disable the DAG_DISCOVERY_SAFE_MODE configuration flag.
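
For example, safe mode can be switched off in airflow.cfg (the option lives in the [core] section), or via the matching AIRFLOW__CORE__DAG_DISCOVERY_SAFE_MODE environment variable:

[core]
dag_discovery_safe_mode = False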

You can also provide an .airflowignore file inside your DAG_FOLDER, or any of its subfolders, which describes files for the loader to ignore. It covers the directory it’s in plus all subfolders underneath it, and should be one regular expression per line, with # indicating comments.

Running DAGs

DAGs will run in one of two ways:

  • When they are triggered either manually or via the API
  • On a defined schedule (schedule_interval), which is defined as part of the DAG

Every time you run a DAG, you are creating a new instance of that DAG which Airflow calls a DAG Run. DAG Runs can run in parallel for the same DAG, and each has a defined data interval, which identifies the period of data the tasks should operate on.

Tasks specified inside a DAG are also instantiated into Task Instances along with it.

Note

Logical date — for scheduled runs, the start of the data interval (the period of data the run covers); for manually triggered runs, the moment of triggering.

Run start — for scheduled runs, the end of the data interval, i.e. roughly the logical date plus one schedule interval.
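
For illustration, a task can read its run's data interval and logical date from the task context; a minimal sketch assuming the TaskFlow API and Airflow 2.2+ context key names:

from airflow.decorators import task
from airflow.operators.python import get_current_context

@task
def report_interval():
    # Read the run's data interval and logical date from the task context
    context = get_current_context()
    print("Data interval:", context["data_interval_start"], "to", context["data_interval_end"])
    print("Logical date:", context["logical_date"])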

DAG Assignment

Note

Every single Operator/Task must be assigned to a DAG in order to run.

Airflow has several ways of calculating the DAG without you passing it explicitly:

  • If you declare your Operator inside a with DAG block.
  • If you declare your Operator inside a @dag decorator.
  • If you put your Operator upstream or downstream of an Operator that has a DAG.

Otherwise, you must pass it into each Operator with dag=.
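
A minimal sketch of these mechanisms side by side; the DAG and task names are illustrative:

import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator

with DAG(
    "assignment_example",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule_interval=None,
) as dag:
    inside = DummyOperator(task_id="inside")            # assigned by the with DAG block

explicit = DummyOperator(task_id="explicit", dag=dag)   # assigned via the dag= argument

# A task created without a DAG picks one up from the task it is wired to
implicit = DummyOperator(task_id="implicit")
inside >> implicit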

Default Arguments

Pass default_args to the DAG when you create it, and it will auto-apply them to any operator tied to it:

import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='my_dag',
    start_date=pendulum.datetime(2016, 1, 1, tz="UTC"),
    schedule_interval='@daily',
    catchup=False,
    default_args={'retries': 2},
) as dag:
    op = BashOperator(task_id='dummy', bash_command='echo Hello World!')
    print(op.retries)  # 2

Control Flow

By default, a DAG will only run a Task when all the Tasks it depends on are successful. There are several ways of modifying this, however:

  • Branching, where you can select which Task to move onto based on a condition (see BranchPythonOperator and the sketch after this list).
  • Latest Only, a special form of branching that only runs on DAGs running against the present.
  • Depends On Past, where tasks can depend on themselves from a previous run.
  • Trigger Rules, which let you set the conditions under which a DAG will run a task.
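
As referenced in the Branching item above, here is a minimal branching sketch (task names and the branching condition are made up for illustration); the callable returns the task_id(s) to follow, and the other tasks directly downstream of the branch operator are skipped:

import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator

with DAG(
    "branch_example",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule_interval="@daily",
    catchup=False,
) as dag:

    def choose_branch(**context):
        # Return the task_id (or list of task_ids) of the branch to follow
        if context["logical_date"].day == 1:
            return "monthly_report"
        return "daily_report"

    branch = BranchPythonOperator(task_id="branch", python_callable=choose_branch)
    daily = DummyOperator(task_id="daily_report")
    monthly = DummyOperator(task_id="monthly_report")

    branch >> [daily, monthly]
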
Trigger Rules

trigger_rule is an argument for Task.

The options for trigger_rule are:

  1. all_success (default): All upstream tasks have succeeded
  2. all_failed: All upstream tasks are in a failed or upstream_failed state
  3. all_done: All upstream tasks are done with their execution
  4. one_failed: At least one upstream task has failed (does not wait for all upstream tasks to be done)
  5. one_success: At least one upstream task has succeeded (does not wait for all upstream tasks to be done)
  6. none_failed: All upstream tasks have not failed or upstream_failed - that is, all upstream tasks have succeeded or been skipped
  7. none_failed_min_one_success: All upstream tasks have not failed or upstream_failed, and at least one upstream task has succeeded.
  8. none_skipped: No upstream task is in a skipped state - that is, all upstream tasks are in a success, failed, or upstream_failed state
  9. always: No dependencies at all, run this task at any time

Note

If a BranchPythonOperator is used upstream, set trigger_rule to none_failed_min_one_success on the joining task so that skipped branches do not block it. See the documentation for details.
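
Continuing the hypothetical branching sketch above, a joining task downstream of both branches would typically set this rule; the plain string 'none_failed_min_one_success' works as well as the enum:

from airflow.operators.dummy import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

# Runs as long as no upstream task failed and at least one succeeded,
# so the branch that was skipped does not block the join
join = DummyOperator(
    task_id="join",
    trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)
[daily, monthly] >> join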

Dynamic DAGs

You are free to use loops, functions, and more to define your DAG.

with DAG("loop_example") as dag:

    first = DummyOperator(task_id="first")
    last = DummyOperator(task_id="last")

    options = ["branch_a", "branch_b", "branch_c", "branch_d"]
    for option in options:
        t = DummyOperator(task_id=option)
        first >> t >> last

DAG Visualization

TaskGroups

A TaskGroup can be used to organize tasks into hierarchical groups in Graph view. It is useful for creating repeating patterns and cutting down visual clutter.

Unlike SubDAGs, TaskGroups are purely a UI grouping concept. Tasks in TaskGroups live on the same original DAG, and honor all the DAG settings and pool configurations.

from airflow.utils.task_group import TaskGroup

with TaskGroup("group1") as group1:
    task1 = DummyOperator(task_id="task1")
    task2 = DummyOperator(task_id="task2")

task3 = DummyOperator(task_id="task3")

group1 >> task3

Edge Labels

You can also label the dependency edges between different tasks in the Graph view - this can be especially useful for branching areas of your DAG, so you can label the conditions under which certain branches might run.

import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.edgemodifier import Label

with DAG(
    "example_branch_labels",
    schedule_interval="@daily",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    ingest = DummyOperator(task_id="ingest")
    analyse = DummyOperator(task_id="analyse")
    check = DummyOperator(task_id="check_integrity")
    describe = DummyOperator(task_id="describe_integrity")
    error = DummyOperator(task_id="email_error")
    save = DummyOperator(task_id="save")
    report = DummyOperator(task_id="report")

    ingest >> analyse >> check
    check >> Label("No errors") >> save >> report
    check >> Label("Errors found") >> describe >> error >> report

Labeling different branches

DAG & Task Documentation

"""
### My great DAG
"""
import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

dag = DAG(
    "my_dag",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule_interval="@daily",
    catchup=False,
)
dag.doc_md = __doc__  # markdown

t = BashOperator(task_id="foo", bash_command="echo foo", dag=dag)  # bash_command is required; "echo foo" is just a placeholder
t.doc_md = """\
# Title
Here's a [url](www.airbnb.com)
"""

SubDAGs (deprecated)

Attention

SubDAGs are deprecated, so TaskGroups are always the preferred choice.

SubDAGs Documentation

Packaging DAGs

You can package the DAG and all of its Python files up as a single zip file. For instance, you could ship two dags along with a dependency they need as a zip file with the following contents:

my_dag1.py
my_dag2.py
package1/__init__.py
package1/functions.py

.airflowignore

A .airflowignore file specifies the directories or files in DAG_FOLDER or PLUGINS_FOLDER that Airflow should intentionally ignore.

Each line in .airflowignore specifies a regular expression pattern. If a directory's name matches any of the patterns, that directory and all of its subfolders will not be scanned by Airflow at all. This improves the efficiency of DAG discovery.

The .airflowignore file should be put in your DAG_FOLDER.

The scope of a .airflowignore file is the directory it is in plus all of its subfolders. You can also prepare an .airflowignore file for a subfolder in DAG_FOLDER, and it will apply only to that subfolder.
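
For illustration, a hypothetical .airflowignore might look like this (patterns are regular expressions; the file and directory names are made up):

# Ignore anything whose path contains "tests"
tests
# Ignore helper modules that do not define DAGs
.*_helpers\.py
# Ignore scratch files
scratch_.*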