Fix processor cleanup on DagFileProcessorManager #22685
Conversation
References to processors weren't being cleaned up after killing them in the event of a timeout. This led to a crash caused by an unhandled exception when trying to read from a closed end of a pipe.
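For context, here is a minimal illustration (not Airflow code, just a plain `multiprocessing` pipe) of the kind of failure described above: once the other end of a pipe has been closed, the connection still looks readable, but reading from it raises an exception that crashes the caller if nothing handles it.

```python
from multiprocessing import Pipe

# Two connected ends of a duplex pipe.
parent_conn, child_conn = Pipe()

# Simulate the other side going away (e.g. after being killed).
child_conn.close()

# The remaining end reports "readable" because it has hit end-of-file...
print(parent_conn.poll())  # True

# ...but actually reading from it raises EOFError, which crashes the
# caller if the exception is not handled.
parent_conn.recv()
```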
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
When calling `_kill_process()` we're generating zombies which weren't being `wait()`ed for. This led to a process leak, which we fix by calling `waitpid()` on the appropriate PIDs.
airflow/dag_processing/processor.py (outdated)

```python
os.kill(self._process.pid, signal.SIGKILL)

# Reap the spawned zombie
os.waitpid(self._process.pid, 0)
```
Indeed, there might be a short delay before even SIGKILL gets processed, so waiting here makes sense.
I don't know if there's anything here that could be racy, but there are some details here: https://bugs.python.org/issue42558.
Ough. Very interesting. So it seems that this one (or something similar) is safer (as `waitpid` might crash for Python 3.9+) :)
Do I read it right?
```python
while self._process.poll() is None:
    sleep(0.001)
```
Yes that is how I read it too.
@pcolladosoto you wanna make that change? I don't think I had anything else that needed changing.
Hi @malthe! I just pushed the changes: I decided to import the `time` module so that I could call `time.sleep()` within the `while` loop. Hope I made the right choice... You can check out the changes on 27502be. Thanks a lot for the input! This is something that I'm sure will come in handy for future projects 😸
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.
According to @potiuk's and @malthe's input, the way we were reaping the zombies could cause some racy and unwanted situations. As seen in the discussion over at `https://bugs.python.org/issue42558`, we can safely reap the spawned zombies with the changes we have introduced.
As suggested by @potiuk, explaining why we chose to actively wait in a scenario such as this one can indeed be useful for anybody taking a look at the code some time from now... Co-authored-by: Jarek Potiuk <[email protected]>
After accepting the changes proposed on the PR we found a small typo (we make those on a daily basis) and a trailing whitespace we thought was nice to delete. Hope we made the right choice!
We were calling `poll()` through the `_process` attribute and, as shown by the static checks triggered by GitHub, it's not defined for the `BaseProcess` class. We instead have to call `poll()` through `BaseProcess`'s `_popen` attribute.
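Putting the pieces from this thread together, the kill-and-reap logic could look roughly like the self-contained sketch below. The helper name `kill_and_reap` is made up for illustration; in Airflow the equivalent code lives inside `_kill_process()`.

```python
import os
import signal
import time
from multiprocessing import Process


def kill_and_reap(process: Process) -> None:
    """Forcefully kill a child process and actively wait until it is reaped."""
    if process.is_alive() and process.pid:
        os.kill(process.pid, signal.SIGKILL)
        # Actively wait instead of calling os.waitpid() directly, which can
        # race with multiprocessing's own child reaping (see bpo-42558).
        # BaseProcess has no poll() method, so we go through the internal
        # Popen handle, as the commit message above explains.
        while process._popen.poll() is None:
            time.sleep(0.001)


if __name__ == "__main__":
    p = Process(target=time.sleep, args=(60,))
    p.start()
    kill_and_reap(p)
    print(p.exitcode)  # -9, i.e. terminated by SIGKILL
```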
Hi @potiuk! Is there anything else that needs to be done on my end? 🤔 Thanks for your time!
I guess fixing the static checks and the failing tests, since you asked. But I think GitHub already told you so by all the notifications, and you can see it in the red status of the PR.
This PR fixes the cleanup procedures of the `DAGFileProcessor`s being spawned by the `DagFileProcessorManager` instance. Thus, this PR closes #22191.

We came across this bug when encountering a timeout triggering the forceful killing of these `DAGFileProcessor`s. We have had a somewhat lengthy discussion on an issue (#22191) which we encourage anyone to read for some more insight into the cause, discovery and eventual fix. People on that thread were extremely helpful! 😸

We have not included tests because we're unsure of how to test a behaviour such as this one. If pointed in the right direction we would be more than happy to add them. We can say, however, that we applied these changes on our own production Airflow instance and we haven't encountered the issue since.

On top of that, we would more than welcome any suggestions on the code: for instance, when cleaning up a dictionary we're iterating over, we decided to take note of the problematic `DAGFileProcessor`s and then remove them in a second `for` loop (a rough sketch of this pattern is included after this description). Our background in programming is much stronger in some other languages, so we feel really uncomfortable pushing Python 'to the limit' in terms of relying on its implementation to make design choices. If there's anything that can be done better, by all means say so.

Another fertile topic for discussion is how to `wait()` for the processes being killed through the `SIGKILL` signal. This has been brought up by @dlesco on #22191 and we agree with him on adding an optional timeout to the operation to avoid blocking in very bizarre circumstances (which the current solution would do). However, we decided to contribute our initial approach and then iterate on solutions within the PR.

Thanks a lot for letting me contribute to a tool with the expertise and size of Airflow: it's truly an honour. Hope to make the merge as seamless as possible 😜
closes: #22191
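For reference, the two-pass dictionary cleanup mentioned in the description could look roughly like the sketch below; the function and predicate names are hypothetical and only illustrate the pattern, not the actual Airflow code.

```python
def remove_timed_out_processors(processors: dict, has_timed_out) -> None:
    """Kill processors that exceeded their timeout and drop them from the dict.

    ``processors`` maps a DAG file path to its processor; ``has_timed_out`` is
    a predicate deciding whether a given processor should be killed.
    """
    to_remove = []
    # First pass: only take note of the problematic entries, never mutating
    # the dictionary while iterating over it.
    for file_path, processor in processors.items():
        if has_timed_out(processor):
            processor.kill()
            to_remove.append(file_path)

    # Second pass: now it is safe to remove them.
    for file_path in to_remove:
        del processors[file_path]
```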