Fix race condition when using Dynamic DAGs #13893
closes apache#13504 Currently, the DagFileProcessor parses the DAG files, writes the DAGs to the `dag` table, and then writes them to the `serialized_dag` table. At the same time, the scheduler loop is constantly looking for the next DAGs to process based on the ``next_dagrun_create_after`` column of the `dag` table. It can happen that as soon as the DagFileProcessor writes a DAG to the `dag` table, the scheduling loop in the Scheduler picks up the DAG for processing. However, because the DagFileProcessor has not yet written to the serialized DAG table, the scheduler fails with a "Serialized Dag not Found" error. This mainly happens when the DAGs are dynamic, where the result of one DAG creates multiple DAGs. This commit changes the order of writing the DAG and the Serialized DAG, so that a DAG is written to the `serialized_dag` table before it is written to the `dag` table.
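To make the ordering concrete, here is a minimal sketch of the idea — not the actual diff from this PR. `SerializedDagModel.write_dag` is a real Airflow API; `_write_dag_row` is a hypothetical stand-in for whatever creates or updates the `dag` table row:

```python
# Minimal sketch of the reordered writes -- illustrative only, not the PR's diff.
from airflow.models.serialized_dag import SerializedDagModel


def _write_dag_row(dag, session):
    """Hypothetical stand-in for the write that creates/updates the `dag` row."""


def sync_dag_to_db(dag, session):
    # Old, race-prone order: the `dag` row (with next_dagrun_create_after)
    # became visible to the scheduler before the serialized form existed,
    # so the scheduling loop could fail with "Serialized Dag not Found".
    #
    # Fixed order: write to serialized_dag FIRST. Any DAG the scheduling
    # loop can see then already has its serialized counterpart.
    SerializedDagModel.write_dag(dag, session=session)
    _write_dag_row(dag, session)
```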
The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it to the latest master at your convenience, or amend the last commit of the PR and push it with `--force-with-lease`.
Thanks @kaxil for the fix, we'll try that out today and let you know if there are any issues. I haven't been able to reproduce the error locally yet — I imagine with a local DB connection the DAG serialization happens too fast to trigger the race condition. I'm wondering if adding a large file import to one of the operators could artificially slow it down a bit, but I don't know enough about how the serialization works to say for sure. I've only managed to get the error locally by removing a row from the serialized_dag table. It looks like the try/excepts you've added would handle this as well. Here's the script I was toying with locally to try and trigger the error, in case it's of use (increase the value in the range function to generate more DAGs):
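(The script itself was not captured in this thread; below is an illustrative reconstruction of the kind of dynamic-DAG generator described — all dag_ids and task_ids are made up.)

```python
# Illustrative reconstruction -- the original script from this comment was
# not preserved. All names below are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def noop():
    pass


# Increase the range to generate more DAGs and make the race more likely.
for i in range(10):
    dag_id = f"dynamic_dag_{i}"
    with DAG(
        dag_id=dag_id,
        start_date=datetime(2021, 1, 1),
        schedule_interval="@daily",
    ) as dag:
        PythonOperator(task_id="noop", python_callable=noop)

    # Expose each DAG at module level so the DagFileProcessor discovers it.
    globals()[dag_id] = dag
```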
closes #13504 (cherry picked from commit b9eb51a)
Did it work @nik-davis?
@tooptoop4 I haven't seen the issue since using the fix - looks like it has worked
I am still seeing a lot of "Serialized Dag not Found" errors in 2.0.1rc2. Is this not part of rc2? Or is it likely a different problem?
This is included in 2.0.1rc2 (not 2.0.0rc2) -- and yeah, your problem might be different; it's difficult to tell without knowing the details.
Damn, oh well. I will put in a new issue. (I meant 2.0.1, typo)
closes apache#13504 (cherry picked from commit b9eb51a) (cherry picked from commit 253d20a)
Said it's been fixed in 2.0.x, but I see it again in 2.1.x :| The webserver error page shows "Something bad has happened. Python version: 3.8.5"
@AramisN There are many reasons for getting "not found in serialized_dag table" -- you might be facing a different error. This PR fixed the scheduler breaking; from your stacktrace, you are seeing the error on the webserver. Please create a new issue with steps to reproduce.
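A first debugging step when you hit this is to confirm whether the DAG actually has a row in the serialized_dag table. A small sketch, assuming an environment where Airflow's metadata DB is configured (the dag_id is a placeholder):

```python
# Quick check: does a serialized_dag row exist for this DAG?
# "my_dag_id" is a placeholder -- substitute the failing DAG's id.
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils.session import create_session

with create_session() as session:
    print(SerializedDagModel.has_dag("my_dag_id", session=session))
```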