Airflow advanced: a cross-DAG task dependency demo with two DAGs that have different start times

Background

There was a scheduling requirement: after going through the existing DAGs, I found one that could serve as the upstream of my new schedule, so I wanted to see how tasks in different DAGs can be tied together, which led to the demo below.
If you can get past the firewall and your English listening is good, you can go watch the original video instead; the presenter explains it better!

Principle

DAG A and DAG B are written in two different .py files. A task in A is a prerequisite of a task in B: only after the task in A finishes may the task in B run. How do we arrange that? We assign a supervisor to watch whether the specified task in A has run and succeeded; once it has, the supervisor steps aside and lets the task in B start.
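In other words, the dependency chain looks like this (using the task names from the demo below):

slave_dag: slave_task1 ──(watched by ExternalTaskSensor)──► master_dag: master_task1 ──► master_task2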

master.py

vim master.py

The example below is adapted from the official Airflow example DAGs.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor

default_args = {
    "owner": "airflow",
    "start_date": datetime(2020, 11, 22),
}

with DAG(
    dag_id="master_dag",
    default_args=default_args,
    schedule_interval="20 15 * * *",
    # concurrency=1,
    # max_active_runs=1,
    tags=['example1'],
) as master_dag:
    # [START howto_operator_external_task_sensor]
    # Wait for slave_task1 in slave_dag to succeed before running anything here.
    child_task1 = ExternalTaskSensor(
        task_id="master_task1",
        external_dag_id="slave_dag",
        external_task_id="slave_task1",
        # timeout=600,
        # allowed_states=['success'],
        # failed_states=['failed', 'skipped'],
        # slave_dag runs at 15:15 and master_dag at 15:20, so look back 5 minutes.
        execution_delta=timedelta(minutes=5),
        # mode="reschedule",
    )
    # [END howto_operator_external_task_sensor]

    child_task2 = BashOperator(
        task_id="master_task2",
        bash_command="echo i am master!",
    )

    child_task1 >> child_task2

slave.py

vim slave.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.sensors.external_task_sensor import ExternalTaskMarker

default_args = {
    "owner": "airflow",
    "start_date": datetime(2020, 11, 22),
}

with DAG(
    dag_id="slave_dag",
    default_args=default_args,
    schedule_interval="15 15 * * *",
    # concurrency=1,
    # max_active_runs=1,
    tags=['example2'],
) as slave_dag:
    # [START howto_operator_external_task_marker]
    # Unused in this demo; see the summary section. Note that ExternalTaskMarker
    # takes a templated execution_date, not execution_delta.
    # parent_task = ExternalTaskMarker(
    #     task_id="slave_task2",
    #     external_dag_id="master_dag",
    #     external_task_id="master_task1",
    #     execution_date="{{ (execution_date + macros.timedelta(minutes=5)).isoformat() }}",
    # )
    # [END howto_operator_external_task_marker]

    slave_task = BashOperator(
        task_id="slave_task1",
        bash_command="echo i am slave!",
    )

Detailed explanation of the relationship

Here the task in the master needs to wait for the task in the slave to finish, so a supervisor has to keep watch. Airflow ships a component for exactly this, ExternalTaskSensor, so we import the dependency:

from airflow.sensors.external_task_sensor import ExternalTaskSensor

Because my two DAGs do not start at the same time, the sensor has to be told how to translate the master's execution time into the slave's. The sensor constructor provides a parameter for this, execution_delta, which takes a timedelta, and the value you pass in is very particular!!! I worked it out by trial and error (my English is too poor to follow the video closely): the value is the absolute difference between the two DAGs' schedule times. In my example the master starts five minutes later than the slave, so I fill in 5 minutes here.
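To make the arithmetic concrete, here is a minimal sketch in plain Python (standard library only, not the Airflow API) of the lookup the sensor performs with this setting:

from datetime import datetime, timedelta

# master_dag's execution time for one day (schedule "20 15 * * *")
master_execution_date = datetime(2020, 11, 22, 15, 20)
execution_delta = timedelta(minutes=5)

# The sensor polls slave_dag's task instance at this execution date:
slave_execution_date = master_execution_date - execution_delta
print(slave_execution_date)  # 2020-11-22 15:15:00 -> matches "15 15 * * *"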

The reason is this: when the master's sensor checks on the slave task, if the slave task is still running the sensor waits for it to complete, and if it finished long ago the sensor succeeds immediately. Under the hood, the sensor polls the slave_dag task instance whose execution date equals the master's execution date minus execution_delta, so if the offset does not match the real schedule difference, the sensor polls a run that never existed and just keeps waiting.

So the summary is: pass execution_delta=timedelta(minutes=5) into the sensor, and its value must equal the schedule-time difference between the two DAGs! The offset can be given in hours, minutes, or seconds; confirm the details against the API docs.
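As a quick reference, these are all plain datetime.timedelta constructions from the standard library, nothing Airflow-specific:

from datetime import timedelta

timedelta(minutes=5)            # the offset used in this demo
timedelta(hours=1)              # an hour-based offset
timedelta(seconds=90)           # equals timedelta(minutes=1, seconds=30)
timedelta(hours=1, minutes=30)  # units can be combined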

Summary

The official docs also describe a special scenario:

If you want child_task1 in child_dag to also be cleared for a specific execution_date whenever parent_task in parent_dag is cleared, ExternalTaskMarker should be used. Note that child_task1 is only cleared if "Recursive" is selected when the user clears parent_task.

I didn't use this scenario, so I haven't tried it. If you are interested, you can try it.
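For reference, here is a minimal, untested sketch of what that would look like in this post's setup; it mirrors the commented-out block in slave.py, and the templated execution_date offset is my assumption to bridge the 5-minute schedule difference:

# In slave.py: a marker task that points at the sensor task in master_dag.
# Clearing it with "Recursive" selected also clears master_task1 for the
# matching execution_date.
parent_task = ExternalTaskMarker(
    task_id="slave_task2",
    external_dag_id="master_dag",
    external_task_id="master_task1",
    # assumption: master_dag runs 5 minutes after slave_dag, so shift the date
    execution_date="{{ (execution_date + macros.timedelta(minutes=5)).isoformat() }}",
)
slave_task >> parent_task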
Cross-DAG task dependencies are officially handled by the corresponding sensors, and Airflow also provides sensors for things like the status of HTTP requests. All in all it is a very powerful scheduling tool. I am also migrating from Azkaban to Airflow recently, and the experience has been good; you do need some Python development experience, since many DAG scripts have to be written in Python, but it is not very difficult. When domestic experts publish better write-ups, I will keep studying. Recently there is also a new scheduling framework, DolphinScheduler, which supports drag-and-drop scheduling; I'll try it when I have time.

Tags: Big Data

Posted by tom2oo8 on Fri, 06 May 2022 02:11:16 +0300