Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 32 additions & 10 deletions django/dispatch/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,28 @@ def _make_id(target):
NO_RECEIVERS = object()


async def _gather(*coros):
if len(coros) == 0:
return []

if len(coros) == 1:
return [await coros[0]]

async def run(i, coro):
results[i] = await coro

try:
async with asyncio.TaskGroup() as tg:
results = [None] * len(coros)
for i, coro in enumerate(coros):
tg.create_task(run(i, coro))
return results
except BaseExceptionGroup as exception_group:
if len(exception_group.exceptions) == 1:
raise exception_group.exceptions[0]
raise
Comment on lines +25 to +44
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be simplified to:

Suggested change
async def _gather(*coros):
if len(coros) == 0:
return []
if len(coros) == 1:
return [await coros[0]]
async def run(i, coro):
results[i] = await coro
try:
async with asyncio.TaskGroup() as tg:
results = [None] * len(coros)
for i, coro in enumerate(coros):
tg.create_task(run(i, coro))
return results
except BaseExceptionGroup as exception_group:
if len(exception_group.exceptions) == 1:
raise exception_group.exceptions[0]
raise
async def gather_tasks(*coros):
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(coro) for coro in coros]
return [task.result() for task in tasks]

unless there's reason for the error handling? (BaseExceptionGroup).

Also, it might be worth moving the function to a utils.py file, so that it can easily be reused by other places.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes the error handling is important to maintain backwards compatibility with the old use of gather

Copy link
Contributor Author

@graingert graingert Apr 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping an extra reference to tasks here will result in reference cycles in the traceback of any errors that will delay garbage collection if there's any errors in signals

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To maintain backward compatibility, shouldn't the first exception be raised even when the exception group contains multiple exceptions?



class Signal:
"""
Base class for all signals
Expand Down Expand Up @@ -166,7 +188,7 @@ def send(self, sender, **named):

If any receivers are asynchronous, they are called after all the
synchronous receivers via a single call to async_to_sync(). They are
also executed concurrently with asyncio.gather().
also executed concurrently with asyncio.TaskGroup().

Arguments:

Expand All @@ -191,7 +213,7 @@ def send(self, sender, **named):
if async_receivers:

async def asend():
async_responses = await asyncio.gather(
async_responses = await _gather(
*(
receiver(signal=self, sender=sender, **named)
for receiver in async_receivers
Expand All @@ -215,7 +237,7 @@ async def asend(self, sender, **named):
sync_to_async() adaption before executing any asynchronous receivers.

If any receivers are asynchronous, they are grouped and executed
concurrently with asyncio.gather().
concurrently with asyncio.TaskGroup().

Arguments:

Expand Down Expand Up @@ -248,9 +270,9 @@ def sync_send():
async def sync_send():
return []

responses, async_responses = await asyncio.gather(
responses, async_responses = await _gather(
sync_send(),
asyncio.gather(
_gather(
*(
receiver(signal=self, sender=sender, **named)
for receiver in async_receivers
Expand All @@ -274,7 +296,7 @@ def send_robust(self, sender, **named):

If any receivers are asynchronous, they are called after all the
synchronous receivers via a single call to async_to_sync(). They are
also executed concurrently with asyncio.gather().
also executed concurrently with asyncio.TaskGroup().

Arguments:

Expand Down Expand Up @@ -320,7 +342,7 @@ async def asend_and_wrap_exception(receiver):
return response

async def asend():
async_responses = await asyncio.gather(
async_responses = await _gather(
*(
asend_and_wrap_exception(receiver)
for receiver in async_receivers
Expand All @@ -339,7 +361,7 @@ async def asend_robust(self, sender, **named):
sync_to_async() adaption before executing any asynchronous receivers.

If any receivers are asynchronous, they are grouped and executed
concurrently with asyncio.gather.
concurrently with asyncio.TaskGroup.

Arguments:

Expand Down Expand Up @@ -394,9 +416,9 @@ async def asend_and_wrap_exception(receiver):
return err
return response

responses, async_responses = await asyncio.gather(
responses, async_responses = await _gather(
sync_send(),
asyncio.gather(
_gather(
*(asend_and_wrap_exception(receiver) for receiver in async_receivers),
),
)
Expand Down
2 changes: 1 addition & 1 deletion docs/topics/signals.txt
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ in order to reduce the number of sync/async calling-style switches within a
they are async before being called. This means that an asynchronous receiver
registered before a synchronous receiver may be executed after the synchronous
receiver. In addition, async receivers are executed concurrently using
``asyncio.gather()``.
``asyncio.TaskGroup()``.

All built-in signals, except those in the async request-response cycle, are
dispatched using :meth:`Signal.send`.
Expand Down
Loading