
Tasks


A DAG runs through a series of Tasks. There are three basic kinds of Task:

  1. Operators, predefined tasks that you can string together quickly to build most parts of your DAGs.
  2. Sensors, a special subclass of Operators which are entirely about waiting for an external event to happen.
  3. A TaskFlow-decorated @task, which is a custom Python function packaged up as a Task.

Attention

Airflow sends out Tasks to run on Workers as space becomes available, so there’s no guarantee all the tasks in your DAG will run on the same worker or the same machine.

Relationships

Tasks have dependencies declared on each other, and these dependencies define the order in which they run. For example:

first_task >> [second_task, third_task]  # first_task runs before second_task and third_task
fourth_task << third_task  # third_task runs before fourth_task

Here >> and << are the bitshift operators, which Airflow overloads to declare dependencies.
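
The bitshift operators are shorthand for the set_upstream and set_downstream methods on tasks; the following is equivalent to the bitshift form above:

first_task.set_downstream([second_task, third_task])
third_task.set_downstream(fourth_task)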

Note

Tasks don’t pass information to each other by default, and run entirely independently.

If you want to pass information from one Task to another, you should use XComs.
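
As a minimal sketch of passing data between tasks with XComs via the TaskFlow API (the task names and payload here are illustrative):

from airflow.decorators import task

@task
def extract():
    return {"rows": 42}  # the return value is pushed to XCom automatically

@task
def load(payload):
    print(payload["rows"])  # pulled from XCom when passed as an argument

load(extract())  # call inside a @dag-decorated function or DAG context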

Task Instances

Much in the same way that a DAG is instantiated into a DAG Run each time it runs, the tasks under a DAG are instantiated into Task Instances.

The possible states for a Task Instance are:

  • none: The Task has not yet been queued for execution (its dependencies are not yet met)
  • scheduled: The scheduler has determined the Task’s dependencies are met and it should run
  • queued: The task has been assigned to an Executor and is awaiting a worker
  • running: The task is running on a worker (or on a local/synchronous executor)
  • success: The task finished running without errors
  • shutdown: The task was externally requested to shut down when it was running
  • restarting: The task was externally requested to restart when it was running
  • failed: The task had an error during execution and failed to run
  • skipped: The task was skipped due to branching, LatestOnly, or similar.
  • upstream_failed: An upstream task failed and the Trigger Rule says we needed it
  • up_for_retry: The task failed, but has retry attempts left and will be rescheduled (see the sketch after this list).
  • up_for_reschedule: The task is a Sensor that is in reschedule mode
  • sensing: The task is a Smart Sensor
  • deferred: The task has been deferred to a trigger
  • removed: The task has vanished from the DAG since the run started
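
Several of these states are driven by task parameters. For instance, a hedged sketch of a task that will pass through up_for_retry (the task id and command are illustrative):

from airflow.operators.bash import BashOperator

# Fails on every attempt; enters up_for_retry twice before ending in failed.
flaky = BashOperator(
    task_id="flaky",
    bash_command="exit 1",
    retries=2,
)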

Task Instance State Diagram

Ideally, the task should flow like this: none → scheduled → queued → running → success.

Relationship Terminology

Firstly, a task can have upstream and downstream tasks:

task1 >> task2  # task1 is upstream of task2; task2 is downstream of task1.

When a DAG runs, it will create instances for each of these tasks that are upstream/downstream of each other, but which all have the same data interval.

There may also be instances of the same task, but for different data intervals (from other runs of the same DAG): previous and next.

Note

previous and next are not the same relationship as upstream and downstream!

Timeouts

execution_timeout: datetime.timedelta controls the maximum time allowed for every task execution. If execution_timeout is breached, the task times out and AirflowTaskTimeout is raised.

In addition, sensors have a timeout parameter. This only matters for sensors in reschedule mode. timeout controls the maximum time allowed for the sensor to succeed. If timeout is breached, AirflowSensorTimeout will be raised and the sensor fails immediately without retrying.

from datetime import timedelta

from airflow.providers.sftp.sensors.sftp import SFTPSensor

sensor = SFTPSensor(
    task_id="sensor",
    path="/root/test",
    execution_timeout=timedelta(seconds=60),  # controls the maximum time allowed for every execution.
    timeout=3600,  # controls the maximum time allowed for the sensor to succeed; raises `AirflowSensorTimeout`.
    retries=2,
    mode="reschedule",
)
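
With these settings, a single poke that exceeds execution_timeout raises AirflowTaskTimeout and consumes one of the 2 retries (retrying does not reset timeout); once the 3600-second timeout is breached, AirflowSensorTimeout is raised and the sensor fails immediately, with no further retries.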

SLAs

An SLA, or a Service Level Agreement, is an expectation for the maximum time a Task should take. If a task takes longer than this to run, it is then visible in the "SLA Misses" part of the user interface, as well as going out in an email of all tasks that missed their SLA.

Tasks over their SLA are not cancelled, though — they are allowed to run to completion. If you want to cancel a task after a certain runtime is reached, you want Timeouts instead.

Note

Only scheduled tasks will be checked against SLA. For example, manually triggered tasks will not invoke an SLA miss.

example_sla_dag.py

import datetime
import time

import pendulum
from airflow.decorators import dag, task


def sla_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    print(
        "The callback arguments are: ",
        {
            "dag": dag,
            "task_list": task_list,
            "blocking_task_list": blocking_task_list,
            "slas": slas,
            "blocking_tis": blocking_tis,
        },
    )

@dag(
    schedule_interval="*/2 * * * *",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    sla_miss_callback=sla_callback,  # will be called when the SLA is missed.
    default_args={"email": "email@example.com"},
)
def example_sla_dag():
    @task(sla=datetime.timedelta(seconds=10))
    def sleep_20():
        """Sleep for 20 seconds"""
        time.sleep(20)

    @task
    def sleep_30():
        """Sleep for 30 seconds"""
        time.sleep(30)

    sleep_20() >> sleep_30()


dag = example_sla_dag()

Special Exceptions

If you want to control your task’s state from within custom Task/Operator code, Airflow provides two special exceptions you can raise:

  • AirflowSkipException will mark the current task as skipped
  • AirflowFailException will mark the current task as failed ignoring any remaining retry attempts

These can be useful if your code has extra knowledge about its environment and wants to fail/skip faster — e.g., skipping when it knows there’s no data available, or fast-failing when it detects its API key is invalid (as that will not be fixed by a retry).
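
A minimal sketch of both exceptions inside a TaskFlow task (the records and api_key_valid inputs are illustrative):

from airflow.decorators import task
from airflow.exceptions import AirflowFailException, AirflowSkipException

@task
def process(records: list, api_key_valid: bool) -> int:
    if not records:
        # Ends the task in the skipped state.
        raise AirflowSkipException("No data available; nothing to do.")
    if not api_key_valid:
        # Fails the task immediately, ignoring any remaining retries.
        raise AirflowFailException("API key is invalid; retrying will not help.")
    return len(records)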

Zombie/Undead Tasks

Airflow detects two kinds of task/process mismatch:

  • Zombie tasks are tasks that are supposed to be running but suddenly died (e.g. their process was killed, or the machine died). Airflow will find these periodically, clean them up, and either fail or retry the task depending on its settings.
  • Undead tasks are tasks that are not supposed to be running but are, often caused when you manually edit Task Instances via the UI. Airflow will find them periodically and terminate them.

Executor Configuration

Here’s an example of setting the Docker image for a task that will run on the KubernetesExecutor:

MyOperator(
    ...,
    executor_config={
        "KubernetesExecutor": {"image": "myCustomDockerImage"},
    },
)