fix: support for parallel queues #715
Pull request overview
This PR implements support for parallel scan queues in BEC, allowing users to run multiple experiments concurrently. The key changes enable clients to select a target queue dynamically, with automatic cleanup of idle non-primary queues after 60 seconds of inactivity.
- Adds `set_default_scan_queue()` and `get_default_scan_queue()` methods to the client API
- Implements auto-shutdown timer for idle secondary queues (primary queue never auto-shuts down)
- Updates all queue-related operations to support multiple queues and respect the user's queue selection
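
The auto-shutdown behaviour summarised in the list above can be illustrated with a small standalone sketch. This is not the BEC implementation: `IdleQueue`, `QueueRegistry`, `touch()` and `get_or_create()` are hypothetical names chosen for illustration, and only `AUTO_SHUTDOWN_TIME`, `remove_queue` and the queue name "primary" are taken from this PR. The sketch assumes each non-primary queue restarts a timer whenever it sees activity and is removed when that timer expires.

```python
import threading

AUTO_SHUTDOWN_TIME = 60.0  # seconds of inactivity, per the PR description
PRIMARY = "primary"


class IdleQueue:
    """Hypothetical stand-in for a scan queue with an idle timer."""

    def __init__(self, name, on_idle, idle_timeout):
        self.name = name
        self._on_idle = on_idle
        self._idle_timeout = idle_timeout
        self._timer = None
        self._lock = threading.Lock()

    def touch(self):
        """Record activity: cancel the pending timer and start a fresh one."""
        with self._lock:
            self._cancel_timer()
            if self.name == PRIMARY:
                return  # the primary queue never shuts down automatically
            self._timer = threading.Timer(
                self._idle_timeout, self._on_idle, args=(self.name,)
            )
            self._timer.daemon = True
            self._timer.start()

    def stop(self):
        """Cancel the idle timer, e.g. when the queue is removed."""
        with self._lock:
            self._cancel_timer()

    def _cancel_timer(self):
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None


class QueueRegistry:
    """Hypothetical registry that creates queues on demand and drops idle ones."""

    def __init__(self, idle_timeout=AUTO_SHUTDOWN_TIME):
        self._idle_timeout = idle_timeout
        self._lock = threading.Lock()
        self.queues = {}
        self.get_or_create(PRIMARY)

    def get_or_create(self, name):
        with self._lock:
            queue = self.queues.get(name)
            if queue is None:
                queue = IdleQueue(name, self.remove_queue, self._idle_timeout)
                self.queues[name] = queue
        queue.touch()  # any access counts as activity
        return queue

    def remove_queue(self, name):
        if name == PRIMARY:
            return
        with self._lock:
            queue = self.queues.pop(name, None)
        if queue is not None:
            queue.stop()
```

A timer that has already started running its callback cannot be cancelled, so a real implementation would likely re-check the time of the last activity inside the callback before removing the queue; the sketch omits this for brevity.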
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| bec_server/bec_server/scan_server/scan_queue.py | Implements QueueManager.remove_queue() method and ScanQueue auto-shutdown timer logic with AUTO_SHUTDOWN_TIME constant |
| bec_server/bec_server/scan_server/scan_server.py | Reorders initialization to create device_manager before queue_manager, adds explicit primary queue creation |
| bec_server/bec_server/scan_server/tests/utils.py | Comments out shutdown method override in ScanServerMock |
| bec_server/bec_server/scan_server/tests/fixtures.py | Adds server.shutdown() call to fixture cleanup |
| bec_server/tests/tests_scan_server/test_scan_server_queue.py | Adds test for auto-shutdown behavior, reduces AUTO_SHUTDOWN_TIME for testing, removes obsolete test |
| bec_server/tests/tests_scan_server/test_scan_guard.py | Updates MockQueue to include signal_event and stop_worker, shuts down queue_manager before mocking |
| bec_server/tests/tests_scan_bundler/test_scan_bundler.py | Removes obsolete scan queue callback test |
| bec_server/bec_server/scan_bundler/scan_bundler.py | Removes scan_queue_status registration and current_queue tracking (no longer needed) |
| bec_lib/bec_lib/scan_manager.py | Adds set_default_scan_queue/get_default_scan_queue methods, updates modification methods to use target queue |
| bec_lib/bec_lib/scans.py | Adds scan_queue parameter to _run method and prepare_scan_request |
| bec_lib/bec_lib/scan_items.py | Updates current_scan_info to use target queue, updates update_with_queue_status to iterate all queues |
| bec_lib/bec_lib/queue_items.py | Updates _update_with_buffer and update_with_status to iterate all queues, updates update_with_client_message to use target queue |
| bec_lib/bec_lib/device.py | Updates _prepare_rpc_msg to use client's default scan queue instead of hardcoded "primary" |
| bec_lib/bec_lib/devicemanager.py | Updates docstring comment for monitored_devices method |
| bec_lib/bec_lib/client.py | Adds typeguard import for typechecked decorator |
| bec_lib/tests/test_devices.py | Adds mock for get_default_scan_queue in RPC test |
| bec_ipython_client/bec_ipython_client/callbacks/ipython_live_updates.py | Updates _element_in_queue and _process_queue to use default scan queue |
Comments suppressed due to low confidence (1)
bec_lib/bec_lib/scan_manager.py:93
- The request_scan_interruption method does not specify the target queue in the ScanQueueModificationMessage, unlike request_scan_abortion, request_scan_halt, and request_scan_continuation. This means interruption requests will always go to the "primary" queue (the default), even when the user has set a different default queue. Consider adding the target_queue parameter similar to the other modification methods.
    return self.connector.send(
        MessageEndpoints.scan_queue_modification_request(),
        messages.ScanQueueModificationMessage(scan_id=scan_id, action=action, parameter={}),
    )
cappel89
left a comment
A few minor comments below. While testing, I realized that you currently won't be able to run two scans in parallel. One issue for sure arises if multiple devices rely on a shared state (as discussed previously offline), for example SimWaveForm.
However, another issue seems to be that multiple clients submitting to different queues get conflicting scan_numbers and dataset_numbers.
I will try to solve the scan number problem. Shouldn't be too difficult. The ownership, however, is probably something for another PR.
Both. We are currently stopping all enabled devices. Once we figure out a nice way to distribute ownership, we should be able to limit the stop to only the relevant devices.
This PR fixes the support for parallel queues. Clients can now set their target queue by running
Any subsequent scan will be submitted to the "secondary" queue. (Note that the name can be chosen arbitrarily).
For submitting just a single scan to a different queue, use the scan's scan_queue kwarg, e.g.
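
The interplay between the client-wide default queue and the per-scan override can be modelled with a minimal, self-contained sketch. The names `set_default_scan_queue`, `get_default_scan_queue` and the `scan_queue` kwarg come from this PR; `ToyClient`, `ScanRequest`, `run_scan` and the queue name "tertiary" are hypothetical stand-ins, not part of the BEC API.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ScanRequest:
    """Hypothetical record of a submitted scan and its target queue."""
    scan_type: str
    queue: str


@dataclass
class ToyClient:
    """Hypothetical client; only the queue-selection logic is modelled."""
    default_queue: str = "primary"
    submitted: List[ScanRequest] = field(default_factory=list)

    def set_default_scan_queue(self, name: str) -> None:
        # All subsequent scans go to this queue unless overridden per scan.
        self.default_queue = name

    def get_default_scan_queue(self) -> str:
        return self.default_queue

    def run_scan(self, scan_type: str, scan_queue: Optional[str] = None) -> ScanRequest:
        # A per-scan scan_queue takes precedence over the client default.
        target = scan_queue if scan_queue is not None else self.default_queue
        request = ScanRequest(scan_type=scan_type, queue=target)
        self.submitted.append(request)
        return request


client = ToyClient()
client.set_default_scan_queue("secondary")
client.run_scan("line_scan")                         # goes to "secondary"
client.run_scan("line_scan", scan_queue="tertiary")  # one-off override
```

The override applies to a single request only; the default set on the client stays unchanged afterwards.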
Except for the `primary` queue, all queues will be shut down after being idle for 60 s. The primary queue will not shut down automatically.

Please note that in order to fix the support for parallel scans, I had to change the scan number assignment logic. Scan numbers are now incremented at the beginning of a scan, not retrospectively at the end; otherwise, two parallel scans would receive the same scan number. I kept the logic in the client the same, though: the methods are still called "next_scan_number" and "next_dataset_number", and the IPython client does indeed show the next scan number.
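
Why the scan number has to be reserved when a scan starts can be seen in a small sketch. It assumes an in-process counter guarded by a lock; how BEC actually stores and increments the number is not shown in this PR. `ScanNumberAllocator`, `begin_scan` and `run_parallel_scans` are hypothetical names; only `next_scan_number` is taken from the description above.

```python
import threading


class ScanNumberAllocator:
    """Hypothetical allocator that reserves a number when a scan starts."""

    def __init__(self, first_number=1):
        self._next = first_number
        self._lock = threading.Lock()

    @property
    def next_scan_number(self):
        # The number the next scan will receive, as shown to the user.
        with self._lock:
            return self._next

    def begin_scan(self):
        # Read and increment under one lock, so two scans starting at the
        # same time can never observe the same value.
        with self._lock:
            number = self._next
            self._next += 1
        return number


def run_parallel_scans(allocator, n_scans):
    """Start n_scans threads that each reserve a number; return them sorted."""
    numbers = []
    numbers_lock = threading.Lock()

    def worker():
        number = allocator.begin_scan()
        with numbers_lock:
            numbers.append(number)

    threads = [threading.Thread(target=worker) for _ in range(n_scans)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return sorted(numbers)
```

If the counter were only incremented when a scan finishes, two scans running at the same time would both read the same value at start, which is the conflict described above.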
closes #714
closes #713
closes #717
For full support of parallel scans, we need to implement a proper device locking mechanism; cf. #643.