Scheduler is unable to find serialized DAG in the serialized_dag table #13504

@arch-DJ

Apache Airflow version: 2.0.0

Kubernetes version (if you are using kubernetes) (use kubectl version): Not relevant

Environment:

  • Cloud provider or hardware configuration:

  • OS (e.g. from /etc/os-release): CentOS Linux 7 (Core)

  • Kernel (e.g. uname -a): Linux us01odcres-jamuaar-0003 3.10.0-957.5.1.el7.x86_64 #1 SMP Fri Feb 1 14:54:57 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux

  • Install tools: PostgreSQL 12.2

  • Others:

What happened:

I have two DAG files, say dag1.py and dag2.py.
dag1.py creates a static DAG, i.e. once it is parsed it produces one specific DAG.
dag2.py creates dynamic DAGs based on JSON files kept in an external location.

The static DAG (generated from dag1.py) has a task in a later stage that generates JSON files, which are then picked up by dag2.py to create the dynamic DAGs.

The dynamic DAGs that get created are unpaused by default and get scheduled once.
This whole process used to work fine with Airflow 1.x, where DAG serialization was not mandatory and was turned off by default.

But with Airflow 2.0 I occasionally get the following exception when the scheduler tries to schedule the dynamically generated DAGs.

[2021-01-06 10:09:38,742] {scheduler_job.py:1293} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1275, in _execute
    self._run_scheduler_loop()
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1377, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1474, in _do_scheduling
    self._create_dag_runs(query.all(), session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1557, in _create_dag_runs
    dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/models/dagbag.py", line 171, in get_dag
    self._add_dag_from_db(dag_id=dag_id, session=session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/models/dagbag.py", line 227, in _add_dag_from_db
    raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
airflow.exceptions.SerializedDagNotFound: DAG 'dynamic_dag_1' not found in serialized_dag table

When I checked the serialized_dag table manually, I could see the DAG entry there.
Its last_updated column value was 2021-01-06 10:09:38.757076+05:30,
whereas the exception was logged at [2021-01-06 10:09:38,742], which is slightly before the last_updated time.
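
To put a number on that gap, here is a quick check. It assumes the scheduler log timestamp is in the same +05:30 zone as the database value, which the log line itself does not confirm.

```python
from datetime import datetime, timedelta, timezone

# Assumption: the scheduler log uses the same +05:30 offset as the DB column.
LOG_TZ = timezone(timedelta(hours=5, minutes=30))

# Timestamp of the SerializedDagNotFound log line.
error_logged_at = datetime.strptime(
    "2021-01-06 10:09:38,742", "%Y-%m-%d %H:%M:%S,%f"
).replace(tzinfo=LOG_TZ)

# last_updated value read from the serialized_dag table.
last_updated = datetime.fromisoformat("2021-01-06 10:09:38.757076+05:30")

# A positive gap means the row was written after the error was logged.
gap = last_updated - error_logged_at
print(f"serialized_dag row written {gap.total_seconds() * 1000:.3f} ms after the error")
```

Under that assumption the row was written roughly 15 ms after the lookup failed.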

I think this means that the Scheduler tried to look for the DAG entry in the serialized_dag table before DagFileProcessor created the entry.

Is this right, or could something else be going on here?

What you expected to happen:

The scheduler should start looking for the DAG entry in the serialized_dag table only after DagFileProcessor has added it.
Here it seems that DagFileProcessor added the DAG entry to the dag table, and the scheduler immediately fetched this dag_id from it and tried to find it in the serialized_dag table before DagFileProcessor could add it there.
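
The suspected ordering can be illustrated with a toy model. This is not Airflow's code: the two-step write, the `scheduler_pass` helper and the local `SerializedDagNotFound` class are hypothetical stand-ins, and an in-memory SQLite database replaces PostgreSQL. It only shows how a reader that runs between the two writes would hit the same error.

```python
import sqlite3

# Toy stand-ins for the two metadata tables involved.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE serialized_dag (dag_id TEXT PRIMARY KEY, data TEXT)")


class SerializedDagNotFound(Exception):
    """Local stand-in for airflow.exceptions.SerializedDagNotFound."""


def scheduler_pass(conn):
    """Hypothetical scheduler step: list dag rows, then load each serialized DAG."""
    loaded = []
    for (dag_id,) in conn.execute("SELECT dag_id FROM dag").fetchall():
        row = conn.execute(
            "SELECT data FROM serialized_dag WHERE dag_id = ?", (dag_id,)
        ).fetchone()
        if row is None:
            raise SerializedDagNotFound(
                f"DAG '{dag_id}' not found in serialized_dag table"
            )
        loaded.append(dag_id)
    return loaded


# Assumed step 1 of the file processor: the dag row becomes visible first.
conn.execute("INSERT INTO dag VALUES ('dynamic_dag_1')")
conn.commit()

# The scheduler runs in the window between the two writes and fails.
try:
    scheduler_pass(conn)
    raced = False
except SerializedDagNotFound as exc:
    raced = True
    print(exc)

# Assumed step 2: the serialized row lands a moment later.
conn.execute("INSERT INTO serialized_dag VALUES ('dynamic_dag_1', '{}')")
conn.commit()

# The next pass succeeds, which matches the row being present on manual inspection.
print(scheduler_pass(conn))
```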

How to reproduce it:
It occurs occasionally, and there is no well-defined way to reproduce it.

Anything else we need to know:
