pcolladosoto
Contributor

This PR fixes the cleanup of the DAGFileProcessors spawned by the DagFileProcessorManager instance and thus closes #22191.

We came across this bug when a timeout triggered the forceful killing of these DAGFileProcessors.

We had a somewhat lengthy discussion on issue #22191, which we encourage anyone to read for more insight into the cause, discovery and subsequent fix. People on that thread were extremely helpful! 😸

We have not included tests because we're unsure how to test behaviour such as this. If pointed in the right direction, we would be more than happy to add them. We can say, however, that we applied these changes to our own production Airflow instance and haven't encountered the issue since.

On top of that, any suggestions on the code would be more than welcome. For instance, when cleaning up a dictionary we're iterating over, we decided to record the problematic DAGFileProcessors and then remove them in a second for loop. Our programming background is much stronger in other languages, so we feel uncomfortable pushing Python 'to the limit' by relying on its implementation details to make design choices. If anything can be done better, by all means say so.
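For context on the two-pass approach: in CPython, deleting keys from a dict while iterating over it raises `RuntimeError: dictionary changed size during iteration`, so collecting the keys first and deleting them afterwards is a standard, safe pattern. The sketch below is hypothetical: `FakeProcessor`, `cleanup_timed_out` and `naive_cleanup` are illustrative names, not Airflow's actual classes or attributes.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class FakeProcessor:
    """Hypothetical stand-in for a DAG file processor."""
    file_path: str
    timed_out: bool = False


def cleanup_timed_out(processors: Dict[str, FakeProcessor]) -> List[str]:
    """Remove timed-out processors from `processors` in place.

    Returns the file paths that were removed, in iteration order.
    """
    # First pass: only read the dict and remember what has to go.
    to_remove = [path for path, proc in processors.items() if proc.timed_out]
    # Second pass: mutate the dict now that no iteration over it is active.
    for path in to_remove:
        # In the real manager, this is where the process would be killed and reaped.
        del processors[path]
    return to_remove


def naive_cleanup(processors: Dict[str, FakeProcessor]) -> None:
    """Deleting during iteration: CPython raises RuntimeError on the next step."""
    for path, proc in processors.items():
        if proc.timed_out:
            del processors[path]
```

An equally idiomatic alternative is to iterate over a snapshot, e.g. `for path, proc in list(processors.items()):`, which makes deleting from the original dict inside the loop safe.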

Another fertile topic for discussion is how to wait() for the processes killed with SIGKILL. @dlesco brought this up on #22191, and we agree with him on adding an optional timeout to the operation to avoid blocking indefinitely in very unusual circumstances (which the current solution could do). However, we decided to contribute our initial approach and then iterate on solutions within the PR.
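A minimal sketch of the optional timeout, assuming the processor wraps a `multiprocessing.Process` and runs on a POSIX system. The helper name `kill_and_wait` and its default timeout are hypothetical, not Airflow's API. `Process.join(timeout)` returns once the child has exited or the timeout expires; in the latter case `exitcode` is still `None`, so the caller can log and move on instead of blocking forever.

```python
import multiprocessing
import time
from multiprocessing.process import BaseProcess
from typing import Optional


def kill_and_wait(proc: BaseProcess, timeout: Optional[float] = 5.0) -> Optional[int]:
    """Send SIGKILL to `proc`, then wait at most `timeout` seconds for it to exit.

    Returns the exit code (the negated signal number on POSIX), or None if the
    child had still not exited when the timeout expired. Passing timeout=None
    waits indefinitely, like the PR's current approach.
    """
    proc.kill()  # SIGKILL on POSIX; Process.kill() exists since Python 3.7
    # join() waits through multiprocessing's own bookkeeping, which also reaps
    # the child, so no zombie is left once it reports an exit code.
    proc.join(timeout)
    return proc.exitcode


if __name__ == "__main__":
    # POSIX-only: the fork context keeps this example self-contained.
    ctx = multiprocessing.get_context("fork")
    stuck = ctx.Process(target=time.sleep, args=(60,))
    stuck.start()
    print(kill_and_wait(stuck))  # -9 once SIGKILL has been delivered
```

Returning `None` rather than raising leaves the policy (retry, log, give up) to the caller.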

Thanks a lot for letting me contribute to a tool with the expertise and size of Airflow: it's truly an honour.

Hope to make the merge as seamless as possible 😜

closes: #22191

References to processors weren't being cleaned up after
killing them in the event of a timeout. This led to
a crash caused by an unhandled exception when trying to
read from a closed end of a pipe.
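The failure mode can be reproduced in isolation: once a `multiprocessing` `Connection` has been closed, calling `recv()` on that same object raises `OSError`, so any stale reference kept after the processor is killed becomes a trap. The helper below is hypothetical and only illustrates the error; it is not Airflow code.

```python
from multiprocessing import Pipe


def read_after_close() -> str:
    """Close one end of a pipe, then try to read from it again."""
    parent_conn, child_conn = Pipe()
    child_conn.send("result")
    # Simulates the cleanup performed when a processor is killed on timeout.
    parent_conn.close()
    try:
        parent_conn.recv()
    except OSError as exc:
        # Without this handler the exception would propagate and crash the caller.
        return str(exc)
    finally:
        child_conn.close()
    return "no error"
```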
@boring-cyborg bot added the "area:Scheduler" label (including HA (high availability) scheduler) on Apr 1, 2022
@boring-cyborg

boring-cyborg bot commented Apr 1, 2022

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
Here are some useful points:

  • Pay attention to the quality of your code (flake8, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it’s a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.

Apache Airflow is a community-driven project and together we are making it better 🚀.
In case of doubts contact the developers at:
Mailing List: [email protected]
Slack: https://s.apache.org/airflow-slack

When calling `_kill_process()` we were generating
zombies that weren't being `wait()`ed for. This
led to a process leak, which we fix by calling
`waitpid()` on the appropriate PIDs.
```python
os.kill(self._process.pid, signal.SIGKILL)

# Reap the spawned zombie
os.waitpid(self._process.pid, 0)
```
Member

Indeed, there might be a short delay before even SIGKILL gets processed, so waiting here makes sense.
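To illustrate why the wait is needed, the POSIX-only sketch below forks a child, kills it with SIGKILL and reaps it with `os.waitpid()`. Until that call, the dead child remains a zombie entry in the process table; `waitpid()` blocks until the child has actually terminated and returns its wait status. It uses raw `os.fork()` rather than Airflow's processor classes, and `spawn_sleeper` / `kill_and_reap` are hypothetical names. Calling `os.waitpid()` directly on a child that `subprocess` or `multiprocessing` also tracks may interfere with their own bookkeeping, which is the kind of issue the following comments discuss.

```python
import os
import signal
import time


def spawn_sleeper() -> int:
    """Fork a child that sleeps until it is killed (POSIX only)."""
    pid = os.fork()
    if pid == 0:
        # Child: SIGKILL cannot be caught, so this normally never finishes;
        # os._exit() is a safety net so the child never runs the parent's code.
        time.sleep(60)
        os._exit(0)
    return pid


def kill_and_reap(pid: int) -> int:
    """SIGKILL `pid` and reap it, returning the raw wait status."""
    os.kill(pid, signal.SIGKILL)
    # Blocks until the child is dead and removes its zombie entry.
    _, status = os.waitpid(pid, 0)
    return status


if __name__ == "__main__":
    child = spawn_sleeper()
    status = kill_and_reap(child)
    print(os.WIFSIGNALED(status), os.WTERMSIG(status) == signal.SIGKILL)  # True True
```

A second `os.waitpid()` on the same PID then raises `ChildProcessError`, confirming the zombie is gone.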

Contributor

I don't know if there's anything here that could be racy, but there are some details here: https://bugs.python.org/issue42558.

Member

Ough. Very interesting. So it seems that this one (or something similar) is safer (as waitpid might crash on Python 3.9+) :)

Am I reading it right?

```python
while self._process.poll() is None:
    sleep(0.001)
```
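A runnable version of the suggested loop, assuming the processor wraps a `multiprocessing.Process` on a POSIX system. Instead of the private `_popen.poll()`, it polls the public `Process.exitcode` property, which in CPython delegates to that same internal `poll()`; either way multiprocessing reaps the child itself rather than a separate `os.waitpid()` call doing it. The helper name `kill_and_poll` is hypothetical, and the 1 ms sleep follows the suggestion above.

```python
import multiprocessing
import time
from multiprocessing.process import BaseProcess


def kill_and_poll(proc: BaseProcess) -> int:
    """SIGKILL `proc`, then actively wait until multiprocessing reaps it."""
    proc.kill()
    # `exitcode` calls the process's internal poll(); with the fork start method
    # that is a non-blocking waitpid(), so it stays None until the child exits.
    while proc.exitcode is None:
        time.sleep(0.001)
    return proc.exitcode


if __name__ == "__main__":
    # POSIX-only: the fork context keeps this example self-contained.
    ctx = multiprocessing.get_context("fork")
    proc = ctx.Process(target=time.sleep, args=(60,))
    proc.start()
    print(kill_and_poll(proc))  # -9 on POSIX
```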

Contributor

Yes that is how I read it too.

@pcolladosoto you wanna make that change? I don't think I had anything else that needed changing.

Contributor Author

Hi @malthe! I just pushed the changes: I decided to import the time module so that I could call time.sleep() within the while loop. Hope I made the right choice... You can check out the changes in 27502be. Thanks a lot for the input! This is something that I'm sure will come in handy for future projects 😸

@potiuk
Member

potiuk commented Apr 1, 2022

This looks like a really great fix. @ashb @uranusjr @malthe -> the usual suspects :). Can you also take a look, please?

@github-actions bot added the "full tests needed" label (We need to run full set of tests for this PR to merge) on Apr 1, 2022
@github-actions

github-actions bot commented Apr 1, 2022

The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it onto the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

According to @potiuk's and @malthe's input, the way
we were reaping the zombies could cause race conditions
and other unwanted behaviour. As discussed over at
`https://bugs.python.org/issue42558`, the changes we
have introduced let us reap the spawned zombies
safely.
pcolladosoto and others added 2 commits April 2, 2022 21:15
As suggested by @potiuk, explaining why we chose to actively wait in a scenario such as this one can indeed be useful for anybody looking at the code in the future...

Co-authored-by: Jarek Potiuk <[email protected]>
After accepting the changes proposed on the PR
we found a small typo (we make those on a daily basis)
and a trailing whitespace we thought was worth deleting.
Hope we made the right choice!
pcolladosoto and others added 7 commits April 3, 2022 18:28
We were calling `poll()` through the `_process` attribute
but, as shown by the static checks triggered on GitHub,
`poll()` is not defined on the `BaseProcess` class. Instead,
we have to call `poll()` through `BaseProcess`'s `_popen`
attribute.
@pcolladosoto
Contributor Author

Hi @potiuk! Is there anything else that needs to be done on my end? 🤔

Thanks for your time!

@potiuk
Member

potiuk commented Apr 4, 2022

I guess fixing the failing static checks and tests, since you asked. But I think GitHub has already told you so with all the notifications, and you can see it in the red status of the PR.