Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion django/core/handlers/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,25 @@ async def send_response(self, response, send):
"headers": response_headers,
}
)
# Streaming responses with TaskGroup/Timeout need to be pinned to their
# iterator.
if response.streaming_acmgr:
async with response.streaming_acmgr_content as content:
async for part in content:
for chunk, _ in self.chunk_bytes(part):
await send(
{
"type": "http.response.body",
"body": chunk,
# Ignore "more" as there may be more parts; instead,
# use an empty final closing message with False.
"more_body": True,
}
)
# Final closing message.
await send({"type": "http.response.body"})
# Streaming responses need to be pinned to their iterator.
if response.streaming:
elif response.streaming:
# - Consume via `__aiter__` and not `streaming_content` directly, to
# allow mapping of a sync iterator.
# - Use aclosing() when consuming aiter. See
Expand All @@ -342,6 +359,7 @@ async def send_response(self, response, send):
)
# Final closing message.
await send({"type": "http.response.body"})

# Other responses just need chunking.
else:
# Yield chunks of response.
Expand Down
2 changes: 2 additions & 0 deletions django/http/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
HttpResponseRedirect,
HttpResponseServerError,
JsonResponse,
StreamingAcmgrHttpResponse,
StreamingHttpResponse,
)

Expand All @@ -36,6 +37,7 @@
"HttpResponse",
"HttpResponseBase",
"StreamingHttpResponse",
"StreamingAcmgrHttpResponse",
"HttpResponseRedirect",
"HttpResponsePermanentRedirect",
"HttpResponseNotModified",
Expand Down
76 changes: 76 additions & 0 deletions django/http/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ class HttpResponse(HttpResponseBase):
"""

streaming = False
streaming_acmgr = False

def __init__(self, content=b"", *args, **kwargs):
super().__init__(*args, **kwargs)
Expand Down Expand Up @@ -445,6 +446,7 @@ class StreamingHttpResponse(HttpResponseBase):
"""

streaming = True
streaming_acmgr = False

def __init__(self, streaming_content=(), *args, **kwargs):
super().__init__(*args, **kwargs)
Expand Down Expand Up @@ -542,6 +544,80 @@ def getvalue(self):
return b"".join(self.streaming_content)


class StreamingAcmgrHttpResponse(HttpResponseBase):
    """
    A streaming HTTP response whose content is an asynchronous context manager
    yielding an asynchronous iterator of chunks.

    The context manager is entered when the content is consumed and exited
    once the iterator is exhausted, so resources bound to the manager (e.g. a
    task group or a timeout) live exactly as long as the stream is being
    produced. The content should only be consumed once.
    """

    is_async = True
    streaming = True
    streaming_acmgr = True

    def __init__(self, streaming_acmgr_content=(), *args, **kwargs):
        super().__init__(*args, **kwargs)
        # `streaming_acmgr_content` should be an
        # AbstractAsyncContextManager[AsyncIterator[bytes]].
        # NOTE(review): the `()` default is not an async context manager, so
        # a response built without content cannot be streamed -- confirm
        # whether a usable empty default is wanted.
        self.streaming_acmgr_content = streaming_acmgr_content

    def __repr__(self):
        return "<%(cls)s status_code=%(status_code)d%(content_type)s>" % {
            "cls": self.__class__.__qualname__,
            "status_code": self.status_code,
            "content_type": self._content_type_for_repr,
        }

    @property
    def content(self):
        # Point at the attribute that actually exists: `streaming_content`
        # raises on this class as well.
        raise AttributeError(
            "This %s instance has no `content` attribute. Use "
            "`streaming_acmgr_content` instead." % self.__class__.__name__
        )

    @property
    def text(self):
        raise AttributeError(
            "This %s instance has no `text` attribute." % self.__class__.__name__
        )

    @property
    def streaming_content(self):
        raise AttributeError(
            "This %s instance has no `streaming_content` attribute. Use "
            "`streaming_acmgr_content` instead." % self.__class__.__name__
        )

    def __iter__(self):
        """
        Synchronously consume the whole stream and iterate over its chunks.

        The content is fully buffered before the first chunk is returned, so
        a warning is emitted.
        """
        warnings.warn(
            "StreamingAcmgrHttpResponse must consume asynchronous iterators in "
            "order to serve them synchronously. Use a synchronous iterator instead.",
            Warning,
            stacklevel=2,
        )

        # Enter the context manager and drain the iterator inside a single
        # coroutine so the manager is entered and exited in the same task.
        async def to_list():
            async with self.streaming_acmgr_content as content:
                return [chunk async for chunk in content]

        # `to_list` takes no arguments; it reads the content from `self`.
        return map(self.make_bytes, async_to_sync(to_list)())

    async def __aiter__(self):
        """
        Asynchronously iterate over the chunks of the stream.

        The content is drained while the context manager is open and the
        chunks are only yielded after it has been exited, so this generator
        is never suspended inside the context manager.
        """
        warnings.warn(
            "StreamingAcmgrHttpResponse must consume asynchronous iterators in"
            " order to serve them asynchronously. Use a cmgr instead.",
            Warning,
            stacklevel=2,
        )
        async with self.streaming_acmgr_content as content:
            chunks = [chunk async for chunk in content]
        # Yielding makes this an async generator; `__aiter__` must hand back
        # an async iterator, not a coroutine. Chunks go through make_bytes()
        # for parity with __iter__().
        for chunk in chunks:
            yield self.make_bytes(chunk)


class FileResponse(StreamingHttpResponse):
"""
A streaming HTTP response class optimized for files.
Expand Down
21 changes: 20 additions & 1 deletion django/middleware/gzip.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import contextlib

from django.utils.cache import patch_vary_headers
from django.utils.deprecation import MiddlewareMixin
from django.utils.regex_helper import _lazy_re_compile
Expand Down Expand Up @@ -30,7 +32,24 @@ def process_response(self, request, response):
if not re_accepts_gzip.search(ae):
return response

if response.streaming:
if response.streaming_acmgr:
original_iterator = response.streaming_acmgr_content

@contextlib.asynccontextmanager
async def gzip_acmgr_wrapper():
    """
    Wrap the original content so each chunk it produces is gzip-compressed.

    An async context manager generator must yield exactly once, so the
    single yielded value is an async iterator of compressed chunks rather
    than the chunks themselves.
    """
    async with original_iterator as content:

        async def compressed_chunks():
            async for chunk in content:
                yield compress_string(
                    chunk,
                    max_random_bytes=self.max_random_bytes,
                )

        stream = compressed_chunks()
        try:
            yield stream
        finally:
            # Finalize the generator before the wrapped context manager is
            # exited, even if the consumer stopped iterating early.
            await stream.aclose()

response.streaming_acmgr_content = gzip_acmgr_wrapper()
# Delete the `Content-Length` header for streaming content, because
# we won't know the compressed size until we stream it.
del response.headers["Content-Length"]

elif response.streaming:
if response.is_async:
# pull to lexical scope to capture fixed reference in case
# streaming_content is set again later.
Expand Down
35 changes: 35 additions & 0 deletions tests/asgi/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -659,3 +659,38 @@ async def test_streaming_disconnect(self):
# 'last\n' isn't sent.
with self.assertRaises(asyncio.TimeoutError):
await communicator.receive_output(timeout=0.2)

async def test_streaming_acmgr(self):
    """Both chunks are received in order, followed by one closing message."""
    scope = self.async_request_factory._base_scope(
        path="/streaming_acmgr/", query_string=b"sleep=0.001"
    )
    application = get_asgi_application()
    communicator = ApplicationCommunicator(application, scope)
    await communicator.send_input({"type": "http.request"})
    # The first message is http.response.start.
    await communicator.receive_output(timeout=1)
    # The body messages carry 'first' then 'last'.
    for expected_body in (b"first\n", b"last\n"):
        message = await communicator.receive_output(timeout=1)
        self.assertEqual(message["body"], expected_body)
    # Drain the closing message so that coroutines are cleaned up.
    await communicator.receive_output(timeout=1)
    # Nothing further is sent.
    with self.assertRaises(asyncio.TimeoutError):
        await communicator.receive_output(timeout=1)

async def test_streaming_acmgr_disconnect(self):
    """After the client disconnects, the remaining chunk is never sent."""
    request_scope = self.async_request_factory._base_scope(
        path="/streaming_acmgr/", query_string=b"sleep=0.1"
    )
    app = get_asgi_application()
    communicator = ApplicationCommunicator(app, request_scope)
    await communicator.send_input({"type": "http.request"})
    # Skip the response start message, then read the first body chunk.
    await communicator.receive_output(timeout=1)
    first_chunk = await communicator.receive_output(timeout=1)
    self.assertEqual(first_chunk["body"], b"first\n")
    # Simulate the client going away while the view is still sleeping.
    await communicator.send_input({"type": "http.disconnect"})
    # 'last\n' must not arrive.
    with self.assertRaises(asyncio.TimeoutError):
        await communicator.receive_output(timeout=0.2)
45 changes: 44 additions & 1 deletion tests/asgi/urls.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import asyncio
import contextlib
import threading
import time

from django.http import FileResponse, HttpResponse, StreamingHttpResponse
from django.http import (
FileResponse,
HttpResponse,
StreamingAcmgrHttpResponse,
StreamingHttpResponse,
)
from django.urls import path
from django.views.decorators.csrf import csrf_exempt

Expand Down Expand Up @@ -56,6 +62,42 @@ async def streaming_view(request):
return StreamingHttpResponse(streaming_inner(sleep_time))


class QueueIterator:
    """
    Async iterator over the items of a queue.

    Iteration ends when the item taken from the queue is the `eof` object
    (compared by identity).
    """

    def __init__(self, q, eof):
        self._q, self._eof = q, eof

    def __aiter__(self):
        # The instance is its own iterator.
        return self

    async def __anext__(self):
        item = await self._q.get()
        if item is not self._eof:
            return item
        raise StopAsyncIteration


@contextlib.asynccontextmanager
async def streaming_acmgr_inner(sleep_time):
    """
    Yield an async iterator fed by a background task in a task group.

    The task puts b"first\\n" on a queue, sleeps for `sleep_time` seconds,
    then puts b"last\\n" followed by an end-of-stream marker.
    """
    sentinel = object()
    queue = asyncio.Queue(0)

    async def producer():
        await queue.put(b"first\n")
        await asyncio.sleep(sleep_time)
        for item in (b"last\n", sentinel):
            await queue.put(item)

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(producer())
        yield QueueIterator(q=queue, eof=sentinel)


async def streaming_acmgr_view(request):
    """Stream two chunks, pausing for the `sleep` query parameter (seconds)."""
    content = streaming_acmgr_inner(float(request.GET["sleep"]))
    return StreamingAcmgrHttpResponse(content)


test_filename = __file__


Expand All @@ -67,4 +109,5 @@ async def streaming_view(request):
path("wait/", sync_waiter),
path("delayed_hello/", hello_with_delay),
path("streaming/", streaming_view),
path("streaming_acmgr/", streaming_acmgr_view),
]