Tasks
A DAG runs through a series of Tasks. There are three basic kinds of Task:
- Operators, predefined tasks that you can string together quickly to build most parts of your DAGs.
- Sensors, a special subclass of Operators which are entirely about waiting for an external event to happen.
- A TaskFlow-decorated @task, which is a custom Python function packaged up as a Task.
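For instance, a minimal sketch that defines one of each kind (the DAG id, task ids, and file path are illustrative):

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor

with DAG(dag_id="kinds_of_tasks", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo extracting")      # an Operator
    wait_for_file = FileSensor(task_id="wait_for_file", filepath="/tmp/data.csv")  # a Sensor

    @task
    def transform():
        # A TaskFlow-decorated @task: a plain Python function packaged up as a Task.
        print("transforming")

    transform()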
Attention
Airflow sends out Tasks to run on Workers as space becomes available, so there’s no guarantee all the tasks in your DAG will run on the same worker or the same machine.
Relationships
Tasks have dependencies declared on each other. You declare these in your DAG files using the >> and << bitshift operators.
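For example, a minimal sketch using placeholder tasks (the DAG id and task ids are illustrative):

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator

with DAG(dag_id="dependencies_example", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    first_task = DummyOperator(task_id="first_task")
    second_task = DummyOperator(task_id="second_task")
    third_task = DummyOperator(task_id="third_task")

    # first_task runs before second_task, which runs before third_task
    first_task >> second_task >> third_task

    # << declares the same dependency in the other direction;
    # the explicit set_upstream/set_downstream methods are equivalent.
    third_task << second_task
    first_task.set_downstream(second_task)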
Note
Tasks don’t pass information to each other by default, and run entirely independently.
If you want to pass information from one Task to another, you should use XComs.
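As a sketch of doing this in the TaskFlow style (the DAG and task names are illustrative), where a task's return value is stored as an XCom and passed to the downstream task:

from datetime import datetime

from airflow.decorators import dag, task

@dag(start_date=datetime(2021, 1, 1), schedule_interval=None)
def xcom_example():
    @task
    def extract():
        # The return value is pushed to XCom automatically.
        return {"row_count": 42}

    @task
    def report(summary: dict):
        # The argument is pulled from the upstream task's XCom.
        print(summary["row_count"])

    report(extract())

xcom_example()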
Task Instances
Much in the same way that a DAG is instantiated into a DAG Run each time it runs, the tasks under a DAG are instantiated into Task Instances.
The possible states for a Task Instance are:
- none: The Task has not yet been queued for execution (its dependencies are not yet met)
- scheduled: The scheduler has determined the Task's dependencies are met and it should run
- queued: The task has been assigned to an Executor and is awaiting a worker
- running: The task is running on a worker (or on a local/synchronous executor)
- success: The task finished running without errors
- shutdown: The task was externally requested to shut down when it was running
- restarting: The task was externally requested to restart when it was running
- failed: The task had an error during execution and failed to run
- skipped: The task was skipped due to branching, LatestOnly, or similar.
- upstream_failed: An upstream task failed and the Trigger Rule says we needed it
- up_for_retry: The task failed, but has retry attempts left and will be rescheduled.
- up_for_reschedule: The task is a Sensor that is in reschedule mode
- sensing: The task is a Smart Sensor
- deferred: The task has been deferred to a trigger
- removed: The task has vanished from the DAG since the run started
Ideally, the task should flow like this: none → scheduled → queued → running → success.
Relationship Terminology
Firstly, a Task Instance can have upstream and downstream tasks:
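For instance, with three tasks declared as in the earlier dependency sketch (the task names are illustrative):

task1 >> task2 >> task3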
When a DAG runs, it will create instances for each of these tasks that are upstream/downstream of each other, but which all have the same data interval.
There may also be instances of the same task, but for different data intervals (from other runs of the same DAG): previous and next.
Note
previous and next ≠ upstream and downstream!
Timeouts
A task's execution_timeout attribute (a datetime.timedelta) controls the maximum time allowed for every task execution. If execution_timeout is breached, the task times out and AirflowTaskTimeout is raised.
In addition, sensors have a timeout parameter. This only matters for sensors in reschedule mode. timeout controls the maximum time allowed for the sensor to succeed. If timeout is breached, AirflowSensorTimeout will be raised and the sensor fails immediately without retrying.
from datetime import timedelta

from airflow.providers.sftp.sensors.sftp import SFTPSensor

sensor = SFTPSensor(
    task_id="sensor",
    path="/root/test",
    execution_timeout=timedelta(seconds=60),  # controls the maximum time allowed for every execution
    timeout=3600,  # controls the maximum time allowed for the sensor to succeed; raises `AirflowSensorTimeout`
    retries=2,
    mode="reschedule",
)
SLAs
An SLA, or a Service Level Agreement, is an expectation for the maximum time a Task should take. If a task takes longer than this to run, it is then visible in the "SLA Misses" part of the user interface, as well as going out in an email of all tasks that missed their SLA.
Tasks over their SLA are not cancelled, though — they are allowed to run to completion. If you want to cancel a task after a certain runtime is reached, you want Timeouts instead.
Note
Only scheduled tasks will be checked against SLA. For example, manually triggered tasks will not invoke an SLA miss.
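As a rough sketch (the DAG id, schedule, command, and timings are illustrative), an SLA is set per task via the sla argument, and a DAG-level sla_miss_callback can react to misses:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

def sla_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    # Called by the scheduler when one or more SLAs in this DAG are missed.
    print(f"SLA missed for: {task_list}")

with DAG(
    dag_id="sla_example",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    sla_miss_callback=sla_callback,
) as dag:
    BashOperator(
        task_id="slow_step",
        bash_command="sleep 30",
        sla=timedelta(seconds=10),  # this task is expected to finish within 10 seconds
    )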
Special Exceptions
If you want to control your task’s state from within custom Task/Operator code, Airflow provides two special exceptions you can raise:
- AirflowSkipException will mark the current task as skipped
- AirflowFailException will mark the current task as failed, ignoring any remaining retry attempts
These can be useful if your code has extra knowledge about its environment and wants to fail/skip faster — e.g., skipping when it knows there’s no data available, or fast-failing when it detects its API key is invalid (as that will not be fixed by a retry).
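A minimal sketch (the task name and arguments are illustrative):

from airflow.decorators import task
from airflow.exceptions import AirflowFailException, AirflowSkipException

@task
def process_data(records: list, api_key: str):
    if not records:
        # Nothing to do for this run; mark the task as skipped rather than failed.
        raise AirflowSkipException("No data available for this run")
    if not api_key:
        # A retry will not fix a missing/invalid key, so fail immediately,
        # ignoring any remaining retry attempts.
        raise AirflowFailException("API key is invalid; retries will not help")
    print(f"Processing {len(records)} records")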
Zombie/Undead Tasks
Airflow detects two kinds of task/process mismatch:
- Zombie tasks are tasks that are supposed to be running but suddenly died (e.g. their process was killed, or the machine died). Airflow will find these periodically, clean them up, and either fail or retry the task depending on its settings.
- Undead tasks are tasks that are not supposed to be running but are, often caused when you manually edit Task Instances via the UI. Airflow will find them periodically and terminate them.
Executor Configuration
Here’s an example of setting the Docker image for a task that will run on the KubernetesExecutor
:
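A minimal sketch of the pattern (the operator, task id, and image name are illustrative):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="custom_image_example", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    BashOperator(
        task_id="run_in_custom_image",
        bash_command="echo hello",
        # Ask the KubernetesExecutor to run this task in a custom Docker image.
        executor_config={"KubernetesExecutor": {"image": "myCustomDockerImage"}},
    )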