TLDR: It is possible to dynamically create dags with only one dag script. However, at task execution the original dag script is parsed once again. This results in unnecessary parsing of dags that are not the parent dag of the task at hand.
We have a script to dynamically create dags as follows:

    from airflow import DAG

    dyn_dags = ['dag_load_hungsblog', 'dag_backup_hungsblog']

    for dag_id in dyn_dags:
        # retrieve the specific tasks from a database
        task_list = get_tasks_for_specific_dag(dag_id)

        # create the DAG
        dag = DAG(dag_id)
        with dag:
            for task in task_list:
                do_the_task(task)
            # create task dependencies

        # push the dag to the globals variable, which Airflow inspects to discover existing dags
        globals()[dag_id] = dag
From a list of dags to be created dynamically, we query a database to retrieve the specific tasks for each dag. We observed that with this method, the “Fill DagBag” process of each task increased to several seconds.
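To put a number on this kind of slowdown, it can help to time how long the top-level code of a dag script takes to run. The following is only a rough sketch using the standard library: `time_top_level_code` and `demo` are hypothetical helper names, and the "slow script" is a stand-in that sleeps instead of querying a database. It does not reproduce Airflow's own DagBag loading, it only times a plain execution of the file.

```python
import runpy
import tempfile
import time
from pathlib import Path


def time_top_level_code(script_path: str) -> float:
    """Run a Python file once and return how many seconds its top-level code took."""
    start = time.perf_counter()
    runpy.run_path(script_path)  # executes every top-level statement of the file
    return time.perf_counter() - start


def demo() -> float:
    # Hypothetical stand-in for a dag script with slow top-level code,
    # e.g. one database query per dynamically generated dag.
    slow_script = "import time\nfor _ in range(2):\n    time.sleep(0.05)\n"
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "slow_dag_script.py"
        path.write_text(slow_script)
        return time_top_level_code(str(path))
```

Calling `time_top_level_code` on the real dag file (on a machine where its imports and database are available) gives a first impression of how much time each additional parse costs.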
Required Airflow Background Information
Apparently, Airflow always parses a given dag script in two situations:
- The scheduler parses the file in an infinite loop (airflow doc) and, presumably (this is just an assumption), uses the result to create the appropriate graph in the UI.
- At execution time, the Python script which holds the dag definition is parsed as a whole once again (airflow doc).
“Dynamically generating dags can cause performance issues” (astronomer)
The processing time increases because each task parses the Python script once again at task execution. Additionally, each dag first connects to a database to retrieve the necessary task information, which is very costly in terms of time. Let’s look at the script provided above once again. Assuming we are executing a task within dag_load_hungsblog, the task would parse the whole Python script once again and thus retrieve the list of specific tasks for both dag_load_hungsblog and dag_backup_hungsblog. The latter retrieval is unnecessary overhead.
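The overhead described above can be illustrated with a small simulation that needs neither Airflow nor a database. This is a sketch under assumptions: `get_tasks_for_specific_dag` here is a hypothetical stand-in that only counts how often it is called, and `parse_script` and `simulate_task_runs` are made-up helpers that mimic "every task execution re-parses the whole script".

```python
from collections import Counter

DYN_DAGS = ["dag_load_hungsblog", "dag_backup_hungsblog"]

# Counts how many times the (simulated) database was queried per dag.
lookups = Counter()


def get_tasks_for_specific_dag(dag_id):
    """Hypothetical stand-in for the database query; it only records the call."""
    lookups[dag_id] += 1
    return [f"{dag_id}_task_1", f"{dag_id}_task_2"]


def parse_script():
    """Mimic one full parse of the dag script: one lookup for every dag in the list."""
    return {dag_id: get_tasks_for_specific_dag(dag_id) for dag_id in DYN_DAGS}


def simulate_task_runs(n_tasks):
    """Mimic executing n_tasks tasks of a single dag; each one re-parses the script."""
    for _ in range(n_tasks):
        parse_script()
```

In this simulation, running three tasks of dag_load_hungsblog via `simulate_task_runs(3)` leaves `lookups` with three queries for dag_load_hungsblog and three equally expensive queries for dag_backup_hungsblog, even though none of the executed tasks belongs to the latter.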
To prevent tasks from executing code that originally belongs to a “foreign parent” dag, a new experimental feature has been released in Airflow 2.4.
get_parsing_context() returns the dag_id and task_id of the task that is currently being executed.
“However, task execution requires only a single DAG object to execute a task. Knowing this, we can skip the generation of unnecessary DAG objects when a task is executed, shortening the parsing time.” (Airflow)
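    from airflow.utils.dag_parsing_context import get_parsing_context

    dyn_dags = ['dag_load_hungsblog', 'dag_backup_hungsblog']
    current_dag_id = get_parsing_context().dag_id

    for dag_id in dyn_dags:
        if current_dag_id is not None and current_dag_id != dag_id:
            continue  # skip dags that are not needed for the task at hand
        # do the task above [..]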
We first retrieve the current dag_id. If we are in a dag run, it is filled; if the scheduler is parsing the script, the value is None (airflow doc). Then, in each iteration, we check whether we are indeed in a dag run and whether the dag of the current iteration is the one we want to execute. If the iteration belongs to a “foreign” dag, we skip it, because its metadata is not needed for the task at hand.
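The skip condition can also be written as a small pure function, which makes the two cases (scheduler parse versus task execution) easy to check in isolation. This is a minimal sketch: `dags_to_generate` is a hypothetical helper name, and in a real dag script the `current_dag_id` argument would come from `get_parsing_context().dag_id`.

```python
from typing import Iterable, List, Optional


def dags_to_generate(all_dag_ids: Iterable[str], current_dag_id: Optional[str]) -> List[str]:
    """Return the dag ids that actually need to be built during this parse.

    current_dag_id is None when the scheduler parses the file, so every dag
    is generated. During task execution it names the one dag that is needed.
    """
    if current_dag_id is None:
        return list(all_dag_ids)
    return [dag_id for dag_id in all_dag_ids if dag_id == current_dag_id]
```

With the list from above, passing `None` returns both dag ids, while passing `'dag_load_hungsblog'` returns only that one, so the database lookup for dag_backup_hungsblog never happens during that task's parse.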
Using this experimental feature, we are able to limit parsing of the script to only the necessary components, thus reducing task execution time significantly.