Conversation

@jcrist commented Jul 6, 2020

Previously when the executor.start() contextmanager would exit early (say on an interrupt or error), there might be background work still happening, since the executor wouldn't wait for or cancel the remaining submitted tasks.

We now attempt to cancel all pending work running on the executor, and wait for tasks that haven't completed in cases where we can't efficiently/robustly cancel them. This ensures that when executor.start() exits, no tasks will continue to progress.

  • For a DaskExecutor with a temporary cluster, we shut down the cluster, so no lingering work can persist.
  • For a DaskExecutor with an external cluster (specified with an address) or an inproc cluster (using local threads only), we stop all pending tasks and wait for all running tasks to finish.
  • For a LocalDaskExecutor, we terminate the backing pool as quickly as possible, then wait for the pool to close.
    • For a process pool, this terminates the worker processes.
    • For a thread pool, we attempt to interrupt the threads (CPython-specific).
  • A LocalExecutor runs everything in the main thread, so no background tasks can persist.
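The "cancel pending, wait on running" behavior for an external or inproc cluster can be sketched with the standard library, using a `concurrent.futures` pool as a stand-in for the dask client (the real executor drives the dask scheduler; `stop_pending_and_wait` is a hypothetical helper for illustration):

```python
import concurrent.futures
import time

def stop_pending_and_wait(futures):
    """Cancel futures that haven't started yet; block until any
    already-running futures finish.  Illustrative stand-in for the
    executor's cancel-and-wait step (the real code talks to dask)."""
    still_running = []
    for fut in futures:
        # cancel() returns False when the task is already running
        # (or finished), so those are the ones we must wait on.
        if not fut.cancel():
            still_running.append(fut)
    concurrent.futures.wait(still_running)
    return still_running

pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
futures = [pool.submit(time.sleep, 0.2) for _ in range(4)]
time.sleep(0.05)                        # let the first task start
remaining = stop_pending_and_wait(futures)
pool.shutdown()
# Afterwards no future is pending or running.
print(all(f.done() for f in futures))   # True
```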

This is a precursor for the cancellation implementation (#2771).
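The changed `start()` semantics amount to doing the cleanup in a `finally` block, so it runs on normal exit, on an error, or on an interrupt. A minimal sketch (`ToyExecutor` is a hypothetical class, not Prefect's actual API):

```python
import concurrent.futures
import contextlib
import time

class ToyExecutor:
    """Hypothetical executor sketching the new ``start()`` semantics:
    no submitted work keeps running after ``start()`` exits."""

    @contextlib.contextmanager
    def start(self):
        self._pool = concurrent.futures.ThreadPoolExecutor(max_workers=2)
        self._futures = []
        try:
            yield self
        finally:
            # Runs on normal exit *and* on an error or interrupt:
            # cancel anything still pending, then wait for running
            # tasks to finish before returning.
            for fut in self._futures:
                fut.cancel()
            self._pool.shutdown(wait=True)

    def submit(self, fn, *args, **kwargs):
        fut = self._pool.submit(fn, *args, **kwargs)
        self._futures.append(fut)
        return fut

executor = ToyExecutor()
try:
    with executor.start():
        futs = [executor.submit(time.sleep, 0.2) for _ in range(5)]
        raise RuntimeError("simulated failure mid-run")  # early exit
except RuntimeError:
    pass

# Every task has either finished or been cancelled -- nothing lingers.
print(all(f.done() for f in futs))  # True
```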

Please describe your work and make sure your PR:

  • adds new tests (if appropriate)
  • adds a changelog entry in the changes/ directory (if appropriate)
  • updates docstrings for any new functions or function arguments, including docs/outline.toml for API reference docs (if appropriate)

jcrist added 4 commits July 6, 2020 15:58
Previously, the dask executors could leave work continuing in the
background upon exit from the `start` contextmanager. This could lead to
odd behavior, where tasks from a flow run might still be running after
`flow.run()` has completed.

This PR changes the semantics of the `start` contextmanager so that no
lingering work remains after an executor has exited the `start`
contextmanager.

- For a `DaskExecutor` with a temporary cluster, we shut down the
  cluster, so no lingering work can persist.
- For a `DaskExecutor` with an external cluster (specified with an
  `address`), we stop all pending tasks and wait for all running tasks
  to finish.
- For a `LocalDaskExecutor`, we terminate the backing pool as quickly as
  possible, then wait for the pool to close.
  - For a process pool, this terminates the processes.
  - For a thread pool, we attempt to interrupt the threads (CPython
    specific).
- A `LocalExecutor` runs everything in the main thread, so no background
  tasks can persist.

This is a precursor to getting Cancellation to work in a robust way.
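The CPython-specific thread interruption mentioned above can be approximated with the interpreter's C API via `ctypes`. This is a best-effort sketch, not the executor's exact code: `PyThreadState_SetAsyncExc` only delivers the exception when the target thread next executes Python bytecode, so a thread blocked in C code won't be interrupted until it returns.

```python
import ctypes
import threading
import time

def interrupt_thread(thread, exc_type=KeyboardInterrupt):
    """Asynchronously raise ``exc_type`` in ``thread`` (CPython only).

    Best-effort: relies on the CPython C API, and delivery waits until
    the thread next runs Python bytecode.
    """
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_ulong(thread.ident), ctypes.py_object(exc_type)
    )
    if res > 1:
        # Affected more than one thread state: undo and bail out.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(
            ctypes.c_ulong(thread.ident), None
        )
        raise SystemError("PyThreadState_SetAsyncExc failed")

interrupted = []

def worker():
    try:
        while True:            # a "task" that never finishes on its own
            time.sleep(0.01)
    except KeyboardInterrupt:
        interrupted.append(True)

t = threading.Thread(target=worker)
t.start()
time.sleep(0.05)
interrupt_thread(t)
t.join(timeout=2)
print(interrupted)             # [True] once the interrupt lands
```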
@codecov bot commented Jul 7, 2020

Codecov Report

Merging #2920 into master will decrease coverage by 0.03%.
The diff coverage is 90.65%.

The current limits are really old.
@cicdw (Member) left a comment

Nice, this feels like a solid enhancement. Left a few minor comments.

@jcrist (Author) commented Jul 8, 2020

I believe all comments have been addressed.

@jcrist jcrist merged commit 30c88b3 into PrefectHQ:master Jul 8, 2020
@jcrist jcrist deleted the cleanup-tasks-executor branch July 8, 2020 02:57