
Conversation

kaxil
Member

@kaxil kaxil commented Jan 25, 2021

closes #13504

Currently the DagFileProcessor parses the DAG files, writes them to the
dag table, and then writes the DAGs to the serialized_dag table.

At the same time, the scheduler loop is constantly looking for the next
DAGs to process based on the next_dagrun_create_after column of the dag
table.

It can happen that as soon as the DagFileProcessor writes a DAG to the dag
table, the scheduling loop in the Scheduler picks up the DAG for processing.
However, because the DagFileProcessor has not yet written to the serialized_dag
table, the scheduler fails with a "Serialized Dag not Found" error.

This mainly happens with dynamic DAGs, where the result of one DAG file
creates multiple DAGs.

This commit changes the order in which the DAG and the Serialized DAG are
written, so that a DAG is written to the serialized_dag table before it is
written to the dag table.
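The race and the fix can be illustrated with a toy model (plain Python sets standing in for the two database tables; none of this is actual Airflow code):

```python
# Toy model of the race: two "tables" and the scheduler's pickup step.
dag_table = set()
serialized_dag_table = set()

def scheduler_pickup(dag_id):
    """What the scheduling loop does once it sees a row in the dag table."""
    if dag_id not in serialized_dag_table:
        return "Serialized Dag not Found"
    return "scheduled"

# Old write order: dag table first, serialized_dag second.
dag_table.add("dynamic_dag_0")
# The scheduler loop can run exactly here, between the two writes:
result_old = scheduler_pickup("dynamic_dag_0")  # "Serialized Dag not Found"
serialized_dag_table.add("dynamic_dag_0")

dag_table.clear()
serialized_dag_table.clear()

# New write order (this PR): serialized_dag first, dag table second.
serialized_dag_table.add("dynamic_dag_1")
dag_table.add("dynamic_dag_1")
# Any pickup after the dag row exists now also finds the serialized row:
result_new = scheduler_pickup("dynamic_dag_1")  # "scheduled"
```

Because the scheduler only discovers DAGs through the dag table, writing the serialized copy first guarantees it is already present by the time the DAG becomes visible to the scheduling loop.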


^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.

closes apache#13504

Currently the DagFileProcessor parses the DAG files, writes it to the
`dag` table and then writes DAGs to `serialized_dag` table.

At the same time, the scheduler loop is constantly looking for the next
DAGs to process based on ``next_dagrun_create_after`` column of the DAG
table.

It might happen that as soon as the DagFileProcessor writes DAG to `dag`
table, the scheduling loop in the Scheduler picks up the DAG for preocessing.
However, as the DagFileProcessor has not written to serialized DAG table yet
the scheduler will error with "Serialized Dag not Found" error.

This would mainly happen when the DAGs are dynamic where the result of one DAG,
creates multiple DAGs.

This commit changes the order of writing DAG and Serialized DAG and hence
before a DAG is written to `dag` table it will be written to `serialized_dag` table.
@github-actions

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Jan 25, 2021
@kaxil kaxil marked this pull request as ready for review January 25, 2021 21:55
@kaxil kaxil merged commit b9eb51a into apache:master Jan 25, 2021
@kaxil kaxil deleted the fix-serialized-dag-error branch January 25, 2021 21:55
@nik-davis

Thanks @kaxil for the fix, we'll try that out today and let you know if there are any issues.

I haven't been able to reproduce the error locally yet - I imagine with a local DB connection the DAG serialization is happening too fast to trigger the race condition. I'm wondering if adding a large file import to one of the operators could artificially slow it down a bit, but I don't know enough about how the serialization works to know for sure.

I've only managed to get the error locally by removing a row from the serialized_dag table. It looks like the try/excepts you've added would handle this though as well.

Here's the script I was toying with locally anyway to try and trigger the error, in case that's of use (increase the value in the range function to generate more DAGs):

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago


def create_dag(dag_id):
    @dag(
        default_args={"owner": "airflow", "start_date": days_ago(1)},
        schedule_interval="@once",
        dag_id=dag_id,
        catchup=False,
    )
    def dynamic_dag():
        @task()
        def dynamic_task_1(value=None, **kwargs):  # "value" added so task_1's output can be passed in below
            return 1

        task_1 = dynamic_task_1()
        task_2 = dynamic_task_1(task_1)

    return dynamic_dag()


for i in range(2):
    dag_id = f"dynamic_dag_{i}"
    globals()[dag_id] = create_dag(dag_id)
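The guard mentioned above can be sketched as a toy model (illustrative names only, not the exact code added in this PR): if the serialized row is missing when a DAG is loaded, skip it and let a later scheduler loop retry instead of crashing.

```python
class SerializedDagNotFound(Exception):
    pass

serialized_dag_table = {}  # stand-in for the real serialized_dag table

def get_serialized_dag(dag_id):
    """Load the serialized row, raising if it has not been written yet."""
    try:
        return serialized_dag_table[dag_id]
    except KeyError:
        raise SerializedDagNotFound(
            f"DAG '{dag_id}' not found in serialized_dag table"
        )

def schedule_dag(dag_id):
    """Skip DAGs whose serialized row is missing instead of crashing."""
    try:
        return get_serialized_dag(dag_id)
    except SerializedDagNotFound:
        # Row not written yet (or deleted manually, as in the test above):
        # skip for now; a later scheduler loop will pick the DAG up again.
        return None

serialized_dag_table["present_dag"] = {"data": "payload"}
ok = schedule_dag("present_dag")        # returns the serialized payload
skipped = schedule_dag("missing_dag")   # returns None instead of raising
```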

kaxil added a commit that referenced this pull request Jan 27, 2021
(cherry picked from commit b9eb51a)
@tooptoop4
Contributor

did it work @nik-davis ?

@nik-davis

@tooptoop4 I haven't seen the issue since using the fix - looks like it has worked

kaxil added a commit that referenced this pull request Feb 4, 2021
(cherry picked from commit b9eb51a)
@fjmacagno
Contributor

fjmacagno commented Feb 6, 2021

I am still seeing a lot of errors in 2.0.1rc2 like

airflow.exceptions.SerializedDagNotFound: DAG 'targeting_pipeline_dappack.shard_3' not found in serialized_dag table

Is this not part of rc2? Or is it likely a different problem?

@kaxil
Member Author

kaxil commented Feb 6, 2021

I am still seeing a lot of errors in 2.0.0rc2 like

airflow.exceptions.SerializedDagNotFound: DAG 'targeting_pipeline_dappack.shard_3' not found in serialized_dag table

Is this not part of rc2? Or is it likely a different problem?

This is included in 2.0.1rc2 (not 2.0.0rc2) -- and yeah your problem might be different, difficult to tell without knowing details

@fjmacagno
Contributor

Damn, oh well. I will put in a new issue.

(I meant 2.0.1, typo)

kaxil added a commit to astronomer/airflow that referenced this pull request Feb 16, 2021
(cherry picked from commit b9eb51a)
(cherry picked from commit 253d20a)
@AramisN

AramisN commented Oct 6, 2021

Said it's been fixed in 2.0.x but I see it again in 2.1.x :|

Something bad has happened.
Please consider letting us know by creating a bug report using GitHub.

Python version: 3.8.5
Airflow version: 2.1.1
Node: f723608ad587

Traceback (most recent call last):
  File "/root/miniconda3/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/root/miniconda3/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/root/miniconda3/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/root/miniconda3/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/root/miniconda3/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/root/miniconda3/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/root/miniconda3/lib/python3.8/site-packages/airflow/www/auth.py", line 34, in decorated
    return func(*args, **kwargs)
  File "/root/miniconda3/lib/python3.8/site-packages/airflow/www/decorators.py", line 97, in view_func
    return f(*args, **kwargs)
  File "/root/miniconda3/lib/python3.8/site-packages/airflow/www/decorators.py", line 60, in wrapper
    return f(*args, **kwargs)
  File "/root/miniconda3/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/root/miniconda3/lib/python3.8/site-packages/airflow/www/views.py", line 2170, in graph
    dag = current_app.dag_bag.get_dag(dag_id)
  File "/root/miniconda3/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/root/miniconda3/lib/python3.8/site-packages/airflow/models/dagbag.py", line 186, in get_dag
    self._add_dag_from_db(dag_id=dag_id, session=session)
  File "/root/miniconda3/lib/python3.8/site-packages/airflow/models/dagbag.py", line 258, in _add_dag_from_db
    raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
airflow.exceptions.SerializedDagNotFound: DAG 'DLS_SYS_REGION' not found in serialized_dag table

@kaxil
Member Author

kaxil commented Oct 6, 2021

@AramisN There are many reasons for getting "not found in serialized_dag table" -- you might be facing a different error. This PR fixed the error that was breaking the scheduler, and from the stack trace you are seeing this error in the webserver.

Please create a new issue with steps to reproduce
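Why the webserver path still surfaces this error can be seen in a toy model of the DagBag's database fallback (illustrative classes, not the real Airflow code): the web views load DAGs from the serialized_dag table on demand and raise immediately when the row is missing, with no retry loop in front of them, so a missing row shows up as the UI error above regardless of the scheduler-side fix.

```python
class SerializedDagNotFound(Exception):
    pass

class ToyDagBag:
    """Toy stand-in for the webserver's DagBag reading from the database."""

    def __init__(self, serialized_rows):
        self.serialized_rows = serialized_rows  # stand-in for serialized_dag

    def get_dag(self, dag_id):
        # Mirrors the _add_dag_from_db behaviour in the trace above:
        # a missing row raises straight up to the web view.
        if dag_id not in self.serialized_rows:
            raise SerializedDagNotFound(
                f"DAG '{dag_id}' not found in serialized_dag table"
            )
        return self.serialized_rows[dag_id]

bag = ToyDagBag({"SOME_OTHER_DAG": {"data": "payload"}})
try:
    bag.get_dag("DLS_SYS_REGION")  # row missing: produces the UI error above
except SerializedDagNotFound as exc:
    message = str(exc)
```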

@apache apache locked as off-topic and limited conversation to collaborators Oct 6, 2021


Development

Successfully merging this pull request may close these issues.

Scheduler is unable to find serialized DAG in the serialized_dag table

6 participants