1. Introduction
This section is non-normative.
Large swathes of the web platform are built on streaming data: that is, data that is created, processed, and consumed in an incremental fashion, without ever reading all of it into memory. The Streams Standard provides a common set of APIs for creating and interfacing with such streaming data, embodied in readable streams, writable streams, and transform streams.
These APIs have been designed to efficiently map to low-level I/O primitives, including specializations for byte streams where appropriate. They allow easy composition of multiple streams into pipe chains, or can be used directly via readers and writers. Finally, they are designed to automatically provide backpressure and queuing.
This standard provides the base stream primitives which other parts of the web platform can use to
expose their streaming data. For example, [FETCH] exposes Response
bodies as
ReadableStream
instances. More generally, the platform is full of streaming abstractions waiting
to be expressed as streams: multimedia streams, file streams, inter-global communication, and more
benefit from being able to process data incrementally instead of buffering it all into memory and
processing it in one go. By providing the foundation for these streams to be exposed to developers,
the Streams Standard enables use cases like:
-
Video effects: piping a readable video stream through a transform stream that applies effects in real time.
-
Decompression: piping a file stream through a transform stream that selectively decompresses files from a .tgz archive, turning them into img elements as the user scrolls through an image gallery.
-
Image decoding: piping an HTTP response stream through a transform stream that decodes bytes into bitmap data, and then through another transform that translates bitmaps into PNGs. If installed inside the fetch hook of a service worker, this would allow developers to transparently polyfill new image formats. [SERVICE-WORKERS]
Web developers can also use the APIs described here to create their own streams, with the same APIs as those provided by the platform. Other developers can then transparently compose platform-provided streams with those supplied by libraries. In this way, the APIs described here provide unifying abstraction for all streams, encouraging an ecosystem to grow around these shared and composable interfaces.
2. Model
A chunk is a single piece of data that is written to or read from a stream. It can
be of any type; streams can even contain chunks of different types. A chunk will often not be the
most atomic unit of data for a given stream; for example a byte stream might contain chunks
consisting of 16 KiB Uint8Arrays, instead of single bytes.
2.1. Readable streams
A readable stream represents a source of data, from which you can read. In other
words, data comes
out of a readable stream. Concretely, a readable stream is an instance of the
ReadableStream
class.
Although a readable stream can be created with arbitrary behavior, most readable streams wrap a lower-level I/O source, called the underlying source. There are two types of underlying source: push sources and pull sources.
Push sources push data at you, whether or not you are listening for it. They may also provide a mechanism for pausing and resuming the flow of data. An example push source is a TCP socket, where data is constantly being pushed from the OS level, at a rate that can be controlled by changing the TCP window size.
Pull sources require you to request data from them. The data may be available synchronously, e.g. if it is held by the operating system’s in-memory buffers, or asynchronously, e.g. if it has to be read from disk. An example pull source is a file handle, where you seek to specific locations and read specific amounts.
Readable streams are designed to wrap both types of sources behind a single, unified interface. For
web developer–created streams, the implementation details of a source are provided by an object with certain methods and properties that is passed to
the ReadableStream()
constructor.
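For example, here is a minimal sketch of such an object being passed to the constructor; getNextChunk() stands in for a hypothetical source-specific way of obtaining data and is not part of this standard:

const stream = new ReadableStream({
  start(controller) {
    // Called immediately; set up access to the underlying source here.
  },
  async pull(controller) {
    // Called whenever the stream's internal queue wants more data.
    const chunk = await getNextChunk(); // hypothetical source-specific helper
    if (chunk === undefined) {
      controller.close();
    } else {
      controller.enqueue(chunk);
    }
  },
  cancel(reason) {
    // Called if the consumer loses interest; release the underlying source here.
  }
});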
Chunks are enqueued into the stream by the stream’s underlying source. They can then be read
one at a time via the stream’s public interface, in particular by using a readable stream reader
acquired using the stream’s getReader()
method.
Code that reads from a readable stream using its public interface is known as a consumer.
Consumers also have the ability to cancel a readable
stream, using its cancel()
method. This indicates that the consumer has lost
interest in the stream, and will immediately close the stream, throw away any queued chunks, and
execute any cancellation mechanism of the underlying source.
Consumers can also tee a readable stream using its
tee()
method. This will lock the stream, making it
no longer directly usable; however, it will create two new streams, called branches, which can be consumed independently.
For streams representing bytes, an extended version of the readable stream is provided to handle
bytes efficiently, in particular by minimizing copies. The underlying source for such a readable
stream is called an underlying byte source. A readable stream whose underlying source is
an underlying byte source is sometimes called a readable byte stream. Consumers of
a readable byte stream can acquire a BYOB reader using the stream’s
getReader()
method.
2.2. Writable streams
A writable stream represents a destination for data, into which you can write. In
other words, data goes in to a writable stream. Concretely, a writable stream is an
instance of the WritableStream
class.
Analogously to readable streams, most writable streams wrap a lower-level I/O sink, called the underlying sink. Writable streams work to abstract away some of the complexity of the underlying sink, by queuing subsequent writes and only delivering them to the underlying sink one by one.
Chunks are written to the stream via its public interface, and are passed one at a time to the
stream’s underlying sink. For web developer-created streams, the implementation details of the
sink are provided by an object with certain methods that is
passed to the WritableStream()
constructor.
Code that writes into a writable stream using its public interface is known as a producer.
Producers also have the ability to abort a writable stream,
using its abort()
method. This indicates that the producer believes something has
gone wrong, and that future writes should be discontinued. It puts the stream in an errored state,
even without a signal from the underlying sink, and it discards all writes in the stream’s
internal queue.
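For example, a rough sketch of a web developer–created writable stream and its writer; sendToServer() stands in for a hypothetical asynchronous operation on the underlying sink:

const writableStream = new WritableStream({
  async write(chunk) {
    // Called with one chunk at a time, after any previous write has finished.
    await sendToServer(chunk); // hypothetical sink-specific helper
  },
  close() {
    // Called once the producer signals that it is done writing.
  },
  abort(reason) {
    // Called if the producer signals that something has gone wrong.
  }
});

const writer = writableStream.getWriter();
await writer.write("some data");
await writer.close();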
2.3. Transform streams
A transform stream consists of a pair of streams: a writable stream, known as its writable side, and a readable stream, known as its readable side. In a manner specific to the transform stream in question, writes to the writable side result in new data being made available for reading from the readable side.
Concretely, any object with a writable
property and a readable
property
can serve as a transform stream. However, the standard TransformStream
class makes it much
easier to create such a pair that is properly entangled. It wraps a transformer, which
defines algorithms for the specific transformation to be performed. For web developer–created
streams, the implementation details of a transformer are provided by an
object with certain methods and properties that is passed to the TransformStream()
constructor. Other specifications might use the GenericTransformStream
mixin to create classes
with the same writable
/readable
property pair but other custom APIs
layered on top.
An identity transform stream is a type of transform stream which forwards all
chunks written to its writable side to its readable side, without any changes. This can
be useful in a variety of scenarios. By default, the
TransformStream
constructor will create an identity transform stream, when no
transform()
method is present on the transformer object.
Some examples of potential transform streams include:
-
A GZIP compressor, to which uncompressed bytes are written and from which compressed bytes are read;
-
A video decoder, to which encoded bytes are written and from which uncompressed video frames are read;
-
A text decoder, to which bytes are written and from which strings are read;
-
A CSV-to-JSON converter, to which strings representing lines of a CSV file are written and from which corresponding JavaScript objects are read.
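As a sketch in the same spirit, a transform stream that upper-cases the strings written to its writable side:

const upperCaser = new TransformStream({
  transform(chunk, controller) {
    // Each string written to upperCaser.writable comes back out of
    // upperCaser.readable, upper-cased.
    controller.enqueue(chunk.toUpperCase());
  }
});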
2.4. Pipe chains and backpressure
Streams are primarily used by piping them to each other. A readable stream can be piped
directly to a writable stream, using its pipeTo()
method, or it can be piped
through one or more transform streams first, using its pipeThrough()
method.
A set of streams piped together in this way is referred to as a pipe chain. In a pipe chain, the original source is the underlying source of the first readable stream in the chain; the ultimate sink is the underlying sink of the final writable stream in the chain.
Once a pipe chain is constructed, it will propagate signals regarding how fast chunks should flow through it. If any step in the chain cannot yet accept chunks, it propagates a signal backwards through the pipe chain, until eventually the original source is told to stop producing chunks so fast. This process of normalizing flow from the original source according to how fast the chain can process chunks is called backpressure.
Concretely, the original source is given the
controller.desiredSize
(or
byteController.desiredSize
) value, and can then adjust
its rate of data flow accordingly. This value is derived from the
writer.desiredSize
corresponding to the ultimate sink, which gets updated as the ultimate sink finishes writing chunks. The
pipeTo()
method used to construct the chain automatically ensures this
information propagates back through the pipe chain.
When teeing a readable stream, the backpressure signals from its two branches will aggregate, such that if neither branch is read from, a backpressure signal will be sent to the underlying source of the original stream.
Piping locks the readable and writable streams, preventing them from being manipulated for the duration of the pipe operation. This allows the implementation to perform important optimizations, such as directly shuttling data from the underlying source to the underlying sink while bypassing many of the intermediate queues.
2.5. Internal queues and queuing strategies
Both readable and writable streams maintain internal queues, which they use for similar purposes. In the case of a readable stream, the internal queue contains chunks that have been enqueued by the underlying source, but not yet read by the consumer. In the case of a writable stream, the internal queue contains chunks which have been written to the stream by the producer, but not yet processed and acknowledged by the underlying sink.
A queuing strategy is an object that determines how a stream should signal backpressure based on the state of its internal queue. The queuing strategy assigns a size to each chunk, and compares the total size of all chunks in the queue to a specified number, known as the high water mark. The resulting difference, high water mark minus total size, is used to determine the desired size to fill the stream’s queue.
For readable streams, an underlying source can use this desired size as a backpressure signal, slowing down chunk generation so as to try to keep the desired size above or at zero. For writable streams, a producer can behave similarly, avoiding writes that would cause the desired size to go negative.
Concretely, a queuing strategy for web developer–created streams is given by
any JavaScript object with a highWaterMark
property. For byte streams the
highWaterMark
always has units of bytes. For other streams the default unit is
chunks, but a size()
function can be included in the strategy object
which returns the size for a given chunk. This permits the highWaterMark
to be
specified in arbitrary floating-point units.
In JavaScript, such a strategy could be written manually as an object like { highWaterMark: 3, size() { return 1; } }, or using the built-in CountQueuingStrategy class, as new CountQueuingStrategy({ highWaterMark: 3 }).
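Such a strategy is supplied as a constructor argument; for example, the following sketch measures chunks by their byteLength (underlyingSource stands in for an underlying source defined elsewhere):

const stream = new ReadableStream(underlyingSource, {
  // Aim to keep roughly 64 KiB queued before signaling backpressure.
  highWaterMark: 64 * 1024,
  // Measure each chunk by its length in bytes.
  size(chunk) { return chunk.byteLength; }
});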
2.6. Locking
A readable stream reader, or simply reader, is an
object that allows direct reading of chunks from a readable stream. Without a reader, a
consumer can only perform high-level operations on the readable stream: canceling the stream, or piping the readable stream to a writable stream. A reader is
acquired via the stream’s getReader()
method.
A readable byte stream has the ability to vend two types of readers: default readers and BYOB readers. BYOB ("bring your
own buffer") readers allow reading into a developer-supplied buffer, thus minimizing copies. A
non-byte readable stream can only vend default readers. Default readers are instances of the
ReadableStreamDefaultReader
class, while BYOB readers are instances of
ReadableStreamBYOBReader
.
Similarly, a writable stream writer, or simply
writer, is an object that allows direct writing of chunks to a writable stream. Without a
writer, a producer can only perform the high-level operations of aborting the stream or piping a readable stream to the writable stream. Writers are
represented by the WritableStreamDefaultWriter
class.
Under the covers, these high-level operations actually use a reader or writer themselves.
A given readable or writable stream only has at most one reader or writer at a time. We say in this
case the stream is locked, and that the
reader or writer is active. This state can be
determined using the readableStream.locked
or
writableStream.locked
properties.
A reader or writer also has the capability to release its lock, which makes it no longer active, and allows further readers or
writers to be acquired. This is done via the
defaultReader.releaseLock()
,
byobReader.releaseLock()
, or
writer.releaseLock()
method, as appropriate.
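For example, a brief sketch of acquiring and releasing a lock on a readable stream:

const reader = readableStream.getReader();
console.log(readableStream.locked); // true: the reader is now active

// ... read some chunks with reader.read() ...

reader.releaseLock();
console.log(readableStream.locked); // false: another reader can now be acquired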
3. Conventions
This specification depends on the Infra Standard. [INFRA]
This specification uses the abstract operation concept from the JavaScript specification for its internal algorithms. This includes treating their return values as completion records, and the use of ! and ? prefixes for unwrapping those completion records. [ECMASCRIPT]
This specification also uses the internal slot concept and notation from the JavaScript specification. (Although, the internal slots are on Web IDL platform objects instead of on JavaScript objects.)
The reasons for the usage of these foreign JavaScript specification conventions are largely historical. We urge you to avoid following our example when writing your own web specifications.
In this specification, all numbers are represented as double-precision 64-bit IEEE 754 floating
point values (like the JavaScript Number type or Web IDL unrestricted double
type), and all
arithmetic operations performed on them must be done in the standard way for such values. This is
particularly important for the data structure described in § 8.1 Queue-with-sizes. [IEEE-754]
4. Readable streams
4.1. Using readable streams
The simplest way to consume a readable stream is to pipe it to a writable stream:

readableStream.pipeTo(writableStream)
  .then(() => console.log("All data successfully written!"))
  .catch(e => console.error("Something went wrong!", e));
If you simply want to be alerted of each new chunk, you can instead pipe to a writable stream created for that purpose:

readableStream.pipeTo(new WritableStream({
  write(chunk) {
    console.log("Chunk received", chunk);
  },
  close() {
    console.log("All data successfully read!");
  },
  abort(e) {
    console.error("Something went wrong!", e);
  }
}));
By returning promises from your write()
implementation, you can signal
backpressure to the readable stream.
Although readable streams will usually be consumed by piping, you can also read them directly by acquiring a reader and using its read() method to get successive chunks. For example, this code logs the next chunk in the stream, if available:
const reader = readableStream.getReader();

reader.read().then(
  ({ value, done }) => {
    if (done) {
      console.log("The stream was already closed!");
    } else {
      console.log(value);
    }
  },
  e => console.error("The stream became errored and cannot be read from!", e)
);
This more manual method of reading a stream is mainly useful for library authors building new high-level operations on streams, beyond the provided ones of piping and teeing.
If the stream is a readable byte stream, you can also acquire a BYOB reader for it, which allows more precise control over buffer allocation in order to avoid copies. For example, this code reads the first 1024 bytes from the stream into a single memory allocation:

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes: ", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } = await reader.read(
      new Uint8Array(buffer, offset, buffer.byteLength - offset)
    );
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}
An important thing to note here is that the final buffer
value is different from the
startingAB
, but it (and all intermediate buffers) shares the same backing memory
allocation. At each step, the buffer is transferred to a new
ArrayBuffer
object. The view
is destructured from the return value of reading a
new Uint8Array
, with that ArrayBuffer
object as its buffer
property, the
offset that bytes were written to as its byteOffset
property, and the number of
bytes that were written as its byteLength
property.
Note that this example is mostly educational. For practical purposes, the
min
option of read()
provides an easier and more direct way to read an exact number of bytes:
const reader = readableStream.getReader({ mode: "byob" });
const { value: view, done } = await reader.read(new Uint8Array(1024), { min: 1024 });
console.log("The first 1024 bytes: ", view);
4.2. The ReadableStream
class
The ReadableStream
class is a concrete instance of the general readable stream concept. It
is adaptable to any chunk type, and maintains an internal queue to keep track of data supplied
by the underlying source but not yet read by any consumer.
4.2.1. Interface definition
The Web IDL definition for the ReadableStream
class is given as follows:
[Exposed=*, Transferable]
interface ReadableStream {
  constructor(optional object underlyingSource, optional QueuingStrategy strategy = {});

  static ReadableStream from(any asyncIterable);

  readonly attribute boolean locked;

  Promise<undefined> cancel(optional any reason);
  ReadableStreamReader getReader(optional ReadableStreamGetReaderOptions options = {});
  ReadableStream pipeThrough(ReadableWritablePair transform, optional StreamPipeOptions options = {});
  Promise<undefined> pipeTo(WritableStream destination, optional StreamPipeOptions options = {});
  sequence<ReadableStream> tee();

  async_iterable<any>(optional ReadableStreamIteratorOptions options = {});
};

typedef (ReadableStreamDefaultReader or ReadableStreamBYOBReader) ReadableStreamReader;

enum ReadableStreamReaderMode { "byob" };

dictionary ReadableStreamGetReaderOptions {
  ReadableStreamReaderMode mode;
};

dictionary ReadableStreamIteratorOptions {
  boolean preventCancel = false;
};

dictionary ReadableWritablePair {
  required ReadableStream readable;
  required WritableStream writable;
};

dictionary StreamPipeOptions {
  boolean preventClose = false;
  boolean preventAbort = false;
  boolean preventCancel = false;
  AbortSignal signal;
};
4.2.2. Internal slots
Instances of ReadableStream
are created with the internal slots described in the following
table:
Internal Slot | Description (non-normative) |
---|---|
[[controller]] | A ReadableStreamDefaultController or ReadableByteStreamController created with the ability to control the state and queue of this stream |
[[Detached]] | A boolean flag set to true when the stream is transferred |
[[disturbed]] | A boolean flag set to true when the stream has been read from or canceled |
[[reader]] | A ReadableStreamDefaultReader or ReadableStreamBYOBReader instance, if the stream is locked to a reader, or undefined if it is not |
[[state]] | A string containing the stream’s current state, used internally; one of "readable", "closed", or "errored" |
[[storedError]] | A value indicating how the stream failed, to be given as a failure reason or exception when trying to operate on an errored stream |
4.2.3. The underlying source API
The ReadableStream()
constructor accepts as its first argument a JavaScript object representing
the underlying source. Such objects can contain any of the following properties:
dictionary UnderlyingSource {
  UnderlyingSourceStartCallback start;
  UnderlyingSourcePullCallback pull;
  UnderlyingSourceCancelCallback cancel;
  ReadableStreamType type;
  [EnforceRange] unsigned long long autoAllocateChunkSize;
};

typedef (ReadableStreamDefaultController or ReadableByteStreamController) ReadableStreamController;

callback UnderlyingSourceStartCallback = any (ReadableStreamController controller);
callback UnderlyingSourcePullCallback = Promise<undefined> (ReadableStreamController controller);
callback UnderlyingSourceCancelCallback = Promise<undefined> (optional any reason);

enum ReadableStreamType { "bytes" };
start(controller)
, of type UnderlyingSourceStartCallback-
A function that is called immediately during creation of the
ReadableStream
.Typically this is used to adapt a push source by setting up relevant event listeners, as in the example of § 10.1 A readable stream with an underlying push source (no backpressure support), or to acquire access to a pull source, as in § 10.4 A readable stream with an underlying pull source.
If this setup process is asynchronous, it can return a promise to signal success or failure; a rejected promise will error the stream. Any thrown exceptions will be re-thrown by the
ReadableStream()
constructor. pull(controller)
, of type UnderlyingSourcePullCallback-
A function that is called whenever the stream’s internal queue of chunks becomes not full, i.e. whenever the queue’s desired size becomes positive. Generally, it will be called repeatedly until the queue reaches its high water mark (i.e. until the desired size becomes non-positive).
For push sources, this can be used to resume a paused flow, as in § 10.2 A readable stream with an underlying push source and backpressure support. For pull sources, it is used to acquire new chunks to enqueue into the stream, as in § 10.4 A readable stream with an underlying pull source.
This function will not be called until
start()
successfully completes. Additionally, it will only be called repeatedly if it enqueues at least one chunk or fulfills a BYOB request; a no-oppull()
implementation will not be continually called.If the function returns a promise, then it will not be called again until that promise fulfills. (If the promise rejects, the stream will become errored.) This is mainly used in the case of pull sources, where the promise returned represents the process of acquiring a new chunk. Throwing an exception is treated the same as returning a rejected promise.
cancel(reason)
, of type UnderlyingSourceCancelCallback-
A function that is called whenever the consumer cancels the stream, via
stream.cancel()
orreader.cancel()
. It takes as its argument the same value as was passed to those methods by the consumer.Readable streams can additionally be canceled under certain conditions during piping; see the definition of the
pipeTo()
method for more details.For all streams, this is generally used to release access to the underlying resource; see for example § 10.1 A readable stream with an underlying push source (no backpressure support).
If the shutdown process is asynchronous, it can return a promise to signal success or failure; the result will be communicated via the return value of the
cancel()
method that was called. Throwing an exception is treated the same as returning a rejected promise.Even if the cancelation process fails, the stream will still close; it will not be put into an errored state. This is because a failure in the cancelation process doesn’t matter to the consumer’s view of the stream, once they’ve expressed disinterest in it by canceling. The failure is only communicated to the immediate caller of the corresponding method.
This is different from the behavior of the
close
andabort
options of aWritableStream
’s underlying sink, which upon failure put the correspondingWritableStream
into an errored state. Those correspond to specific actions the producer is requesting and, if those actions fail, they indicate something more persistently wrong. type
(byte streams only), of type ReadableStreamType-
Can be set to "
bytes
" to signal that the constructedReadableStream
is a readable byte stream. This ensures that the resultingReadableStream
will successfully be able to vend BYOB readers via itsgetReader()
method. It also affects the controller argument passed to thestart()
andpull()
methods; see below.For an example of how to set up a readable byte stream, including using the different controller interface, see § 10.3 A readable byte stream with an underlying push source (no backpressure support).
Setting any value other than "
bytes
" or undefined will cause theReadableStream()
constructor to throw an exception. autoAllocateChunkSize
(byte streams only), of type unsigned long long-
Can be set to a positive integer to cause the implementation to automatically allocate buffers for the underlying source code to write into. In this case, when a consumer is using a default reader, the stream implementation will automatically allocate an
ArrayBuffer
of the given size, so thatcontroller.byobRequest
is always present, as if the consumer was using a BYOB reader.This is generally used to cut down on the amount of code needed to handle consumers that use default readers, as can be seen by comparing § 10.3 A readable byte stream with an underlying push source (no backpressure support) without auto-allocation to § 10.5 A readable byte stream with an underlying pull source with auto-allocation.
The type of the controller argument passed to the start()
and
pull()
methods depends on the value of the type
option. If type
is set to undefined (including via omission), then
controller will be a ReadableStreamDefaultController
. If it’s set to
"bytes
", then controller will be a ReadableByteStreamController
.
4.2.4. Constructor, methods, and properties
stream = new
ReadableStream
(underlyingSource[, strategy])-
Creates a new
ReadableStream
wrapping the provided underlying source. See § 4.2.3 The underlying source API for more details on the underlyingSource argument.The strategy argument represents the stream’s queuing strategy, as described in § 7.1 The queuing strategy API. If it is not provided, the default behavior will be the same as a
CountQueuingStrategy
with a high water mark of 1. stream =
ReadableStream.from
(asyncIterable)-
Creates a new
ReadableStream
wrapping the provided iterable or async iterable.This can be used to adapt various kinds of objects into a readable stream, such as an array, an async generator, or a Node.js readable stream.
isLocked = stream.
locked
-
Returns whether or not the readable stream is locked to a reader.
await stream.
cancel
([ reason ])-
Cancels the stream, signaling a loss of interest in the stream by a consumer. The supplied reason argument will be given to the underlying source’s
cancel()
method, which might or might not use it.The returned promise will fulfill if the stream shuts down successfully, or reject if the underlying source signaled that there was an error doing so. Additionally, it will reject with a
TypeError
(without attempting to cancel the stream) if the stream is currently locked. reader = stream.
getReader
()-
Creates a
ReadableStreamDefaultReader
and locks the stream to the new reader. While the stream is locked, no other reader can be acquired until this one is released.This functionality is especially useful for creating abstractions that desire the ability to consume a stream in its entirety. By getting a reader for the stream, you can ensure nobody else can interleave reads with yours or cancel the stream, which would interfere with your abstraction.
reader = stream.
getReader
({mode
: "byob
" })-
Creates a
ReadableStreamBYOBReader
and locks the stream to the new reader.This call behaves the same way as the no-argument variant, except that it only works on readable byte streams, i.e. streams which were constructed specifically with the ability to handle "bring your own buffer" reading. The returned BYOB reader provides the ability to directly read individual chunks from the stream via its
read()
method, into developer-supplied buffers, allowing more precise control over allocation. readable = stream.
pipeThrough
({writable
,readable
}[, {preventClose
,preventAbort
,preventCancel
,signal
}])-
Provides a convenient, chainable way of piping this readable stream through a transform stream (or any other
{ writable, readable }
pair). It simply pipes the stream into the writable side of the supplied pair, and returns the readable side for further use.Piping a stream will lock it for the duration of the pipe, preventing any other consumer from acquiring a reader.
await stream.
pipeTo
(destination[, {preventClose
,preventAbort
,preventCancel
,signal
}])-
Pipes this readable stream to a given writable stream destination. The way in which the piping process behaves under various error conditions can be customized with a number of passed options. It returns a promise that fulfills when the piping process completes successfully, or rejects if any errors were encountered.
Piping a stream will lock it for the duration of the pipe, preventing any other consumer from acquiring a reader.
Errors and closures of the source and destination streams propagate as follows:
-
An error in this source readable stream will abort destination, unless
preventAbort
is truthy. The returned promise will be rejected with the source’s error, or with any error that occurs during aborting the destination. -
An error in destination will cancel this source readable stream, unless
preventCancel
is truthy. The returned promise will be rejected with the destination’s error, or with any error that occurs during canceling the source. -
When this source readable stream closes, destination will be closed, unless
preventClose
is truthy. The returned promise will be fulfilled once this process completes, unless an error is encountered while closing the destination, in which case it will be rejected with that error. -
If destination starts out closed or closing, this source readable stream will be canceled, unless
preventCancel
is true. The returned promise will be rejected with an error indicating piping to a closed stream failed, or with any error that occurs during canceling the source.
The
signal
option can be set to anAbortSignal
to allow aborting an ongoing pipe operation via the correspondingAbortController
. In this case, this source readable stream will be canceled, and destination aborted, unless the respective optionspreventCancel
orpreventAbort
are set. -
[branch1, branch2] = stream.
tee
()-
Tees this readable stream, returning a two-element array containing the two resulting branches as new
ReadableStream
instances.Teeing a stream will lock it, preventing any other consumer from acquiring a reader. To cancel the stream, cancel both of the resulting branches; a composite cancellation reason will then be propagated to the stream’s underlying source.
If this stream is a readable byte stream, then each branch will receive its own copy of each chunk. If not, then the chunks seen in each branch will be the same object. If the chunks are not immutable, this could allow interference between the two branches.
new ReadableStream(underlyingSource, strategy)
constructor steps are:
-
If underlyingSource is missing, set it to null.
-
Let underlyingSourceDict be underlyingSource, converted to an IDL value of type
UnderlyingSource
.We cannot declare the underlyingSource argument as having the
UnderlyingSource
type directly, because doing so would lose the reference to the original object. We need to retain the object so we can invoke the various methods on it. -
Perform ! InitializeReadableStream(this).
-
If underlyingSourceDict["
type
"] is "bytes
":-
If strategy["
size
"] exists, throw aRangeError
exception. -
Let highWaterMark be ? ExtractHighWaterMark(strategy, 0).
-
Perform ? SetUpReadableByteStreamControllerFromUnderlyingSource(this, underlyingSource, underlyingSourceDict, highWaterMark).
-
-
Otherwise,
-
Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy).
-
Let highWaterMark be ? ExtractHighWaterMark(strategy, 1).
-
Perform ? SetUpReadableStreamDefaultControllerFromUnderlyingSource(this, underlyingSource, underlyingSourceDict, highWaterMark, sizeAlgorithm).
from(asyncIterable)
method steps
are:
-
Return ? ReadableStreamFromIterable(asyncIterable).
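For example, a sketch of adapting an async generator into a readable stream and then consuming it:

async function* generateMessages() {
  yield "hello";
  yield "streams";
}

const stream = ReadableStream.from(generateMessages());

for await (const chunk of stream) {
  console.log(chunk); // "hello", then "streams"
}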
locked
getter steps are:
-
Return ! IsReadableStreamLocked(this).
cancel(reason)
method steps are:
-
If ! IsReadableStreamLocked(this) is true, return a promise rejected with a
TypeError
exception. -
Return ! ReadableStreamCancel(this, reason).
getReader(options)
method steps
are:
-
If options["
mode
"] does not exist, return ? AcquireReadableStreamDefaultReader(this). -
Return ? AcquireReadableStreamBYOBReader(this).
For example, the following function uses a reader to read an entire stream into memory, as an array of chunks:

function readAllChunks(readableStream) {
  const reader = readableStream.getReader();
  const chunks = [];

  return pump();

  function pump() {
    return reader.read().then(({ value, done }) => {
      if (done) {
        return chunks;
      }

      chunks.push(value);
      return pump();
    });
  }
}
Note how the first thing it does is obtain a reader, and from then on it uses the reader exclusively. This ensures that no other consumer can interfere with the stream, either by reading chunks or by canceling the stream.
pipeThrough(transform, options)
method steps are:
-
If ! IsReadableStreamLocked(this) is true, throw a
TypeError
exception. -
If ! IsWritableStreamLocked(transform["
writable
"]) is true, throw aTypeError
exception. -
Let signal be options["
signal
"] if it exists, or undefined otherwise. -
Let promise be ! ReadableStreamPipeTo(this, transform["
writable
"], options["preventClose
"], options["preventAbort
"], options["preventCancel
"], signal). -
Set promise.[[PromiseIsHandled]] to true.
-
Return transform["
readable
"].
A typical pipe chain constructed using pipeThrough(transform, options) would look like
httpResponseBody
  .pipeThrough(decompressorTransform)
  .pipeThrough(ignoreNonImageFilesTransform)
  .pipeTo(mediaGallery);
pipeTo(destination, options)
method steps are:
-
If ! IsReadableStreamLocked(this) is true, return a promise rejected with a
TypeError
exception. -
If ! IsWritableStreamLocked(destination) is true, return a promise rejected with a
TypeError
exception. -
Let signal be options["
signal
"] if it exists, or undefined otherwise. -
Return ! ReadableStreamPipeTo(this, destination, options["
preventClose
"], options["preventAbort
"], options["preventCancel
"], signal).
An ongoing pipe operation can be stopped using an AbortSignal, as follows:
const controller = new AbortController();
readable.pipeTo(writable, { signal: controller.signal });

// ... some time later ...
controller.abort();
(The above omits error handling for the promise returned by pipeTo(). Additionally, the impact of the preventAbort and preventCancel options on what happens when piping is stopped is worth considering.)
The same technique can be used to switch the ReadableStream being piped, while writing into the same WritableStream:
const controller = new AbortController();
const pipePromise = readable1.pipeTo(writable, { preventAbort: true, signal: controller.signal });

// ... some time later ...
controller.abort();

// Wait for the pipe to complete before starting a new one:
try {
  await pipePromise;
} catch (e) {
  // Swallow "AbortError" DOMExceptions as expected, but rethrow any unexpected failures.
  if (e.name !== "AbortError") {
    throw e;
  }
}

// Start the new pipe!
readable2.pipeTo(writable);
tee()
method steps are:
-
Return ? ReadableStreamTee(this, false).
For example, given a writable stream cacheEntry representing an on-disk file, and another writable stream httpRequestBody representing an upload to a remote server, you could pipe the same readable stream to both destinations at once:
const [forLocal, forRemote] = readableStream.tee();

Promise.all([
  forLocal.pipeTo(cacheEntry),
  forRemote.pipeTo(httpRequestBody)
])
  .then(() => console.log("Saved the stream to the cache and also uploaded it!"))
  .catch(e => console.error("Either caching or uploading failed: ", e));
4.2.5. Asynchronous iteration
for await (const chunk of stream) { ... }
for await (const chunk of stream.values({ preventCancel: true })) { ... }
-
Asynchronously iterates over the chunks in the stream’s internal queue.
Asynchronously iterating over the stream will lock it, preventing any other consumer from acquiring a reader. The lock will be released if the async iterator’s
return()
method is called, e.g. bybreak
ing out of the loop.By default, calling the async iterator’s
return()
method will also cancel the stream. To prevent this, use the stream’svalues()
method, passing true for thepreventCancel
option.
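For example, a sketch of consuming an entire stream via asynchronous iteration, assuming readableStream was obtained elsewhere:

try {
  for await (const chunk of readableStream) {
    console.log("Read a chunk:", chunk);
  }
  console.log("The stream has closed");
} catch (e) {
  console.error("The stream errored", e);
}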
The asynchronous iterator initialization steps for a ReadableStream, given stream, iterator, and args, are:
-
Let reader be ? AcquireReadableStreamDefaultReader(stream).
-
Set iterator’s reader to reader.
-
Let preventCancel be args[0]["
preventCancel
"]. -
Set iterator’s prevent cancel to preventCancel.
The get the next iteration result steps for a ReadableStream, given stream and iterator, are:
-
Let reader be iterator’s reader.
-
Assert: reader.[[stream]] is not undefined.
-
Let promise be a new promise.
-
Let readRequest be a new read request with the following items:
- chunk steps, given chunk
-
-
Resolve promise with chunk.
-
- close steps
-
-
Perform ! ReadableStreamDefaultReaderRelease(reader).
-
Resolve promise with end of iteration.
-
- error steps, given e
-
-
Perform ! ReadableStreamDefaultReaderRelease(reader).
-
Reject promise with e.
-
-
Perform ! ReadableStreamDefaultReaderRead(this, readRequest).
-
Return promise.
The asynchronous iterator return steps for a ReadableStream, given stream, iterator, and arg, are:
-
Let reader be iterator’s reader.
-
Assert: reader.[[stream]] is not undefined.
-
Assert: reader.[[readRequests]] is empty, as the async iterator machinery guarantees that any previous calls to
next()
have settled before this is called. -
If iterator’s prevent cancel is false:
-
Let result be ! ReadableStreamReaderGenericCancel(reader, arg).
-
Perform ! ReadableStreamDefaultReaderRelease(reader).
-
Return result.
-
-
Perform ! ReadableStreamDefaultReaderRelease(reader).
-
Return a promise resolved with undefined.
4.2.6. Transfer via postMessage()
destination.postMessage(rs, { transfer: [rs] });
-
Sends a ReadableStream to another frame, window, or worker.
The transferred stream can be used exactly like the original. The original will become locked and no longer directly usable.
ReadableStream
objects are transferable objects. Their transfer steps, given value
and dataHolder, are:
-
If ! IsReadableStreamLocked(value) is true, throw a "
DataCloneError
"DOMException
. -
Let port1 be a new
MessagePort
in the current Realm. -
Let port2 be a new
MessagePort
in the current Realm. -
Entangle port1 and port2.
-
Let writable be a new
WritableStream
in the current Realm. -
Perform ! SetUpCrossRealmTransformWritable(writable, port1).
-
Let promise be ! ReadableStreamPipeTo(value, writable, false, false, false).
-
Set promise.[[PromiseIsHandled]] to true.
-
Set dataHolder.[[port]] to ! StructuredSerializeWithTransfer(port2, « port2 »).
Their transfer-receiving steps, given dataHolder and value, are:
-
Let deserializedRecord be ! StructuredDeserializeWithTransfer(dataHolder.[[port]], the current Realm).
-
Let port be deserializedRecord.[[Deserialized]].
-
Perform ! SetUpCrossRealmTransformReadable(value, port).
4.3. The ReadableStreamGenericReader
mixin
The ReadableStreamGenericReader
mixin defines common internal slots, getters and methods that
are shared between ReadableStreamDefaultReader
and ReadableStreamBYOBReader
objects.
4.3.1. Mixin definition
The Web IDL definition for the ReadableStreamGenericReader
mixin is given as follows:
interface mixin ReadableStreamGenericReader {
  readonly attribute Promise<undefined> closed;

  Promise<undefined> cancel(optional any reason);
};
4.3.2. Internal slots
Instances of classes including the ReadableStreamGenericReader
mixin are created with the
internal slots described in the following table:
Internal Slot | Description (non-normative) |
---|---|
[[closedPromise]] | A promise returned by the reader’s closed getter |
[[stream]] | A ReadableStream instance that owns this reader |
4.3.3. Methods and properties
closed
getter steps are:
-
Return this.[[closedPromise]].
cancel(reason)
method steps are:
-
If this.[[stream]] is undefined, return a promise rejected with a
TypeError
exception. -
Return ! ReadableStreamReaderGenericCancel(this, reason).
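For example, a sketch of observing a reader's closed promise:

const reader = readableStream.getReader();

reader.closed.then(
  () => console.log("The stream has closed"),
  e => console.error("The stream errored, or the lock was released early", e)
);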
4.4. The ReadableStreamDefaultReader
class
The ReadableStreamDefaultReader
class represents a default reader designed to be vended by a
ReadableStream
instance.
4.4.1. Interface definition
The Web IDL definition for the ReadableStreamDefaultReader
class is given as follows:
[Exposed=*]
interface ReadableStreamDefaultReader {
  constructor(ReadableStream stream);

  Promise<ReadableStreamReadResult> read();
  undefined releaseLock();
};
ReadableStreamDefaultReader includes ReadableStreamGenericReader;

dictionary ReadableStreamReadResult {
  any value;
  boolean done;
};
4.4.2. Internal slots
Instances of ReadableStreamDefaultReader
are created with the internal slots defined by
ReadableStreamGenericReader
, and those described in the following table:
Internal Slot | Description (non-normative) |
---|---|
[[readRequests]] | A list of read requests, used when a consumer requests chunks sooner than they are available |
A read request is a struct containing three algorithms to perform in reaction to filling the readable stream’s internal queue or changing its state. It has the following items:
- chunk steps
-
An algorithm taking a chunk, called when a chunk is available for reading
- close steps
-
An algorithm taking no arguments, called when no chunks are available because the stream is closed
- error steps
-
An algorithm taking a JavaScript value, called when no chunks are available because the stream is errored
4.4.3. Constructor, methods, and properties
reader = new
ReadableStreamDefaultReader
(stream)-
This is equivalent to calling
stream.
.getReader()
await reader.
closed
-
Returns a promise that will be fulfilled when the stream becomes closed, or rejected if the stream ever errors or the reader’s lock is released before the stream finishes closing.
await reader.
cancel
([ reason ])-
If the reader is active, behaves the same as
stream.
.cancel
(reason) { value, done } = await reader.
read
()-
Returns a promise that allows access to the next chunk from the stream’s internal queue, if available.
- If the chunk does become available, the promise will be fulfilled with an object of the form { value: theChunk, done: false }.
- If the stream becomes closed, the promise will be fulfilled with an object of the form { value: undefined, done: true }.
- If the stream becomes errored, the promise will be rejected with the relevant error.
If reading a chunk causes the queue to become empty, more data will be pulled from the underlying source.
- If the chunk does become available, the promise will be fulfilled with an object of the form
reader.
releaseLock
()-
Releases the reader’s lock on the corresponding stream. After the lock is released, the reader is no longer active. If the associated stream is errored when the lock is released, the reader will appear errored in the same way from now on; otherwise, the reader will appear closed.
If the reader’s lock is released while it still has pending read requests, then the promises returned by the reader’s
read()
method are immediately rejected with aTypeError
. Any unread chunks remain in the stream’s internal queue and can be read later by acquiring a new reader.
new ReadableStreamDefaultReader(stream)
constructor steps are:
-
Perform ? SetUpReadableStreamDefaultReader(this, stream).
read()
method steps are:
-
If this.[[stream]] is undefined, return a promise rejected with a
TypeError
exception. -
Let promise be a new promise.
-
Let readRequest be a new read request with the following items:
- chunk steps, given chunk
- close steps
- error steps, given e
-
-
Reject promise with e.
-
-
Perform ! ReadableStreamDefaultReaderRead(this, readRequest).
-
Return promise.
releaseLock()
method steps are:
-
If this.[[stream]] is undefined, return.
-
Perform ! ReadableStreamDefaultReaderRelease(this).
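For example, a sketch of how a pending read is rejected when the lock is released, while unread chunks stay available to a later reader:

const reader = readableStream.getReader();
const pending = reader.read(); // possibly still pending

reader.releaseLock(); // any pending read() promises now reject with a TypeError

pending.catch(e => console.log("Pending read rejected:", e));

// Unread chunks remain in the internal queue for a future reader:
const newReader = readableStream.getReader();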
4.5. The ReadableStreamBYOBReader
class
The ReadableStreamBYOBReader
class represents a BYOB reader designed to be vended by a
ReadableStream
instance.
4.5.1. Interface definition
The Web IDL definition for the ReadableStreamBYOBReader
class is given as follows:
[Exposed=*]
interface ReadableStreamBYOBReader {
  constructor(ReadableStream stream);

  Promise<ReadableStreamReadResult> read(ArrayBufferView view, optional ReadableStreamBYOBReaderReadOptions options = {});
  undefined releaseLock();
};
ReadableStreamBYOBReader includes ReadableStreamGenericReader;

dictionary ReadableStreamBYOBReaderReadOptions {
  [EnforceRange] unsigned long long min = 1;
};
4.5.2. Internal slots
Instances of ReadableStreamBYOBReader
are created with the internal slots defined by
ReadableStreamGenericReader
, and those described in the following table:
Internal Slot | Description (non-normative) |
---|---|
[[readIntoRequests]] | A list of read-into requests, used when a consumer requests chunks sooner than they are available |
A read-into request is a struct containing three algorithms to perform in reaction to filling the readable byte stream’s internal queue or changing its state. It has the following items:
- chunk steps
-
An algorithm taking a chunk, called when a chunk is available for reading
- close steps
-
An algorithm taking a chunk or undefined, called when no chunks are available because the stream is closed
- error steps
-
An algorithm taking a JavaScript value, called when no chunks are available because the stream is errored
The close steps take a chunk so that it can return the backing memory to the caller if possible. For example, byobReader.read(chunk) will fulfill with { value: newView, done: true } for closed streams. If the stream is canceled, the backing memory is discarded and byobReader.read(chunk) fulfills with the more traditional { value: undefined, done: true } instead.
4.5.3. Constructor, methods, and properties
reader = new
ReadableStreamBYOBReader
(stream)-
This is equivalent to calling
stream.
.getReader
({mode
: "byob
" }) await reader.
closed
-
Returns a promise that will be fulfilled when the stream becomes closed, or rejected if the stream ever errors or the reader’s lock is released before the stream finishes closing.
await reader.
cancel
([ reason ])-
If the reader is active, behaves the same
stream.
.cancel
(reason) { value, done } = await reader.
read
(view[, {min
}])-
Attempts to read bytes into view, and returns a promise resolved with the result:
- If the chunk does become available, the promise will be fulfilled with an object of the form { value: newView, done: false }. In this case, view will be detached and no longer usable, but newView will be a new view (of the same type) onto the same backing memory region, with the chunk’s data written into it.
- If the stream becomes closed, the promise will be fulfilled with an object of the form { value: newView, done: true }. In this case, view will be detached and no longer usable, but newView will be a new view (of the same type) onto the same backing memory region, with no modifications, to ensure the memory is returned to the caller.
- If the reader is canceled, the promise will be fulfilled with an object of the form { value: undefined, done: true }. In this case, the backing memory region of view is discarded and not returned to the caller.
If reading a chunk causes the queue to become empty, more data will be pulled from the underlying source.
If
min
is given, then the promise will only be fulfilled as soon as the given minimum number of elements are available. Here, the "number of elements" is given bynewView
’slength
(for typed arrays) ornewView
’sbyteLength
(forDataView
s). If the stream becomes closed, then the promise is fulfilled with the remaining elements in the stream, which might be fewer than the initially requested amount. If not given, then the promise resolves when at least one element is available. - If the chunk does become available, the promise will be fulfilled with an object of the form
reader.
releaseLock
()-
Releases the reader’s lock on the corresponding stream. After the lock is released, the reader is no longer active. If the associated stream is errored when the lock is released, the reader will appear errored in the same way from now on; otherwise, the reader will appear closed.
If the reader’s lock is released while it still has pending read requests, then the promises returned by the reader’s
read()
method are immediately rejected with aTypeError
. Any unread chunks remain in the stream’s internal queue and can be read later by acquiring a new reader.
new ReadableStreamBYOBReader(stream)
constructor
steps are:
-
Perform ? SetUpReadableStreamBYOBReader(this, stream).
read(view, options)
method steps are:
-
If view.[[ByteLength]] is 0, return a promise rejected with a
TypeError
exception. -
If view.[[ViewedArrayBuffer]].[[ByteLength]] is 0, return a promise rejected with a
TypeError
exception. -
If ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is true, return a promise rejected with a
TypeError
exception. -
If options["
min
"] is 0, return a promise rejected with aTypeError
exception. -
If view has a [[TypedArrayName]] internal slot,
-
If options["
min
"] > view.[[ArrayLength]], return a promise rejected with aRangeError
exception.
-
-
Otherwise (i.e., it is a
DataView
),-
If options["
min
"] > view.[[ByteLength]], return a promise rejected with aRangeError
exception.
-
-
If this.[[stream]] is undefined, return a promise rejected with a
TypeError
exception. -
Let promise be a new promise.
-
Let readIntoRequest be a new read-into request with the following items:
- chunk steps, given chunk
- close steps, given chunk
- error steps, given e
-
-
Reject promise with e.
-
-
Perform ! ReadableStreamBYOBReaderRead(this, view, options["
min
"], readIntoRequest). -
Return promise.
releaseLock()
method steps are:
-
If this.[[stream]] is undefined, return.
-
Perform ! ReadableStreamBYOBReaderRelease(this).
4.6. The ReadableStreamDefaultController
class
The ReadableStreamDefaultController
class has methods that allow control of a
ReadableStream
’s state and internal queue. When constructing a ReadableStream
that is
not a readable byte stream, the underlying source is given a corresponding
ReadableStreamDefaultController
instance to manipulate.
4.6.1. Interface definition
The Web IDL definition for the ReadableStreamDefaultController
class is given as follows:
[Exposed=*]
interface ReadableStreamDefaultController {
  readonly attribute unrestricted double? desiredSize;

  undefined close();
  undefined enqueue(optional any chunk);
  undefined error(optional any e);
};
4.6.2. Internal slots
Instances of ReadableStreamDefaultController
are created with the internal slots described in
the following table:
Internal Slot | Description (non-normative) |
---|---|
[[cancelAlgorithm]] | A promise-returning algorithm, taking one argument (the cancel reason), which communicates a requested cancelation to the underlying source |
[[closeRequested]] | A boolean flag indicating whether the stream has been closed by its underlying source, but still has chunks in its internal queue that have not yet been read |
[[pullAgain]] | A boolean flag set to true if the stream’s mechanisms requested a call to the underlying source’s pull algorithm to pull more data, but the pull could not yet be done since a previous call is still executing |
[[pullAlgorithm]] | A promise-returning algorithm that pulls data from the underlying source |
[[pulling]] | A boolean flag set to true while the underlying source’s pull algorithm is executing and the returned promise has not yet fulfilled, used to prevent reentrant calls |
[[queue]] | A list representing the stream’s internal queue of chunks |
[[queueTotalSize]] | The total size of all the chunks stored in [[queue]] (see § 8.1 Queue-with-sizes) |
[[started]] | A boolean flag indicating whether the underlying source has finished starting |
[[strategyHWM]] | A number supplied to the constructor as part of the stream’s queuing strategy, indicating the point at which the stream will apply backpressure to its underlying source |
[[strategySizeAlgorithm]] | An algorithm to calculate the size of enqueued chunks, as part of the stream’s queuing strategy |
[[stream]] | The ReadableStream instance controlled |
4.6.3. Methods and properties
desiredSize = controller.
desiredSize
-
Returns the desired size to fill the controlled stream’s internal queue. It can be negative, if the queue is over-full. An underlying source ought to use this information to determine when and how to apply backpressure.
controller.
close
()-
Closes the controlled readable stream. Consumers will still be able to read any previously-enqueued chunks from the stream, but once those are read, the stream will become closed.
controller.
enqueue
(chunk)-
Enqueues the given chunk chunk in the controlled readable stream.
controller.
error
(e)-
Errors the controlled readable stream, making all future interactions with it fail with the given error e.
The desiredSize getter steps are:
-
Return ! ReadableStreamDefaultControllerGetDesiredSize(this).
close()
method steps are:
-
If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false, throw a
TypeError
exception. -
Perform ! ReadableStreamDefaultControllerClose(this).
enqueue(chunk)
method steps are:
-
If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false, throw a
TypeError
exception. -
Perform ? ReadableStreamDefaultControllerEnqueue(this, chunk).
error(e)
method steps are:
-
Perform ! ReadableStreamDefaultControllerError(this, e).
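For example, a sketch of an underlying source driving its ReadableStreamDefaultController; readNextItem() stands in for a hypothetical source-specific operation:

const stream = new ReadableStream({
  async pull(controller) {
    try {
      const item = await readNextItem(); // hypothetical helper
      if (item === undefined) {
        controller.close();          // no more data: close the stream
      } else {
        controller.enqueue(item);    // hand the chunk to the stream's queue
      }
      console.log("Desired size is now", controller.desiredSize);
    } catch (e) {
      controller.error(e);           // surface the failure to consumers
    }
  }
});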
4.6.4. Internal methods
The following are internal methods implemented by each ReadableStreamDefaultController
instance.
The readable stream implementation will polymorphically call to either these, or to their
counterparts for BYOB controllers, as discussed in § 4.9.2 Interfacing with controllers.
[[CancelSteps]](reason) implements the [[CancelSteps]] contract. It performs the following steps:
-
Perform ! ResetQueue(this).
-
Let result be the result of performing this.[[cancelAlgorithm]], passing reason.
-
Perform ! ReadableStreamDefaultControllerClearAlgorithms(this).
-
Return result.
[[PullSteps]](readRequest) implements the [[PullSteps]] contract. It performs the following steps:
-
Let stream be this.[[stream]].
-
If this.[[queue]] is not empty,
-
Let chunk be ! DequeueValue(this).
-
If this.[[closeRequested]] is true and this.[[queue]] is empty,
-
Perform ! ReadableStreamDefaultControllerClearAlgorithms(this).
-
Perform ! ReadableStreamClose(stream).
-
-
Otherwise, perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
-
Perform readRequest’s chunk steps, given chunk.
-
-
Otherwise,
-
Perform ! ReadableStreamAddReadRequest(stream, readRequest).
-
Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
-
[[ReleaseSteps]]() implements the [[ReleaseSteps]] contract. It performs the following steps:
-
Return.
4.7. The ReadableByteStreamController
class
The ReadableByteStreamController
class has methods that allow control of a ReadableStream
’s
state and internal queue. When constructing a ReadableStream
that is a readable byte stream, the underlying source is given a corresponding ReadableByteStreamController
instance to manipulate.
4.7.1. Interface definition
The Web IDL definition for the ReadableByteStreamController
class is given as follows:
[Exposed=*]
interface ReadableByteStreamController {
  readonly attribute ReadableStreamBYOBRequest? byobRequest;
  readonly attribute unrestricted double? desiredSize;

  undefined close();
  undefined enqueue(ArrayBufferView chunk);
  undefined error(optional any e);
};
4.7.2. Internal slots
Instances of ReadableByteStreamController
are created with the internal slots described in the
following table:
Internal Slot | Description (non-normative) |
---|---|
[[autoAllocateChunkSize]] | A positive integer, when the automatic buffer allocation feature is enabled. In that case, this value specifies the size of buffer to allocate. It is undefined otherwise. |
[[byobRequest]] | A ReadableStreamBYOBRequest instance representing the current BYOB pull request, or null if there are no pending requests |
[[cancelAlgorithm]] | A promise-returning algorithm, taking one argument (the cancel reason), which communicates a requested cancelation to the underlying byte source |
[[closeRequested]] | A boolean flag indicating whether the stream has been closed by its underlying byte source, but still has chunks in its internal queue that have not yet been read |
[[pullAgain]] | A boolean flag set to true if the stream’s mechanisms requested a call to the underlying byte source’s pull algorithm to pull more data, but the pull could not yet be done since a previous call is still executing |
[[pullAlgorithm]] | A promise-returning algorithm that pulls data from the underlying byte source |
[[pulling]] | A boolean flag set to true while the underlying byte source’s pull algorithm is executing and the returned promise has not yet fulfilled, used to prevent reentrant calls |
[[pendingPullIntos]] | A list of pull-into descriptors |
[[queue]] | A list of readable byte stream queue entries representing the stream’s internal queue of chunks |
[[queueTotalSize]] | The total size, in bytes, of all the chunks stored in [[queue]] (see § 8.1 Queue-with-sizes) |
[[started]] | A boolean flag indicating whether the underlying byte source has finished starting |
[[strategyHWM]] | A number supplied to the constructor as part of the stream’s queuing strategy, indicating the point at which the stream will apply backpressure to its underlying byte source |
[[stream]] | The ReadableStream instance controlled |
Although ReadableByteStreamController
instances have
[[queue]] and [[queueTotalSize]]
slots, we do not use most of the abstract operations in § 8.1 Queue-with-sizes on them, as the way
in which we manipulate this queue is rather different than the others in the spec. Instead, we
update the two slots together manually.
This might be cleaned up in a future spec refactoring.
A readable byte stream queue entry is a struct encapsulating the important aspects of a chunk for the specific case of readable byte streams. It has the following items:
- buffer
-
An
ArrayBuffer
, which will be a transferred version of the one originally supplied by the underlying byte source - byte offset
-
A nonnegative integer number giving the byte offset derived from the view originally supplied by the underlying byte source
- byte length
-
A nonnegative integer number giving the byte length derived from the view originally supplied by the underlying byte source
A pull-into descriptor is a struct used to represent pending BYOB pull requests. It has the following items:
- buffer
-
An
ArrayBuffer
- buffer byte length
-
A positive integer representing the initial byte length of buffer
- byte offset
-
A nonnegative integer byte offset into the buffer where the underlying byte source will start writing
- byte length
-
A positive integer number of bytes which can be written into the buffer
- bytes filled
-
A nonnegative integer number of bytes that have been written into the buffer so far
- minimum fill
-
A positive integer representing the minimum number of bytes that must be written into the buffer before the associated
read()
request may be fulfilled. By default, this equals the element size. - element size
-
A positive integer representing the number of bytes that can be written into the buffer at a time, using views of the type described by the view constructor
- view constructor
-
A typed array constructor or
%DataView%
, which will be used for constructing a view with which to write into the buffer - reader type
-
Either "
default
" or "byob
", indicating what type of readable stream reader initiated this request, or "none
" if the initiating reader was released
4.7.3. Methods and properties
byobRequest = controller.
byobRequest
-
Returns the current BYOB pull request, or null if there isn’t one.
desiredSize = controller.
desiredSize
-
Returns the desired size to fill the controlled stream’s internal queue. It can be negative, if the queue is over-full. An underlying byte source ought to use this information to determine when and how to apply backpressure.
controller.
close
()-
Closes the controlled readable stream. Consumers will still be able to read any previously-enqueued chunks from the stream, but once those are read, the stream will become closed.
controller.
enqueue
(chunk)-
Enqueues the given chunk chunk in the controlled readable stream. The chunk has to be an ArrayBufferView instance, or else a TypeError will be thrown.
controller.
error
(e)-
Errors the controlled readable stream, making all future interactions with it fail with the given error e.
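The following non-normative sketch shows an underlying byte source exercising these members; the bytes it produces are fabricated.

```js
// Non-normative sketch: an underlying byte source using its
// ReadableByteStreamController. The bytes are fabricated.
let bytesLeft = 1024;
const rbs = new ReadableStream({
  type: "bytes",
  pull(controller) {
    if (bytesLeft === 0) {
      controller.close();
      // An outstanding BYOB request must still be answered after closing.
      controller.byobRequest?.respond(0);
      return;
    }
    const request = controller.byobRequest;
    if (request) {
      // A consumer supplied its own buffer; fill (part of) it directly.
      const view = request.view;
      const n = Math.min(view.byteLength, bytesLeft);
      new Uint8Array(view.buffer, view.byteOffset, n).fill(0xab);
      bytesLeft -= n;
      request.respond(n);
    } else {
      // No BYOB request pending: enqueue a freshly allocated chunk instead.
      const n = Math.min(512, bytesLeft);
      controller.enqueue(new Uint8Array(n).fill(0xab));
      bytesLeft -= n;
    }
  }
});
```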
byobRequest
getter steps are:
-
Return ! ReadableByteStreamControllerGetBYOBRequest(this).
desiredSize
getter steps are:
-
Return ! ReadableByteStreamControllerGetDesiredSize(this).
close()
method
steps are:
-
If this.[[closeRequested]] is true, throw a
TypeError
exception. -
If this.[[stream]].[[state]] is not "
readable
", throw aTypeError
exception. -
Perform ? ReadableByteStreamControllerClose(this).
enqueue(chunk)
method steps are:
-
If chunk.[[ByteLength]] is 0, throw a
TypeError
exception. -
If chunk.[[ViewedArrayBuffer]].[[ByteLength]] is 0, throw a
TypeError
exception. -
If this.[[closeRequested]] is true, throw a
TypeError
exception. -
If this.[[stream]].[[state]] is not "
readable
", throw aTypeError
exception. -
Return ? ReadableByteStreamControllerEnqueue(this, chunk).
error(e)
method steps are:
-
Perform ! ReadableByteStreamControllerError(this, e).
4.7.4. Internal methods
The following are internal methods implemented by each ReadableByteStreamController
instance.
The readable stream implementation will polymorphically call to either these, or to their
counterparts for default controllers, as discussed in § 4.9.2 Interfacing with controllers.
[[CancelSteps]](reason) implements the [[CancelSteps]] contract. It performs the following steps:
-
Perform ! ReadableByteStreamControllerClearPendingPullIntos(this).
-
Perform ! ResetQueue(this).
-
Let result be the result of performing this.[[cancelAlgorithm]], passing in reason.
-
Perform ! ReadableByteStreamControllerClearAlgorithms(this).
-
Return result.
[[PullSteps]](readRequest) implements the [[PullSteps]] contract. It performs the following steps:
-
Let stream be this.[[stream]].
-
Assert: ! ReadableStreamHasDefaultReader(stream) is true.
-
If this.[[queueTotalSize]] > 0,
-
Assert: ! ReadableStreamGetNumReadRequests(stream) is 0.
-
Perform ! ReadableByteStreamControllerFillReadRequestFromQueue(this, readRequest).
-
Return.
-
-
Let autoAllocateChunkSize be this.[[autoAllocateChunkSize]].
-
If autoAllocateChunkSize is not undefined,
-
Let buffer be Construct(
%ArrayBuffer%
, « autoAllocateChunkSize »). -
If buffer is an abrupt completion,
-
Perform readRequest’s error steps, given buffer.[[Value]].
-
Return.
-
-
Let pullIntoDescriptor be a new pull-into descriptor with
- buffer
- buffer.[[Value]]
- buffer byte length
- autoAllocateChunkSize
- byte offset
- 0
- byte length
- autoAllocateChunkSize
- bytes filled
- 0
- minimum fill
- 1
- element size
- 1
- view constructor
%Uint8Array%
- reader type
- "
default
"
-
Append pullIntoDescriptor to this.[[pendingPullIntos]].
-
-
Perform ! ReadableStreamAddReadRequest(stream, readRequest).
-
Perform ! ReadableByteStreamControllerCallPullIfNeeded(this).
[[ReleaseSteps]]() implements the [[ReleaseSteps]] contract. It performs the following steps:
-
If this.[[pendingPullIntos]] is not empty,
-
Let firstPendingPullInto be this.[[pendingPullIntos]][0].
-
Set firstPendingPullInto’s reader type to "
none
". -
Set this.[[pendingPullIntos]] to the list « firstPendingPullInto ».
-
4.8. The ReadableStreamBYOBRequest
class
The ReadableStreamBYOBRequest
class represents a pull-into request in a
ReadableByteStreamController
.
4.8.1. Interface definition
The Web IDL definition for the ReadableStreamBYOBRequest
class is given as follows:
[Exposed=*]
interface ReadableStreamBYOBRequest {
  readonly attribute ArrayBufferView? view;

  undefined respond([EnforceRange] unsigned long long bytesWritten);
  undefined respondWithNewView(ArrayBufferView view);
};
4.8.2. Internal slots
Instances of ReadableStreamBYOBRequest
are created with the internal slots described in the
following table:
Internal Slot | Description (non-normative) |
---|---|
[[controller]] | The parent ReadableByteStreamController instance |
[[view]] | A typed array representing the destination region to which the controller can write generated data, or null after the BYOB request has been invalidated. |
4.8.3. Methods and properties
view = byobRequest.
view
-
Returns the view for writing in to, or null if the BYOB request has already been responded to.
byobRequest.
respond
(bytesWritten)-
Indicates to the associated readable byte stream that bytesWritten bytes were written into view, causing the result to be surfaced to the consumer. After this method is called, view will be transferred and no longer modifiable.
byobRequest.
respondWithNewView
(view)-
Indicates to the associated readable byte stream that instead of writing into view, the underlying byte source is providing a new ArrayBufferView, which will be given to the consumer of the readable byte stream.
The new view has to be a view onto the same backing memory region as view, i.e. its buffer has to equal (or be a transferred version of) view’s buffer. Its byteOffset has to equal view’s byteOffset, and its byteLength (representing the number of bytes written) has to be less than or equal to that of view.
After this method is called, view will be transferred and no longer modifiable.
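A non-normative sketch of the respondWithNewView() path follows. It assumes a pull call during which a BYOB request is outstanding (ensured here by setting autoAllocateChunkSize), and the bytes it writes are fabricated.

```js
// Non-normative sketch: answering a BYOB request with a new view over the same
// memory region, rather than mutating the supplied view in place.
const stream = new ReadableStream({
  type: "bytes",
  autoAllocateChunkSize: 64,   // makes byobRequest non-null even for default reads
  pull(controller) {
    const request = controller.byobRequest;
    const produced = Math.min(16, request.view.byteLength);
    const filled = new Uint8Array(
        request.view.buffer, request.view.byteOffset, produced);
    filled.fill(0x2a);                    // stand-in for real I/O into the buffer
    request.respondWithNewView(filled);   // surfaces exactly `produced` bytes
  }
});
```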
respond(bytesWritten)
method steps are:
-
If this.[[controller]] is undefined, throw a
TypeError
exception. -
If ! IsDetachedBuffer(this.[[view]].[[ArrayBuffer]]) is true, throw a
TypeError
exception. -
Assert: this.[[view]].[[ViewedArrayBuffer]].[[ByteLength]] > 0.
-
Perform ? ReadableByteStreamControllerRespond(this.[[controller]], bytesWritten).
respondWithNewView(view)
method steps are:
-
If this.[[controller]] is undefined, throw a
TypeError
exception. -
If ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is true, throw a
TypeError
exception. -
Return ? ReadableByteStreamControllerRespondWithNewView(this.[[controller]], view).
4.9. Abstract operations
4.9.1. Working with readable streams
The following abstract operations operate on ReadableStream
instances at a higher level.
-
Let reader be a new
ReadableStreamBYOBReader
. -
Perform ? SetUpReadableStreamBYOBReader(reader, stream).
-
Return reader.
-
Let reader be a new
ReadableStreamDefaultReader
. -
Perform ? SetUpReadableStreamDefaultReader(reader, stream).
-
Return reader.
-
If highWaterMark was not passed, set it to 1.
-
If sizeAlgorithm was not passed, set it to an algorithm that returns 1.
-
Assert: ! IsNonNegativeNumber(highWaterMark) is true.
-
Let stream be a new
ReadableStream
. -
Perform ! InitializeReadableStream(stream).
-
Let controller be a new
ReadableStreamDefaultController
. -
Perform ? SetUpReadableStreamDefaultController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm).
-
Return stream.
This abstract operation will throw an exception if and only if the supplied startAlgorithm throws.
-
Let stream be a new
ReadableStream
. -
Perform ! InitializeReadableStream(stream).
-
Let controller be a new
ReadableByteStreamController
. -
Perform ? SetUpReadableByteStreamController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, 0, undefined).
-
Return stream.
This abstract operation will throw an exception if and only if the supplied startAlgorithm throws.
-
Set stream.[[state]] to "
readable
". -
Set stream.[[reader]] and stream.[[storedError]] to undefined.
-
Set stream.[[disturbed]] to false.
-
If stream.[[reader]] is undefined, return false.
-
Return true.
-
Let stream be undefined.
-
Let iteratorRecord be ? GetIterator(asyncIterable, async).
-
Let startAlgorithm be an algorithm that returns undefined.
-
Let pullAlgorithm be the following steps:
-
Let nextResult be IteratorNext(iteratorRecord).
-
If nextResult is an abrupt completion, return a promise rejected with nextResult.[[Value]].
-
Let nextPromise be a promise resolved with nextResult.[[Value]].
-
Return the result of reacting to nextPromise with the following fulfillment steps, given iterResult:
-
If iterResult is not an Object, throw a
TypeError
. -
Let done be ? IteratorComplete(iterResult).
-
If done is true:
-
Perform ! ReadableStreamDefaultControllerClose(stream.[[controller]]).
-
-
Otherwise:
-
Let value be ? IteratorValue(iterResult).
-
Perform ! ReadableStreamDefaultControllerEnqueue(stream.[[controller]], value).
-
-
-
-
Let cancelAlgorithm be the following steps, given reason:
-
Let iterator be iteratorRecord.[[Iterator]].
-
Let returnMethod be GetMethod(iterator, "
return
"). -
If returnMethod is an abrupt completion, return a promise rejected with returnMethod.[[Value]].
-
If returnMethod.[[Value]] is undefined, return a promise resolved with undefined.
-
Let returnResult be Call(returnMethod.[[Value]], iterator, « reason »).
-
If returnResult is an abrupt completion, return a promise rejected with returnResult.[[Value]].
-
Let returnPromise be a promise resolved with returnResult.[[Value]].
-
Return the result of reacting to returnPromise with the following fulfillment steps, given iterResult:
-
If iterResult is not an Object, throw a
TypeError
. -
Return undefined.
-
-
-
Set stream to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancelAlgorithm, 0).
-
Return stream.
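Non-normatively, this is the machinery behind adapting an async iterable into a readable stream (exposed to authors as the static ReadableStream.from() method); a sketch, assuming a module context for the top-level for await:

```js
// Non-normative sketch: adapting an async generator into a readable stream.
async function* numbers() {
  yield 1;
  yield 2;
  yield 3;
}

const stream = ReadableStream.from(numbers());

// Each pull calls the iterator's next(); canceling the stream calls its return().
for await (const chunk of stream) {
  console.log(chunk);   // 1, then 2, then 3
}
```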
-
Assert: source implements
ReadableStream
. -
Assert: dest implements
WritableStream
. -
Assert: preventClose, preventAbort, and preventCancel are all booleans.
-
If signal was not given, let signal be undefined.
-
Assert: either signal is undefined, or signal implements
AbortSignal
. -
Assert: ! IsReadableStreamLocked(source) is false.
-
Assert: ! IsWritableStreamLocked(dest) is false.
-
If source.[[controller]] implements
ReadableByteStreamController
, let reader be either ! AcquireReadableStreamBYOBReader(source) or ! AcquireReadableStreamDefaultReader(source), at the user agent’s discretion. -
Otherwise, let reader be ! AcquireReadableStreamDefaultReader(source).
-
Let writer be ! AcquireWritableStreamDefaultWriter(dest).
-
Set source.[[disturbed]] to true.
-
Let shuttingDown be false.
-
Let promise be a new promise.
-
If signal is not undefined,
-
Let abortAlgorithm be the following steps:
-
Let error be signal’s abort reason.
-
Let actions be an empty ordered set.
-
If preventAbort is false, append the following action to actions:
-
If dest.[[state]] is "
writable
", return ! WritableStreamAbort(dest, error). -
Otherwise, return a promise resolved with undefined.
-
-
If preventCancel is false, append the following action to actions:
-
If source.[[state]] is "
readable
", return ! ReadableStreamCancel(source, error). -
Otherwise, return a promise resolved with undefined.
-
-
Shutdown with an action consisting of getting a promise to wait for all of the actions in actions, and with error.
-
-
If signal is aborted, perform abortAlgorithm and return promise.
-
Add abortAlgorithm to signal.
-
-
In parallel but not really; see #905, using reader and writer, read all chunks from source and write them to dest. Due to the locking provided by the reader and writer, the exact manner in which this happens is not observable to author code, and so there is flexibility in how this is done. The following constraints apply regardless of the exact algorithm used:
-
Public API must not be used: while reading or writing, or performing any of the operations below, the JavaScript-modifiable reader, writer, and stream APIs (i.e. methods on the appropriate prototypes) must not be used. Instead, the streams must be manipulated directly.
-
Backpressure must be enforced:
-
While WritableStreamDefaultWriterGetDesiredSize(writer) is ≤ 0 or is null, the user agent must not read from reader.
-
If reader is a BYOB reader, WritableStreamDefaultWriterGetDesiredSize(writer) should be used as a basis to determine the size of the chunks read from reader.
It’s frequently inefficient to read chunks that are too small or too large. Other information might be factored in to determine the optimal chunk size.
-
Reads or writes should not be delayed for reasons other than these backpressure signals.
An implementation that waits for each write to successfully complete before proceeding to the next read/write operation violates this recommendation. In doing so, such an implementation makes the internal queue of dest useless, as it ensures dest always contains at most one queued chunk.
-
-
Shutdown must stop activity: if shuttingDown becomes true, the user agent must not initiate further reads from reader, and must only perform writes of already-read chunks, as described below. In particular, the user agent must check the below conditions before performing any reads or writes, since they might lead to immediate shutdown.
-
Error and close states must be propagated: the following conditions must be applied in order.
-
Errors must be propagated forward: if source.[[state]] is or becomes "
errored
", then-
If preventAbort is false, shutdown with an action of ! WritableStreamAbort(dest, source.[[storedError]]) and with source.[[storedError]].
-
Otherwise, shutdown with source.[[storedError]].
-
-
Errors must be propagated backward: if dest.[[state]] is or becomes "
errored
", then-
If preventCancel is false, shutdown with an action of ! ReadableStreamCancel(source, dest.[[storedError]]) and with dest.[[storedError]].
-
Otherwise, shutdown with dest.[[storedError]].
-
-
Closing must be propagated forward: if source.[[state]] is or becomes "
closed
", then-
If preventClose is false, shutdown with an action of ! WritableStreamDefaultWriterCloseWithErrorPropagation(writer).
-
Otherwise, shutdown.
-
-
Closing must be propagated backward: if ! WritableStreamCloseQueuedOrInFlight(dest) is true or dest.[[state]] is "
closed
", then-
Assert: no chunks have been read or written.
-
Let destClosed be a new
TypeError
. -
If preventCancel is false, shutdown with an action of ! ReadableStreamCancel(source, destClosed) and with destClosed.
-
Otherwise, shutdown with destClosed.
-
-
-
Shutdown with an action: if any of the above requirements ask to shutdown with an action action, optionally with an error originalError, then:
-
If shuttingDown is true, abort these substeps.
-
Set shuttingDown to true.
-
If dest.[[state]] is "
writable
" and ! WritableStreamCloseQueuedOrInFlight(dest) is false, -
Let p be the result of performing action.
-
Upon fulfillment of p, finalize, passing along originalError if it was given.
-
Upon rejection of p with reason newError, finalize with newError.
-
-
Shutdown: if any of the above requirements or steps ask to shutdown, optionally with an error error, then:
-
If shuttingDown is true, abort these substeps.
-
Set shuttingDown to true.
-
If dest.[[state]] is "
writable
" and ! WritableStreamCloseQueuedOrInFlight(dest) is false, -
Finalize, passing along error if it was given.
-
-
Finalize: both forms of shutdown will eventually ask to finalize, optionally with an error error, which means to perform the following steps:
-
Perform ! WritableStreamDefaultWriterRelease(writer).
-
If reader implements
ReadableStreamBYOBReader
, perform ! ReadableStreamBYOBReaderRelease(reader). -
Otherwise, perform ! ReadableStreamDefaultReaderRelease(reader).
-
If signal is not undefined, remove abortAlgorithm from signal.
-
If error was given, reject promise with error.
-
Otherwise, resolve promise with undefined.
-
-
-
Return promise.
Various abstract operations performed here include object creation (often of promises), which usually would require specifying a realm for the created object. However, because of the locking, none of these objects can be observed by author code. As such, the realm used to create them does not matter.
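A non-normative sketch of how authors observe these requirements through pipeTo()'s options follows; the URL is a placeholder and the destination simply discards chunks.

```js
// Non-normative sketch: piping with an AbortSignal and the prevent* options.
const destination = new WritableStream({
  write(chunk) { /* consume chunk */ }
});

const abortController = new AbortController();

const res = await fetch("/source");          // "/source" is a placeholder
const done = res.body.pipeTo(destination, {
  preventClose: false,   // closing the source also closes destination
  preventAbort: false,   // an errored source aborts destination
  preventCancel: false,  // an errored or closed destination cancels the source
  signal: abortController.signal
});

// If the pipe is still in progress, aborting performs the shutdown actions
// described above and rejects `done` with the abort reason.
abortController.abort(new DOMException("Gave up", "AbortError"));
await done.catch(() => {});
```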
The second argument, cloneForBranch2, governs whether or not the data from the original stream will be cloned (using HTML’s serializable objects framework) before appearing in the second of the returned branches. This is useful for scenarios where both branches are to be consumed in such a way that they might otherwise interfere with each other, such as by transferring their chunks. However, it does introduce a noticeable asymmetry between the two branches, and limits the possible chunks to serializable ones. [HTML]
If stream is a readable byte stream, then cloneForBranch2 is ignored and chunks are cloned unconditionally.
In this standard ReadableStreamTee is always called with cloneForBranch2 set to false; other specifications pass true via the tee wrapper algorithm.
It performs the following steps:
-
Assert: stream implements
ReadableStream
. -
Assert: cloneForBranch2 is a boolean.
-
If stream.[[controller]] implements
ReadableByteStreamController
, return ? ReadableByteStreamTee(stream). -
Return ? ReadableStreamDefaultTee(stream, cloneForBranch2).
-
Assert: stream implements
ReadableStream
. -
Assert: cloneForBranch2 is a boolean.
-
Let reader be ? AcquireReadableStreamDefaultReader(stream).
-
Let reading be false.
-
Let readAgain be false.
-
Let canceled1 be false.
-
Let canceled2 be false.
-
Let reason1 be undefined.
-
Let reason2 be undefined.
-
Let branch1 be undefined.
-
Let branch2 be undefined.
-
Let cancelPromise be a new promise.
-
Let pullAlgorithm be the following steps:
-
If reading is true,
-
Set readAgain to true.
-
Return a promise resolved with undefined.
-
-
Set reading to true.
-
Let readRequest be a read request with the following items:
- chunk steps, given chunk
-
-
Queue a microtask to perform the following steps:
-
Set readAgain to false.
-
Let chunk1 and chunk2 be chunk.
-
If canceled2 is false and cloneForBranch2 is true,
-
Let cloneResult be StructuredClone(chunk2).
-
If cloneResult is an abrupt completion,
-
Perform ! ReadableStreamDefaultControllerError(branch1.[[controller]], cloneResult.[[Value]]).
-
Perform ! ReadableStreamDefaultControllerError(branch2.[[controller]], cloneResult.[[Value]]).
-
Resolve cancelPromise with ! ReadableStreamCancel(stream, cloneResult.[[Value]]).
-
Return.
-
-
Otherwise, set chunk2 to cloneResult.[[Value]].
-
-
If canceled1 is false, perform ! ReadableStreamDefaultControllerEnqueue(branch1.[[controller]], chunk1).
-
If canceled2 is false, perform ! ReadableStreamDefaultControllerEnqueue(branch2.[[controller]], chunk2).
-
Set reading to false.
-
If readAgain is true, perform pullAlgorithm.
-
The microtask delay here is necessary because it takes at least a microtask to detect errors, when we use reader.[[closedPromise]] below. We want errors in stream to error both branches immediately, so we cannot let successful synchronously-available reads happen ahead of asynchronously-available errors.
-
- close steps
-
-
Set reading to false.
-
If canceled1 is false, perform ! ReadableStreamDefaultControllerClose(branch1.[[controller]]).
-
If canceled2 is false, perform ! ReadableStreamDefaultControllerClose(branch2.[[controller]]).
-
If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined.
-
- error steps
-
-
Set reading to false.
-
-
Perform ! ReadableStreamDefaultReaderRead(reader, readRequest).
-
Return a promise resolved with undefined.
-
-
Let cancel1Algorithm be the following steps, taking a reason argument:
-
Set canceled1 to true.
-
Set reason1 to reason.
-
If canceled2 is true,
-
Let compositeReason be ! CreateArrayFromList(« reason1, reason2 »).
-
Let cancelResult be ! ReadableStreamCancel(stream, compositeReason).
-
Resolve cancelPromise with cancelResult.
-
-
Return cancelPromise.
-
-
Let cancel2Algorithm be the following steps, taking a reason argument:
-
Set canceled2 to true.
-
Set reason2 to reason.
-
If canceled1 is true,
-
Let compositeReason be ! CreateArrayFromList(« reason1, reason2 »).
-
Let cancelResult be ! ReadableStreamCancel(stream, compositeReason).
-
Resolve cancelPromise with cancelResult.
-
-
Return cancelPromise.
-
-
Let startAlgorithm be an algorithm that returns undefined.
-
Set branch1 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel1Algorithm).
-
Set branch2 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel2Algorithm).
-
Upon rejection of reader.[[closedPromise]] with reason r,
-
Perform ! ReadableStreamDefaultControllerError(branch1.[[controller]], r).
-
Perform ! ReadableStreamDefaultControllerError(branch2.[[controller]], r).
-
If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined.
-
-
Return « branch1, branch2 ».
-
Assert: stream implements
ReadableStream
. -
Assert: stream.[[controller]] implements
ReadableByteStreamController
. -
Let reader be ? AcquireReadableStreamDefaultReader(stream).
-
Let reading be false.
-
Let readAgainForBranch1 be false.
-
Let readAgainForBranch2 be false.
-
Let canceled1 be false.
-
Let canceled2 be false.
-
Let reason1 be undefined.
-
Let reason2 be undefined.
-
Let branch1 be undefined.
-
Let branch2 be undefined.
-
Let cancelPromise be a new promise.
-
Let forwardReaderError be the following steps, taking a thisReader argument:
-
Upon rejection of thisReader.[[closedPromise]] with reason r,
-
If thisReader is not reader, return.
-
Perform ! ReadableByteStreamControllerError(branch1.[[controller]], r).
-
Perform ! ReadableByteStreamControllerError(branch2.[[controller]], r).
-
If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined.
-
-
-
Let pullWithDefaultReader be the following steps:
-
If reader implements
ReadableStreamBYOBReader
,-
Assert: reader.[[readIntoRequests]] is empty.
-
Perform ! ReadableStreamBYOBReaderRelease(reader).
-
Set reader to ! AcquireReadableStreamDefaultReader(stream).
-
Perform forwardReaderError, given reader.
-
-
Let readRequest be a read request with the following items:
- chunk steps, given chunk
-
-
Queue a microtask to perform the following steps:
-
Set readAgainForBranch1 to false.
-
Set readAgainForBranch2 to false.
-
Let chunk1 and chunk2 be chunk.
-
If canceled1 is false and canceled2 is false,
-
Let cloneResult be CloneAsUint8Array(chunk).
-
If cloneResult is an abrupt completion,
-
Perform ! ReadableByteStreamControllerError(branch1.[[controller]], cloneResult.[[Value]]).
-
Perform ! ReadableByteStreamControllerError(branch2.[[controller]], cloneResult.[[Value]]).
-
Resolve cancelPromise with ! ReadableStreamCancel(stream, cloneResult.[[Value]]).
-
Return.
-
-
Otherwise, set chunk2 to cloneResult.[[Value]].
-
-
If canceled1 is false, perform ! ReadableByteStreamControllerEnqueue(branch1.[[controller]], chunk1).
-
If canceled2 is false, perform ! ReadableByteStreamControllerEnqueue(branch2.[[controller]], chunk2).
-
Set reading to false.
-
If readAgainForBranch1 is true, perform pull1Algorithm.
-
Otherwise, if readAgainForBranch2 is true, perform pull2Algorithm.
-
The microtask delay here is necessary because it takes at least a microtask to detect errors, when we use reader.[[closedPromise]] below. We want errors in stream to error both branches immediately, so we cannot let successful synchronously-available reads happen ahead of asynchronously-available errors.
-
- close steps
-
-
Set reading to false.
-
If canceled1 is false, perform ! ReadableByteStreamControllerClose(branch1.[[controller]]).
-
If canceled2 is false, perform ! ReadableByteStreamControllerClose(branch2.[[controller]]).
-
If branch1.[[controller]].[[pendingPullIntos]] is not empty, perform ! ReadableByteStreamControllerRespond(branch1.[[controller]], 0).
-
If branch2.[[controller]].[[pendingPullIntos]] is not empty, perform ! ReadableByteStreamControllerRespond(branch2.[[controller]], 0).
-
If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined.
-
- error steps
-
-
Set reading to false.
-
-
Perform ! ReadableStreamDefaultReaderRead(reader, readRequest).
-
-
Let pullWithBYOBReader be the following steps, given view and forBranch2:
-
If reader implements
ReadableStreamDefaultReader
,-
Assert: reader.[[readRequests]] is empty.
-
Perform ! ReadableStreamDefaultReaderRelease(reader).
-
Set reader to ! AcquireReadableStreamBYOBReader(stream).
-
Perform forwardReaderError, given reader.
-
-
Let byobBranch be branch2 if forBranch2 is true, and branch1 otherwise.
-
Let otherBranch be branch2 if forBranch2 is false, and branch1 otherwise.
-
Let readIntoRequest be a read-into request with the following items:
- chunk steps, given chunk
-
-
Queue a microtask to perform the following steps:
-
Set readAgainForBranch1 to false.
-
Set readAgainForBranch2 to false.
-
Let byobCanceled be canceled2 if forBranch2 is true, and canceled1 otherwise.
-
Let otherCanceled be canceled2 if forBranch2 is false, and canceled1 otherwise.
-
If otherCanceled is false,
-
Let cloneResult be CloneAsUint8Array(chunk).
-
If cloneResult is an abrupt completion,
-
Perform ! ReadableByteStreamControllerError(byobBranch.[[controller]], cloneResult.[[Value]]).
-
Perform ! ReadableByteStreamControllerError(otherBranch.[[controller]], cloneResult.[[Value]]).
-
Resolve cancelPromise with ! ReadableStreamCancel(stream, cloneResult.[[Value]]).
-
Return.
-
-
Otherwise, let clonedChunk be cloneResult.[[Value]].
-
If byobCanceled is false, perform ! ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk).
-
Perform ! ReadableByteStreamControllerEnqueue(otherBranch.[[controller]], clonedChunk).
-
-
Otherwise, if byobCanceled is false, perform ! ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk).
-
Set reading to false.
-
If readAgainForBranch1 is true, perform pull1Algorithm.
-
Otherwise, if readAgainForBranch2 is true, perform pull2Algorithm.
-
The microtask delay here is necessary because it takes at least a microtask to detect errors, when we use reader.[[closedPromise]] below. We want errors in stream to error both branches immediately, so we cannot let successful synchronously-available reads happen ahead of asynchronously-available errors.
-
- close steps, given chunk
-
-
Set reading to false.
-
Let byobCanceled be canceled2 if forBranch2 is true, and canceled1 otherwise.
-
Let otherCanceled be canceled2 if forBranch2 is false, and canceled1 otherwise.
-
If byobCanceled is false, perform ! ReadableByteStreamControllerClose(byobBranch.[[controller]]).
-
If otherCanceled is false, perform ! ReadableByteStreamControllerClose(otherBranch.[[controller]]).
-
If chunk is not undefined,
-
Assert: chunk.[[ByteLength]] is 0.
-
If byobCanceled is false, perform ! ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk).
-
If otherCanceled is false and otherBranch.[[controller]].[[pendingPullIntos]] is not empty, perform ! ReadableByteStreamControllerRespond(otherBranch.[[controller]], 0).
-
-
If byobCanceled is false or otherCanceled is false, resolve cancelPromise with undefined.
-
- error steps
-
-
Set reading to false.
-
-
Perform ! ReadableStreamBYOBReaderRead(reader, view, 1, readIntoRequest).
-
-
Let pull1Algorithm be the following steps:
-
If reading is true,
-
Set readAgainForBranch1 to true.
-
Return a promise resolved with undefined.
-
-
Set reading to true.
-
Let byobRequest be ! ReadableByteStreamControllerGetBYOBRequest(branch1.[[controller]]).
-
If byobRequest is null, perform pullWithDefaultReader.
-
Otherwise, perform pullWithBYOBReader, given byobRequest.[[view]] and false.
-
Return a promise resolved with undefined.
-
-
Let pull2Algorithm be the following steps:
-
If reading is true,
-
Set readAgainForBranch2 to true.
-
Return a promise resolved with undefined.
-
-
Set reading to true.
-
Let byobRequest be ! ReadableByteStreamControllerGetBYOBRequest(branch2.[[controller]]).
-
If byobRequest is null, perform pullWithDefaultReader.
-
Otherwise, perform pullWithBYOBReader, given byobRequest.[[view]] and true.
-
Return a promise resolved with undefined.
-
-
Let cancel1Algorithm be the following steps, taking a reason argument:
-
Set canceled1 to true.
-
Set reason1 to reason.
-
If canceled2 is true,
-
Let compositeReason be ! CreateArrayFromList(« reason1, reason2 »).
-
Let cancelResult be ! ReadableStreamCancel(stream, compositeReason).
-
Resolve cancelPromise with cancelResult.
-
-
Return cancelPromise.
-
-
Let cancel2Algorithm be the following steps, taking a reason argument:
-
Set canceled2 to true.
-
Set reason2 to reason.
-
If canceled1 is true,
-
Let compositeReason be ! CreateArrayFromList(« reason1, reason2 »).
-
Let cancelResult be ! ReadableStreamCancel(stream, compositeReason).
-
Resolve cancelPromise with cancelResult.
-
-
Return cancelPromise.
-
-
Let startAlgorithm be an algorithm that returns undefined.
-
Set branch1 to ! CreateReadableByteStream(startAlgorithm, pull1Algorithm, cancel1Algorithm).
-
Set branch2 to ! CreateReadableByteStream(startAlgorithm, pull2Algorithm, cancel2Algorithm).
-
Perform forwardReaderError, given reader.
-
Return « branch1, branch2 ».
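Non-normatively, the author-visible consequences of these tee algorithms are sketched below; the URL is a placeholder.

```js
// Non-normative sketch: the two branches returned by tee() are consumed
// independently.
const res = await fetch("/image.png");       // placeholder URL
const [branch1, branch2] = res.body.tee();

// Consume one branch fully...
const bytesPromise = new Response(branch1).arrayBuffer();

// ...and cancel the other. The source is only canceled once *both* branches
// have been canceled, so branch1 still receives all of the data.
await branch2.cancel("not needed");

const bytes = await bytesPromise;
```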
4.9.2. Interfacing with controllers
In terms of specification factoring, the way that the ReadableStream
class encapsulates the
behavior of both simple readable streams and readable byte streams into a single class is by
centralizing most of the potentially-varying logic inside the two controller classes,
ReadableStreamDefaultController
and ReadableByteStreamController
. Those classes define most
of the stateful internal slots and abstract operations for how a stream’s internal queue is
managed and how it interfaces with its underlying source or underlying byte source.
Each controller class defines three internal methods, which are called by the ReadableStream
algorithms:
- [[CancelSteps]](reason)
- The controller’s steps that run in reaction to the stream being canceled, used to clean up the state stored in the controller and inform the underlying source.
- [[PullSteps]](readRequest)
- The controller’s steps that run when a default reader is read from, used to pull from the controller any queued chunks, or pull from the underlying source to get more chunks.
- [[ReleaseSteps]]()
- The controller’s steps that run when a reader is released, used to clean up reader-specific resources stored in the controller.
(These are defined as internal methods, instead of as abstract operations, so that they can be
called polymorphically by the ReadableStream
algorithms, without having to branch on which type
of controller is present.)
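As a non-normative sketch of this factoring (assuming a self-hosted implementation that stores internal slots as ordinary properties; the underscore-prefixed names below are illustrative, not part of this standard), the ReadableStream algorithms can then dispatch without branching on the controller type:

```js
// Non-normative sketch of the polymorphic dispatch described above.
// The underscore-prefixed names are illustrative, not part of the standard.
function readableStreamCancel(stream, reason) {
  // ...bookkeeping performed by ReadableStreamCancel elided...
  // Either controller type works here, because both implement [[CancelSteps]].
  return stream._controller.cancelSteps(reason).then(() => undefined);
}

function readableStreamDefaultReaderRead(reader, readRequest) {
  const stream = reader._stream;
  // Likewise, [[PullSteps]] hides whether this is a byte stream or not.
  stream._controller.pullSteps(readRequest);
}
```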
The rest of this section concerns abstract operations that go in the other direction: they are
used by the controller implementations to affect their associated ReadableStream
object. This
translates internal state changes of the controller into developer-facing results visible through
the ReadableStream
’s public API.
-
Assert: stream.[[reader]] implements
ReadableStreamBYOBReader
. -
Assert: stream.[[state]] is "
readable
" or "closed
". -
Append readRequest to stream.[[reader]].[[readIntoRequests]].
-
Assert: stream.[[reader]] implements
ReadableStreamDefaultReader
. -
Assert: stream.[[state]] is "
readable
". -
Append readRequest to stream.[[reader]].[[readRequests]].
-
Set stream.[[disturbed]] to true.
-
If stream.[[state]] is "
closed
", return a promise resolved with undefined. -
If stream.[[state]] is "
errored
", return a promise rejected with stream.[[storedError]]. -
Perform ! ReadableStreamClose(stream).
-
Let reader be stream.[[reader]].
-
If reader is not undefined and reader implements
ReadableStreamBYOBReader
,-
Let readIntoRequests be reader.[[readIntoRequests]].
-
Set reader.[[readIntoRequests]] to an empty list.
-
For each readIntoRequest of readIntoRequests,
-
Perform readIntoRequest’s close steps, given undefined.
-
-
-
Let sourceCancelPromise be ! stream.[[controller]].[[CancelSteps]](reason).
-
Return the result of reacting to sourceCancelPromise with a fulfillment step that returns undefined.
-
Assert: stream.[[state]] is "
readable
". -
Set stream.[[state]] to "
closed
". -
Let reader be stream.[[reader]].
-
If reader is undefined, return.
-
Resolve reader.[[closedPromise]] with undefined.
-
If reader implements
ReadableStreamDefaultReader
,-
Let readRequests be reader.[[readRequests]].
-
Set reader.[[readRequests]] to an empty list.
-
For each readRequest of readRequests,
-
Perform readRequest’s close steps.
-
-
-
Assert: stream.[[state]] is "
readable
". -
Set stream.[[state]] to "
errored
". -
Set stream.[[storedError]] to e.
-
Let reader be stream.[[reader]].
-
If reader is undefined, return.
-
Reject reader.[[closedPromise]] with e.
-
Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
-
If reader implements
ReadableStreamDefaultReader
,-
Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
-
-
Otherwise,
-
Assert: reader implements
ReadableStreamBYOBReader
. -
Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e).
-
-
Assert: ! ReadableStreamHasBYOBReader(stream) is true.
-
Let reader be stream.[[reader]].
-
Assert: reader.[[readIntoRequests]] is not empty.
-
Let readIntoRequest be reader.[[readIntoRequests]][0].
-
Remove readIntoRequest from reader.[[readIntoRequests]].
-
If done is true, perform readIntoRequest’s close steps, given chunk.
-
Otherwise, perform readIntoRequest’s chunk steps, given chunk.
-
Assert: ! ReadableStreamHasDefaultReader(stream) is true.
-
Let reader be stream.[[reader]].
-
Assert: reader.[[readRequests]] is not empty.
-
Let readRequest be reader.[[readRequests]][0].
-
Remove readRequest from reader.[[readRequests]].
-
If done is true, perform readRequest’s close steps.
-
Otherwise, perform readRequest’s chunk steps, given chunk.
-
Assert: ! ReadableStreamHasBYOBReader(stream) is true.
-
Return stream.[[reader]].[[readIntoRequests]]’s size.
-
Assert: ! ReadableStreamHasDefaultReader(stream) is true.
-
Return stream.[[reader]].[[readRequests]]’s size.
-
Let reader be stream.[[reader]].
-
If reader is undefined, return false.
-
If reader implements
ReadableStreamBYOBReader
, return true. -
Return false.
-
Let reader be stream.[[reader]].
-
If reader is undefined, return false.
-
If reader implements
ReadableStreamDefaultReader
, return true. -
Return false.
4.9.3. Readers
The following abstract operations support the implementation and manipulation of
ReadableStreamDefaultReader
and ReadableStreamBYOBReader
instances.
-
Let stream be reader.[[stream]].
-
Assert: stream is not undefined.
-
Return ! ReadableStreamCancel(stream, reason).
-
Set reader.[[stream]] to stream.
-
Set stream.[[reader]] to reader.
-
If stream.[[state]] is "
readable
",-
Set reader.[[closedPromise]] to a new promise.
-
-
Otherwise, if stream.[[state]] is "
closed
",-
Set reader.[[closedPromise]] to a promise resolved with undefined.
-
-
Otherwise,
-
Assert: stream.[[state]] is "
errored
". -
Set reader.[[closedPromise]] to a promise rejected with stream.[[storedError]].
-
Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
-
-
Let stream be reader.[[stream]].
-
Assert: stream is not undefined.
-
Assert: stream.[[reader]] is reader.
-
If stream.[[state]] is "
readable
", reject reader.[[closedPromise]] with aTypeError
exception. -
Otherwise, set reader.[[closedPromise]] to a promise rejected with a
TypeError
exception. -
Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
-
Perform ! stream.[[controller]].[[ReleaseSteps]]().
-
Set stream.[[reader]] to undefined.
-
Set reader.[[stream]] to undefined.
-
Let readIntoRequests be reader.[[readIntoRequests]].
-
Set reader.[[readIntoRequests]] to a new empty list.
-
For each readIntoRequest of readIntoRequests,
-
Perform readIntoRequest’s error steps, given e.
-
-
Let stream be reader.[[stream]].
-
Assert: stream is not undefined.
-
Set stream.[[disturbed]] to true.
-
If stream.[[state]] is "
errored
", perform readIntoRequest’s error steps given stream.[[storedError]]. -
Otherwise, perform ! ReadableByteStreamControllerPullInto(stream.[[controller]], view, min, readIntoRequest).
-
Perform ! ReadableStreamReaderGenericRelease(reader).
-
Let e be a new
TypeError
exception. -
Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e).
-
Let readRequests be reader.[[readRequests]].
-
Set reader.[[readRequests]] to a new empty list.
-
For each readRequest of readRequests,
-
Perform readRequest’s error steps, given e.
-
-
Let stream be reader.[[stream]].
-
Assert: stream is not undefined.
-
Set stream.[[disturbed]] to true.
-
If stream.[[state]] is "
closed
", perform readRequest’s close steps. -
Otherwise, if stream.[[state]] is "
errored
", perform readRequest’s error steps given stream.[[storedError]]. -
Otherwise,
-
Assert: stream.[[state]] is "
readable
". -
Perform ! stream.[[controller]].[[PullSteps]](readRequest).
-
-
Perform ! ReadableStreamReaderGenericRelease(reader).
-
Let e be a new
TypeError
exception. -
Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
-
If ! IsReadableStreamLocked(stream) is true, throw a
TypeError
exception. -
If stream.[[controller]] does not implement
ReadableByteStreamController
, throw aTypeError
exception. -
Perform ! ReadableStreamReaderGenericInitialize(reader, stream).
-
Set reader.[[readIntoRequests]] to a new empty list.
-
If ! IsReadableStreamLocked(stream) is true, throw a
TypeError
exception. -
Perform ! ReadableStreamReaderGenericInitialize(reader, stream).
-
Set reader.[[readRequests]] to a new empty list.
4.9.4. Default controllers
The following abstract operations support the implementation of the
ReadableStreamDefaultController
class.
-
Let shouldPull be ! ReadableStreamDefaultControllerShouldCallPull(controller).
-
If shouldPull is false, return.
-
If controller.[[pulling]] is true,
-
Set controller.[[pullAgain]] to true.
-
Return.
-
-
Assert: controller.[[pullAgain]] is false.
-
Set controller.[[pulling]] to true.
-
Let pullPromise be the result of performing controller.[[pullAlgorithm]].
-
Upon fulfillment of pullPromise,
-
Set controller.[[pulling]] to false.
-
If controller.[[pullAgain]] is true,
-
Set controller.[[pullAgain]] to false.
-
Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
-
-
-
Upon rejection of pullPromise with reason e,
-
Perform ! ReadableStreamDefaultControllerError(controller, e).
-
-
Let stream be controller.[[stream]].
-
If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return false.
-
If controller.[[started]] is false, return false.
-
If ! IsReadableStreamLocked(stream) is true and ! ReadableStreamGetNumReadRequests(stream) > 0, return true.
-
Let desiredSize be ! ReadableStreamDefaultControllerGetDesiredSize(controller).
-
Assert: desiredSize is not null.
-
If desiredSize > 0, return true.
-
Return false.
This abstract operation is called once the stream is closed or errored and its algorithms will no longer be executed; by removing the algorithm references it permits the underlying source object to be garbage collected even if the ReadableStream itself is still referenced.
This is observable using weak references. See tc39/proposal-weakrefs#31 for more detail.
It performs the following steps:
-
Set controller.[[pullAlgorithm]] to undefined.
-
Set controller.[[cancelAlgorithm]] to undefined.
-
Set controller.[[strategySizeAlgorithm]] to undefined.
-
If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
-
Let stream be controller.[[stream]].
-
Set controller.[[closeRequested]] to true.
-
If controller.[[queue]] is empty,
-
Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
-
Perform ! ReadableStreamClose(stream).
-
-
If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
-
Let stream be controller.[[stream]].
-
If ! IsReadableStreamLocked(stream) is true and ! ReadableStreamGetNumReadRequests(stream) > 0, perform ! ReadableStreamFulfillReadRequest(stream, chunk, false).
-
Otherwise,
-
Let result be the result of performing controller.[[strategySizeAlgorithm]], passing in chunk, and interpreting the result as a completion record.
-
If result is an abrupt completion,
-
Perform ! ReadableStreamDefaultControllerError(controller, result.[[Value]]).
-
Return result.
-
-
Let chunkSize be result.[[Value]].
-
Let enqueueResult be EnqueueValueWithSize(controller, chunk, chunkSize).
-
If enqueueResult is an abrupt completion,
-
Perform ! ReadableStreamDefaultControllerError(controller, enqueueResult.[[Value]]).
-
Return enqueueResult.
-
-
-
Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
-
Let stream be controller.[[stream]].
-
If stream.[[state]] is not "
readable
", return. -
Perform ! ResetQueue(controller).
-
Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
-
Perform ! ReadableStreamError(stream, e).
-
Let state be controller.[[stream]].[[state]].
-
If state is "
errored
", return null. -
If state is "
closed
", return 0. -
Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
This abstract operation is used in the implementation of TransformStream. It performs the following steps:
-
If ! ReadableStreamDefaultControllerShouldCallPull(controller) is true, return false.
-
Otherwise, return true.
-
Let state be controller.[[stream]].[[state]].
-
If controller.[[closeRequested]] is false and state is "
readable
", return true. -
Otherwise, return false.
The case where controller.[[closeRequested]]
is false, but state is not "readable
", happens when the stream is errored via
controller.error()
, or when it is closed without its
controller’s controller.close()
method ever being
called: e.g., if the stream was closed by a call to
stream.cancel()
.
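A non-normative illustration of this check from the author's point of view:

```js
// Non-normative illustration: once close() has been requested, further
// enqueue() or close() calls throw a TypeError.
const rs = new ReadableStream({
  start(controller) {
    controller.enqueue("a");
    controller.close();
    try {
      controller.enqueue("b");          // [[closeRequested]] is already true
    } catch (e) {
      console.assert(e instanceof TypeError);
    }
  }
});
```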
-
Assert: stream.[[controller]] is undefined.
-
Set controller.[[stream]] to stream.
-
Perform ! ResetQueue(controller).
-
Set controller.[[started]], controller.[[closeRequested]], controller.[[pullAgain]], and controller.[[pulling]] to false.
-
Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm and controller.[[strategyHWM]] to highWaterMark.
-
Set controller.[[pullAlgorithm]] to pullAlgorithm.
-
Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
-
Set stream.[[controller]] to controller.
-
Let startResult be the result of performing startAlgorithm. (This might throw an exception.)
-
Let startPromise be a promise resolved with startResult.
-
Upon fulfillment of startPromise,
-
Set controller.[[started]] to true.
-
Assert: controller.[[pulling]] is false.
-
Assert: controller.[[pullAgain]] is false.
-
Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
-
-
Upon rejection of startPromise with reason r,
-
Perform ! ReadableStreamDefaultControllerError(controller, r).
-
-
Let controller be a new
ReadableStreamDefaultController
. -
Let startAlgorithm be an algorithm that returns undefined.
-
Let pullAlgorithm be an algorithm that returns a promise resolved with undefined.
-
Let cancelAlgorithm be an algorithm that returns a promise resolved with undefined.
-
If underlyingSourceDict["
start
"] exists, then set startAlgorithm to an algorithm which returns the result of invoking underlyingSourceDict["start
"] with argument list « controller » and callback this value underlyingSource. -
If underlyingSourceDict["
pull
"] exists, then set pullAlgorithm to an algorithm which returns the result of invoking underlyingSourceDict["pull
"] with argument list « controller » and callback this value underlyingSource. -
If underlyingSourceDict["
cancel
"] exists, then set cancelAlgorithm to an algorithm which takes an argument reason and returns the result of invoking underlyingSourceDict["cancel
"] with argument list « reason » and callback this value underlyingSource. -
Perform ? SetUpReadableStreamDefaultController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm).
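Non-normatively, the effect of these steps is visible in how the members of the underlying source object are invoked: each receives the controller as an argument and the underlying source object itself as its this value, while the strategy supplies the high-water mark and size algorithm. A sketch:

```js
// Non-normative sketch: start/pull/cancel are invoked with the controller and
// with the underlying source object as their `this` value.
const underlyingSource = {
  prefix: "chunk",
  count: 0,
  start(controller) {
    controller.enqueue(`${this.prefix} 0`);   // `this` is underlyingSource
  },
  pull(controller) {
    this.count += 1;
    controller.enqueue(`${this.prefix} ${this.count}`);
    if (this.count === 3) controller.close();
  },
  cancel(reason) {
    console.log(`${this.prefix} stream canceled:`, reason);
  }
};

const rs = new ReadableStream(
  underlyingSource,
  new CountQueuingStrategy({ highWaterMark: 1 })
);
```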
4.9.5. Byte stream controllers
-
Let shouldPull be ! ReadableByteStreamControllerShouldCallPull(controller).
-
If shouldPull is false, return.
-
If controller.[[pulling]] is true,
-
Set controller.[[pullAgain]] to true.
-
Return.
-
-
Assert: controller.[[pullAgain]] is false.
-
Set controller.[[pulling]] to true.
-
Let pullPromise be the result of performing controller.[[pullAlgorithm]].
-
Upon fulfillment of pullPromise,
-
Set controller.[[pulling]] to false.
-
If controller.[[pullAgain]] is true,
-
Set controller.[[pullAgain]] to false.
-
Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
-
-
-
Upon rejection of pullPromise with reason e,
-
Perform ! ReadableByteStreamControllerError(controller, e).
-
This abstract operation is called once the stream is closed or errored and its algorithms will no longer be executed; by removing the algorithm references it permits the underlying byte source object to be garbage collected even if the ReadableStream itself is still referenced.
This is observable using weak references. See tc39/proposal-weakrefs#31 for more detail.
It performs the following steps:
-
Set controller.[[pullAlgorithm]] to undefined.
-
Set controller.[[cancelAlgorithm]] to undefined.
-
Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
-
Set controller.[[pendingPullIntos]] to a new empty list.
-
Let stream be controller.[[stream]].
-
If controller.[[closeRequested]] is true or stream.[[state]] is not "
readable
", return. -
If controller.[[queueTotalSize]] > 0,
-
Set controller.[[closeRequested]] to true.
-
Return.
-
-
If controller.[[pendingPullIntos]] is not empty,
-
Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
-
If the remainder after dividing firstPendingPullInto’s bytes filled by firstPendingPullInto’s element size is not 0,
-
Let e be a new
TypeError
exception. -
Perform ! ReadableByteStreamControllerError(controller, e).
-
Throw e.
-
-
-
Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
-
Perform ! ReadableStreamClose(stream).
-
Assert: stream.[[state]] is not "
errored
". -
Assert: pullIntoDescriptor.reader type is not "
none
". -
Let done be false.
-
If stream.[[state]] is "
closed
",-
Assert: the remainder after dividing pullIntoDescriptor’s bytes filled by pullIntoDescriptor’s element size is 0.
-
Set done to true.
-
-
Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor).
-
If pullIntoDescriptor’s reader type is "
default
",-
Perform ! ReadableStreamFulfillReadRequest(stream, filledView, done).
-
-
Otherwise,
-
Assert: pullIntoDescriptor’s reader type is "
byob
". -
Perform ! ReadableStreamFulfillReadIntoRequest(stream, filledView, done).
-
-
Let bytesFilled be pullIntoDescriptor’s bytes filled.
-
Let elementSize be pullIntoDescriptor’s element size.
-
Assert: bytesFilled ≤ pullIntoDescriptor’s byte length.
-
Assert: the remainder after dividing bytesFilled by elementSize is 0.
-
Let buffer be ! TransferArrayBuffer(pullIntoDescriptor’s buffer).
-
Return ! Construct(pullIntoDescriptor’s view constructor, « buffer, pullIntoDescriptor’s byte offset, bytesFilled ÷ elementSize »).
-
Let stream be controller.[[stream]].
-
If controller.[[closeRequested]] is true or stream.[[state]] is not "
readable
", return. -
Let buffer be chunk.[[ViewedArrayBuffer]].
-
Let byteOffset be chunk.[[ByteOffset]].
-
Let byteLength be chunk.[[ByteLength]].
-
If ! IsDetachedBuffer(buffer) is true, throw a
TypeError
exception. -
Let transferredBuffer be ? TransferArrayBuffer(buffer).
-
If controller.[[pendingPullIntos]] is not empty,
-
Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
-
If ! IsDetachedBuffer(firstPendingPullInto’s buffer) is true, throw a
TypeError
exception. -
Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
-
Set firstPendingPullInto’s buffer to ! TransferArrayBuffer(firstPendingPullInto’s buffer).
-
If firstPendingPullInto’s reader type is "
none
", perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, firstPendingPullInto).
-
-
If ! ReadableStreamHasDefaultReader(stream) is true,
-
Perform ! ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller).
-
If ! ReadableStreamGetNumReadRequests(stream) is 0,
-
Assert: controller.[[pendingPullIntos]] is empty.
-
Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength).
-
-
Otherwise,
-
If controller.[[pendingPullIntos]] is not empty,
-
Assert: controller.[[pendingPullIntos]][0]'s reader type is "
default
". -
Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
-
-
Let transferredView be ! Construct(
%Uint8Array%
, « transferredBuffer, byteOffset, byteLength »). -
Perform ! ReadableStreamFulfillReadRequest(stream, transferredView, false).
-
-
Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true,
-
Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength).
-
Let filledPullIntos be the result of performing ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
-
For each filledPullInto of filledPullIntos,
-
Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
-
-
-
Otherwise,
-
Assert: ! IsReadableStreamLocked(stream) is false.
-
Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength).
-
-
Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
-
Append a new readable byte stream queue entry with buffer buffer, byte offset byteOffset, and byte length byteLength to controller.[[queue]].
-
Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] + byteLength.
-
Let cloneResult be CloneArrayBuffer(buffer, byteOffset, byteLength,
%ArrayBuffer%
). -
If cloneResult is an abrupt completion,
-
Perform ! ReadableByteStreamControllerError(controller, cloneResult.[[Value]]).
-
Return cloneResult.
-
-
Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(controller, cloneResult.[[Value]], 0, byteLength).
-
Assert: pullIntoDescriptor’s reader type is "
none
". -
If pullIntoDescriptor’s bytes filled > 0, perform ? ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller, pullIntoDescriptor’s buffer, pullIntoDescriptor’s byte offset, pullIntoDescriptor’s bytes filled).
-
Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
-
Let stream be controller.[[stream]].
-
If stream.[[state]] is not "
readable
", return. -
Perform ! ReadableByteStreamControllerClearPendingPullIntos(controller).
-
Perform ! ResetQueue(controller).
-
Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
-
Perform ! ReadableStreamError(stream, e).
-
Assert: either controller.[[pendingPullIntos]] is empty, or controller.[[pendingPullIntos]][0] is pullIntoDescriptor.
-
Assert: controller.[[byobRequest]] is null.
-
Set pullIntoDescriptor’s bytes filled to bytes filled + size.
-
Let maxBytesToCopy be min(controller.[[queueTotalSize]], pullIntoDescriptor’s byte length − pullIntoDescriptor’s bytes filled).
-
Let maxBytesFilled be pullIntoDescriptor’s bytes filled + maxBytesToCopy.
-
Let totalBytesToCopyRemaining be maxBytesToCopy.
-
Let ready be false.
-
Assert: ! IsDetachedBuffer(pullIntoDescriptor’s buffer) is false.
-
Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
-
Let remainderBytes be the remainder after dividing maxBytesFilled by pullIntoDescriptor’s element size.
-
Let maxAlignedBytes be maxBytesFilled − remainderBytes.
-
If maxAlignedBytes ≥ pullIntoDescriptor’s minimum fill,
-
Set totalBytesToCopyRemaining to maxAlignedBytes − pullIntoDescriptor’s bytes filled.
-
Set ready to true.
A descriptor for a
read()
request that is not yet filled up to its minimum length will stay at the head of the queue, so the underlying source can keep filling it.
-
-
Let queue be controller.[[queue]].
-
While totalBytesToCopyRemaining > 0,
-
Let headOfQueue be queue[0].
-
Let bytesToCopy be min(totalBytesToCopyRemaining, headOfQueue’s byte length).
-
Let destStart be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
-
Let descriptorBuffer be pullIntoDescriptor’s buffer.
-
Let queueBuffer be headOfQueue’s buffer.
-
Let queueByteOffset be headOfQueue’s byte offset.
-
Assert: ! CanCopyDataBlockBytes(descriptorBuffer, destStart, queueBuffer, queueByteOffset, bytesToCopy) is true.
If this assertion were to fail (due to a bug in this specification or its implementation), then the next step may read from or write to potentially invalid memory. The user agent should always check this assertion, and stop in an implementation-defined manner if it fails (e.g. by crashing the process, or by erroring the stream).
-
Perform ! CopyDataBlockBytes(descriptorBuffer.[[ArrayBufferData]], destStart, queueBuffer.[[ArrayBufferData]], queueByteOffset, bytesToCopy).
-
If headOfQueue’s byte length is bytesToCopy,
-
Remove queue[0].
-
-
Otherwise,
-
Set headOfQueue’s byte offset to headOfQueue’s byte offset + bytesToCopy.
-
Set headOfQueue’s byte length to headOfQueue’s byte length − bytesToCopy.
-
-
Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − bytesToCopy.
-
Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesToCopy, pullIntoDescriptor).
-
Set totalBytesToCopyRemaining to totalBytesToCopyRemaining − bytesToCopy.
-
-
If ready is false,
-
Assert: controller.[[queueTotalSize]] is 0.
-
Assert: pullIntoDescriptor’s bytes filled > 0.
-
Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
-
-
Return ready.
-
Assert: controller.[[queueTotalSize]] > 0.
-
Let entry be controller.[[queue]][0].
-
Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − entry’s byte length.
-
Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
-
Let view be ! Construct(
%Uint8Array%
, « entry’s buffer, entry’s byte offset, entry’s byte length »). -
Perform readRequest’s chunk steps, given view.
-
If controller.[[byobRequest]] is null and controller.[[pendingPullIntos]] is not empty,
-
Let firstDescriptor be controller.[[pendingPullIntos]][0].
-
Let view be ! Construct(
%Uint8Array%
, « firstDescriptor’s buffer, firstDescriptor’s byte offset + firstDescriptor’s bytes filled, firstDescriptor’s byte length − firstDescriptor’s bytes filled »). -
Let byobRequest be a new
ReadableStreamBYOBRequest
. -
Set byobRequest.[[controller]] to controller.
-
Set byobRequest.[[view]] to view.
-
Set controller.[[byobRequest]] to byobRequest.
-
-
Return controller.[[byobRequest]].
-
Let state be controller.[[stream]].[[state]].
-
If state is "
errored
", return null. -
If state is "
closed
", return 0. -
Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
-
Assert: controller.[[stream]].[[state]] is "
readable
". -
If controller.[[queueTotalSize]] is 0 and controller.[[closeRequested]] is true,
-
Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
-
Perform ! ReadableStreamClose(controller.[[stream]]).
-
-
Otherwise,
-
Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
-
-
If controller.[[byobRequest]] is null, return.
-
Set controller.[[byobRequest]].[[controller]] to undefined.
-
Set controller.[[byobRequest]].[[view]] to null.
-
Set controller.[[byobRequest]] to null.
-
Assert: controller.[[closeRequested]] is false.
-
Let filledPullIntos be a new empty list.
-
While controller.[[pendingPullIntos]] is not empty,
-
If controller.[[queueTotalSize]] is 0, then break.
-
Let pullIntoDescriptor be controller.[[pendingPullIntos]][0].
-
If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) is true,
-
Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
-
Append pullIntoDescriptor to filledPullIntos.
-
-
-
Return filledPullIntos.
-
Let reader be controller.[[stream]].[[reader]].
-
Assert: reader implements
ReadableStreamDefaultReader
. -
While reader.[[readRequests]] is not empty,
-
If controller.[[queueTotalSize]] is 0, return.
-
Let readRequest be reader.[[readRequests]][0].
-
Remove readRequest from reader.[[readRequests]].
-
Perform ! ReadableByteStreamControllerFillReadRequestFromQueue(controller, readRequest).
-
-
Let stream be controller.[[stream]].
-
Let elementSize be 1.
-
Let ctor be
%DataView%
. -
If view has a [[TypedArrayName]] internal slot (i.e., it is not a
DataView
),-
Set elementSize to the element size specified in the typed array constructors table for view.[[TypedArrayName]].
-
Set ctor to the constructor specified in the typed array constructors table for view.[[TypedArrayName]].
-
-
Let minimumFill be min × elementSize.
-
Assert: minimumFill ≥ 0 and minimumFill ≤ view.[[ByteLength]].
-
Assert: the remainder after dividing minimumFill by elementSize is 0.
-
Let byteOffset be view.[[ByteOffset]].
-
Let byteLength be view.[[ByteLength]].
-
Let bufferResult be TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
-
If bufferResult is an abrupt completion,
-
Perform readIntoRequest’s error steps, given bufferResult.[[Value]].
-
Return.
-
-
Let buffer be bufferResult.[[Value]].
-
Let pullIntoDescriptor be a new pull-into descriptor with
- buffer: buffer
- buffer byte length: buffer.[[ArrayBufferByteLength]]
- byte offset: byteOffset
- byte length: byteLength
- bytes filled: 0
- minimum fill: minimumFill
- element size: elementSize
- view constructor: ctor
- reader type: "byob"
-
If controller.[[pendingPullIntos]] is not empty,
-
Append pullIntoDescriptor to controller.[[pendingPullIntos]].
-
Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
-
Return.
-
-
If stream.[[state]] is "
closed
",-
Let emptyView be ! Construct(ctor, « pullIntoDescriptor’s buffer, pullIntoDescriptor’s byte offset, 0 »).
-
Perform readIntoRequest’s close steps, given emptyView.
-
Return.
-
-
If controller.[[queueTotalSize]] > 0,
-
If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) is true,
-
Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor).
-
Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
-
Perform readIntoRequest’s chunk steps, given filledView.
-
Return.
-
-
If controller.[[closeRequested]] is true,
-
Let e be a
TypeError
exception. -
Perform ! ReadableByteStreamControllerError(controller, e).
-
Perform readIntoRequest’s error steps, given e.
-
Return.
-
-
-
Append pullIntoDescriptor to controller.[[pendingPullIntos]].
-
Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
-
Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
-
Assert: controller.[[pendingPullIntos]] is not empty.
-
Let firstDescriptor be controller.[[pendingPullIntos]][0].
-
Let state be controller.[[stream]].[[state]].
-
If state is "
closed
",-
If bytesWritten is not 0, throw a
TypeError
exception.
-
-
Otherwise,
-
Assert: state is "
readable
". -
If bytesWritten is 0, throw a
TypeError
exception. -
If firstDescriptor’s bytes filled + bytesWritten > firstDescriptor’s byte length, throw a
RangeError
exception.
-
-
Set firstDescriptor’s buffer to ! TransferArrayBuffer(firstDescriptor’s buffer).
-
Perform ? ReadableByteStreamControllerRespondInternal(controller, bytesWritten).
-
Assert: the remainder after dividing firstDescriptor’s bytes filled by firstDescriptor’s element size is 0.
-
If firstDescriptor’s reader type is "
none
", perform ! ReadableByteStreamControllerShiftPendingPullInto(controller). -
Let stream be controller.[[stream]].
-
If ! ReadableStreamHasBYOBReader(stream) is true,
-
Let filledPullIntos be a new empty list.
-
While filledPullIntos’s size < ! ReadableStreamGetNumReadIntoRequests(stream),
-
Let pullIntoDescriptor be ! ReadableByteStreamControllerShiftPendingPullInto(controller).
-
Append pullIntoDescriptor to filledPullIntos.
-
-
For each filledPullInto of filledPullIntos,
-
Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
-
-
-
Assert: pullIntoDescriptor’s bytes filled + bytesWritten ≤ pullIntoDescriptor’s byte length.
-
Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesWritten, pullIntoDescriptor).
-
If pullIntoDescriptor’s reader type is "
none
",-
Perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, pullIntoDescriptor).
-
Let filledPullIntos be the result of performing ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
-
For each filledPullInto of filledPullIntos,
-
Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], filledPullInto).
-
-
Return.
-
-
If pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill, return.
A descriptor for a
read()
request that is not yet filled up to its minimum length will stay at the head of the queue, so the underlying source can keep filling it. -
Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
-
Let remainderSize be the remainder after dividing pullIntoDescriptor’s bytes filled by pullIntoDescriptor’s element size.
-
If remainderSize > 0,
-
Let end be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
-
Perform ? ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller, pullIntoDescriptor’s buffer, end − remainderSize, remainderSize).
-
-
Set pullIntoDescriptor’s bytes filled to pullIntoDescriptor’s bytes filled − remainderSize.
-
Let filledPullIntos be the result of performing ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
-
Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], pullIntoDescriptor).
-
For each filledPullInto of filledPullIntos,
-
Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], filledPullInto).
-
-
Let firstDescriptor be controller.[[pendingPullIntos]][0].
-
Assert: ! CanTransferArrayBuffer(firstDescriptor’s buffer) is true.
-
Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
-
Let state be controller.[[stream]].[[state]].
-
If state is "
closed
",-
Assert: bytesWritten is 0.
-
Perform ! ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor).
-
-
Otherwise,
-
Assert: state is "
readable
". -
Assert: bytesWritten > 0.
-
Perform ? ReadableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor).
-
-
Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
-
Assert: controller.[[pendingPullIntos]] is not empty.
-
Assert: ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is false.
-
Let firstDescriptor be controller.[[pendingPullIntos]][0].
-
Let state be controller.[[stream]].[[state]].
-
If state is "
closed
",-
If view.[[ByteLength]] is not 0, throw a
TypeError
exception.
-
-
Otherwise,
-
Assert: state is "
readable
". -
If view.[[ByteLength]] is 0, throw a
TypeError
exception.
-
-
If firstDescriptor’s byte offset + firstDescriptor’s bytes filled is not view.[[ByteOffset]], throw a RangeError exception.
-
If firstDescriptor’s buffer byte length is not view.[[ViewedArrayBuffer]].[[ByteLength]], throw a
RangeError
exception. -
If firstDescriptor’s bytes filled + view.[[ByteLength]] > firstDescriptor’s byte length, throw a
RangeError
exception. -
Let viewByteLength be view.[[ByteLength]].
-
Set firstDescriptor’s buffer to ? TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
-
Perform ? ReadableByteStreamControllerRespondInternal(controller, viewByteLength).
-
Assert: controller.[[byobRequest]] is null.
-
Let descriptor be controller.[[pendingPullIntos]][0].
-
Remove descriptor from controller.[[pendingPullIntos]].
-
Return descriptor.
-
Let stream be controller.[[stream]].
-
If stream.[[state]] is not "
readable
", return false. -
If controller.[[closeRequested]] is true, return false.
-
If controller.[[started]] is false, return false.
-
If ! ReadableStreamHasDefaultReader(stream) is true and ! ReadableStreamGetNumReadRequests(stream) > 0, return true.
-
If ! ReadableStreamHasBYOBReader(stream) is true and ! ReadableStreamGetNumReadIntoRequests(stream) > 0, return true.
-
Let desiredSize be ! ReadableByteStreamControllerGetDesiredSize(controller).
-
Assert: desiredSize is not null.
-
If desiredSize > 0, return true.
-
Return false.
-
Assert: stream.[[controller]] is undefined.
-
If autoAllocateChunkSize is not undefined,
-
Assert: ! IsInteger(autoAllocateChunkSize) is true.
-
Assert: autoAllocateChunkSize is positive.
-
-
Set controller.[[stream]] to stream.
-
Set controller.[[pullAgain]] and controller.[[pulling]] to false.
-
Set controller.[[byobRequest]] to null.
-
Perform ! ResetQueue(controller).
-
Set controller.[[closeRequested]] and controller.[[started]] to false.
-
Set controller.[[strategyHWM]] to highWaterMark.
-
Set controller.[[pullAlgorithm]] to pullAlgorithm.
-
Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
-
Set controller.[[autoAllocateChunkSize]] to autoAllocateChunkSize.
-
Set controller.[[pendingPullIntos]] to a new empty list.
-
Set stream.[[controller]] to controller.
-
Let startResult be the result of performing startAlgorithm.
-
Let startPromise be a promise resolved with startResult.
-
Upon fulfillment of startPromise,
-
Set controller.[[started]] to true.
-
Assert: controller.[[pulling]] is false.
-
Assert: controller.[[pullAgain]] is false.
-
Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
-
-
Upon rejection of startPromise with reason r,
-
Perform ! ReadableByteStreamControllerError(controller, r).
-
-
Let controller be a new
ReadableByteStreamController
. -
Let startAlgorithm be an algorithm that returns undefined.
-
Let pullAlgorithm be an algorithm that returns a promise resolved with undefined.
-
Let cancelAlgorithm be an algorithm that returns a promise resolved with undefined.
-
If underlyingSourceDict["
start
"] exists, then set startAlgorithm to an algorithm which returns the result of invoking underlyingSourceDict["start
"] with argument list « controller » and callback this value underlyingSource. -
If underlyingSourceDict["
pull
"] exists, then set pullAlgorithm to an algorithm which returns the result of invoking underlyingSourceDict["pull
"] with argument list « controller » and callback this value underlyingSource. -
If underlyingSourceDict["
cancel
"] exists, then set cancelAlgorithm to an algorithm which takes an argument reason and returns the result of invoking underlyingSourceDict["cancel
"] with argument list « reason » and callback this value underlyingSource. -
Let autoAllocateChunkSize be underlyingSourceDict["
autoAllocateChunkSize
"], if it exists, or undefined otherwise. -
If autoAllocateChunkSize is 0, then throw a
TypeError
exception. -
Perform ? SetUpReadableByteStreamController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, autoAllocateChunkSize).
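For orientation, the underlying source dictionary processed by these steps corresponds to developer code like the following minimal sketch of a readable byte stream. The randomBytesInto() helper is a hypothetical function (assumed to fill a view and return the number of bytes written); it is not part of this standard.

const rs = new ReadableStream({
  type: "bytes",
  autoAllocateChunkSize: 1024,
  pull(controller) {
    const byobRequest = controller.byobRequest;
    if (byobRequest !== null) {
      // A pending read (BYOB or auto-allocated) exposes a view that can be filled directly.
      const bytesWritten = randomBytesInto(byobRequest.view); // hypothetical helper
      byobRequest.respond(bytesWritten);
    } else {
      // No outstanding request to fill: enqueue a freshly allocated chunk instead.
      const chunk = new Uint8Array(1024);
      randomBytesInto(chunk);
      controller.enqueue(chunk);
    }
  },
  cancel(reason) {
    console.log("canceled because", reason);
  }
});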
5. Writable streams
5.1. Using writable streams
readableStream.pipeTo(writableStream)
  .then(() => console.log("All data successfully written!"))
  .catch(e => console.error("Something went wrong!", e));
You can also write to a writable stream directly, by acquiring a writer and using its write() and close() methods. Since writable streams queue any incoming writes, and take care internally to forward them to the underlying sink in sequence, you can indiscriminately write to a writable stream without much ceremony:
function writeArrayToStream(array, writableStream) {
  const writer = writableStream.getWriter();
  array.forEach(chunk => writer.write(chunk).catch(() => {}));

  return writer.close();
}

writeArrayToStream([1, 2, 3, 4, 5], writableStream)
  .then(() => console.log("All done!"))
  .catch(e => console.error("Error with the stream: " + e));
Note how we use .catch(() => {}) to suppress any rejections from the write() method; we’ll be notified of any fatal errors via a rejection of the close() method, and leaving them uncaught would cause potential unhandledrejection events and console warnings.
In the above example, we only paid attention to the success or failure of the stream as a whole, via the promise returned by the writer’s close() method. That promise will reject if anything goes wrong with the stream (initializing it, writing to it, or closing it), and it will fulfill once the stream is successfully closed. Often this is all you care about.
However, if you care about the success of writing a specific chunk, you can use the promise
returned by the writer’s write()
method:
writer.write("i am a chunk of data")
  .then(() => console.log("chunk successfully written!"))
  .catch(e => console.error(e));
What "success" means is up to a given stream instance (or more precisely, its underlying sink) to decide. For example, for a file stream it could simply mean that the OS has accepted the write, and not necessarily that the chunk has been flushed to disk. Some streams might not be able to give such a signal at all, in which case the returned promise will fulfill immediately.
The desiredSize and ready properties of writable stream writers allow producers to more precisely respond to flow control signals from the stream, to keep memory usage below the stream’s specified high water mark. The following example writes an infinite sequence of random bytes to a stream, using desiredSize to determine how many bytes to generate at a given time, and using ready to wait for the backpressure to subside.
async function writeRandomBytesForever(writableStream) {
  const writer = writableStream.getWriter();

  while (true) {
    await writer.ready;

    const bytes = new Uint8Array(writer.desiredSize);
    crypto.getRandomValues(bytes);

    // Purposefully don't await; awaiting writer.ready is enough.
    writer.write(bytes).catch(() => {});
  }
}

writeRandomBytesForever(myWritableStream).catch(e => console.error("Something broke", e));
Note how we don’t await the promise returned by write(); this would be redundant with awaiting the ready promise. Additionally, similar to a previous example, we use the .catch(() => {}) pattern on the promises returned by write(); in this case we’ll be notified about any failures when awaiting the ready promise.
To see why it’s not necessary to await the promise returned by write(), consider a modification of the above example, where we continue to use the WritableStreamDefaultWriter interface directly, but we don’t control how many bytes we have to write at a given time. In that case, the backpressure-respecting code looks the same:
async function writeSuppliedBytesForever(writableStream, getBytes) {
  const writer = writableStream.getWriter();

  while (true) {
    await writer.ready;

    const bytes = getBytes();
    writer.write(bytes).catch(() => {});
  }
}
Unlike the previous example, where the write() and ready promises were synchronized because we were always writing exactly writer.desiredSize bytes each time, in this case it’s quite possible that the ready promise fulfills before the one returned by write() does. Remember, the ready promise fulfills when the desired size becomes positive, which might be before the write succeeds (especially in cases with a larger high water mark).

In other words, awaiting the return value of write() means you never queue up writes in the stream’s internal queue, instead only executing a write after the previous one succeeds, which can result in low throughput.
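For contrast, here is a minimal sketch of that lower-throughput pattern, in the style of the examples above; the function name is ours, and getBytes is the same assumed helper:

// Lower-throughput variant: each write() is awaited, so at most one chunk is
// ever in the stream's internal queue, and production stalls on every write.
async function writeSuppliedBytesSerially(writableStream, getBytes) {
  const writer = writableStream.getWriter();

  while (true) {
    const bytes = getBytes();
    await writer.write(bytes);
  }
}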
5.2. The WritableStream
class
The WritableStream class represents a writable stream.
5.2.1. Interface definition
The Web IDL definition for the WritableStream
class is given as follows:
[Exposed=*, Transferable]
interface WritableStream {
  constructor(optional object underlyingSink, optional QueuingStrategy strategy = {});

  readonly attribute boolean locked;

  Promise<undefined> abort(optional any reason);
  Promise<undefined> close();
  WritableStreamDefaultWriter getWriter();
};
5.2.2. Internal slots
Instances of WritableStream
are created with the internal slots described in the following
table:
Internal Slot | Description (non-normative)
---|---
[[backpressure]] | A boolean indicating the backpressure signal set by the controller
[[closeRequest]] | The promise returned from the writer’s close() method
[[controller]] | A WritableStreamDefaultController created with the ability to control the state and queue of this stream
[[Detached]] | A boolean flag set to true when the stream is transferred
[[inFlightWriteRequest]] | A slot set to the promise for the current in-flight write operation while the underlying sink’s write algorithm is executing and has not yet fulfilled, used to prevent reentrant calls
[[inFlightCloseRequest]] | A slot set to the promise for the current in-flight close operation while the underlying sink’s close algorithm is executing and has not yet fulfilled, used to prevent the abort() method from interrupting close
[[pendingAbortRequest]] | A pending abort request
[[state]] | A string containing the stream’s current state, used internally; one of "writable", "closed", "erroring", or "errored"
[[storedError]] | A value indicating how the stream failed, to be given as a failure reason or exception when trying to operate on the stream while in the "errored" state
[[writer]] | A WritableStreamDefaultWriter instance, if the stream is locked to a writer, or undefined if it is not
[[writeRequests]] | A list of promises representing the stream’s internal queue of write requests not yet processed by the underlying sink
The [[inFlightCloseRequest]] slot and [[closeRequest]] slot are mutually exclusive. Similarly, no element will be removed from [[writeRequests]] while [[inFlightWriteRequest]] is not undefined. Implementations can optimize storage for these slots based on these invariants.
A pending abort request is a struct used to track a request to abort the stream before that request is finally processed. It has the following items:
- promise
-
A promise returned from WritableStreamAbort
- reason
-
A JavaScript value that was passed as the abort reason to WritableStreamAbort
- was already erroring
-
A boolean indicating whether or not the stream was in the "erroring" state when WritableStreamAbort was called, which impacts the outcome of the abort request
5.2.3. The underlying sink API
The WritableStream()
constructor accepts as its first argument a JavaScript object representing
the underlying sink. Such objects can contain any of the following properties:
dictionary UnderlyingSink {
  UnderlyingSinkStartCallback start;
  UnderlyingSinkWriteCallback write;
  UnderlyingSinkCloseCallback close;
  UnderlyingSinkAbortCallback abort;
  any type;
};

callback UnderlyingSinkStartCallback = any (WritableStreamDefaultController controller);
callback UnderlyingSinkWriteCallback = Promise<undefined> (any chunk, WritableStreamDefaultController controller);
callback UnderlyingSinkCloseCallback = Promise<undefined> ();
callback UnderlyingSinkAbortCallback = Promise<undefined> (optional any reason);
start(controller), of type UnderlyingSinkStartCallback
-
A function that is called immediately during creation of the WritableStream.
Typically this is used to acquire access to the underlying sink resource being represented.
If this setup process is asynchronous, it can return a promise to signal success or failure; a rejected promise will error the stream. Any thrown exceptions will be re-thrown by the WritableStream() constructor.
write(chunk, controller), of type UnderlyingSinkWriteCallback
-
A function that is called when a new chunk of data is ready to be written to the underlying sink. The stream implementation guarantees that this function will be called only after previous writes have succeeded, and never before start() has succeeded or after close() or abort() have been called.
This function is used to actually send the data to the resource presented by the underlying sink, for example by calling a lower-level API.
If the process of writing data is asynchronous, and communicates success or failure signals back to its user, then this function can return a promise to signal success or failure. This promise return value will be communicated back to the caller of writer.write(), so they can monitor that individual write. Throwing an exception is treated the same as returning a rejected promise.
Note that such signals are not always available; compare e.g. § 10.6 A writable stream with no backpressure or success signals with § 10.7 A writable stream with backpressure and success signals. In such cases, it’s best to not return anything.
The promise potentially returned by this function also governs whether the given chunk counts as written for the purposes of computing the desired size to fill the stream’s internal queue. That is, during the time it takes the promise to settle, writer.desiredSize will stay at its previous value, only increasing to signal the desire for more chunks once the write succeeds.
Finally, the promise potentially returned by this function is used to ensure that well-behaved producers do not attempt to mutate the chunk before it has been fully processed. (This is not guaranteed by any specification machinery, but instead is an informal contract between producers and the underlying sink.)
close(), of type UnderlyingSinkCloseCallback
-
A function that is called after the producer signals, via writer.close(), that they are done writing chunks to the stream, and subsequently all queued-up writes have successfully completed.
This function can perform any actions necessary to finalize or flush writes to the underlying sink, and release access to any held resources.
If the shutdown process is asynchronous, the function can return a promise to signal success or failure; the result will be communicated via the return value of the called writer.close() method. Additionally, a rejected promise will error the stream, instead of letting it close successfully. Throwing an exception is treated the same as returning a rejected promise.
abort(reason), of type UnderlyingSinkAbortCallback
-
A function that is called after the producer signals, via stream.abort() or writer.abort(), that they wish to abort the stream. It takes as its argument the same value as was passed to those methods by the producer.
Writable streams can additionally be aborted under certain conditions during piping; see the definition of the pipeTo() method for more details.
This function can clean up any held resources, much like close(), but perhaps with some custom handling.
If the shutdown process is asynchronous, the function can return a promise to signal success or failure; the result will be communicated via the return value of the called writer.abort() method. Throwing an exception is treated the same as returning a rejected promise. Regardless, the stream will be errored with a new TypeError indicating that it was aborted.
type, of type any
-
This property is reserved for future use, so any attempts to supply a value will throw an exception.
The controller
argument passed to start()
and
write()
is an instance of WritableStreamDefaultController
, and has the
ability to error the stream. This is mainly used for bridging the gap with non-promise-based APIs,
as seen for example in § 10.6 A writable stream with no backpressure or success signals.
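For orientation, here is a minimal sketch of an underlying sink supplying all four methods. The connection object and its send()/end()/destroy() methods, along with its onerror handler, are hypothetical stand-ins for some lower-level API; they are not part of this standard.

const writableStream = new WritableStream({
  start(controller) {
    // Bridge a non-promise-based error signal into the stream.
    connection.onerror = err => controller.error(err);
  },
  write(chunk) {
    // Returning a promise signals per-chunk success or failure.
    return connection.send(chunk);
  },
  close() {
    return connection.end();
  },
  abort(reason) {
    return connection.destroy(reason);
  }
});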
5.2.4. Constructor, methods, and properties
stream = new WritableStream(underlyingSink[, strategy])
-
Creates a new
WritableStream
wrapping the provided underlying sink. See § 5.2.3 The underlying sink API for more details on the underlyingSink argument.The strategy argument represents the stream’s queuing strategy, as described in § 7.1 The queuing strategy API. If it is not provided, the default behavior will be the same as a
CountQueuingStrategy
with a high water mark of 1. isLocked = stream.
locked
-
Returns whether or not the writable stream is locked to a writer.
await stream.
abort
([ reason ])-
Aborts the stream, signaling that the producer can no longer successfully write to the stream and it is to be immediately moved to an errored state, with any queued-up writes discarded. This will also execute any abort mechanism of the underlying sink.
The returned promise will fulfill if the stream shuts down successfully, or reject if the underlying sink signaled that there was an error doing so. Additionally, it will reject with a
TypeError
(without attempting to cancel the stream) if the stream is currently locked. await stream.
close
()-
Closes the stream. The underlying sink will finish processing any previously-written chunks, before invoking its close behavior. During this time any further attempts to write will fail (without erroring the stream).
The method returns a promise that will fulfill if all remaining chunks are successfully written and the stream successfully closes, or rejects if an error is encountered during this process. Additionally, it will reject with a
TypeError
(without attempting to cancel the stream) if the stream is currently locked. writer = stream.
getWriter
()-
Creates a writer (an instance of
WritableStreamDefaultWriter
) and locks the stream to the new writer. While the stream is locked, no other writer can be acquired until this one is released.This functionality is especially useful for creating abstractions that desire the ability to write to a stream without interruption or interleaving. By getting a writer for the stream, you can ensure nobody else can write at the same time, which would cause the resulting written data to be unpredictable and probably useless.
new WritableStream(underlyingSink, strategy)
constructor steps are:
-
If underlyingSink is missing, set it to null.
-
Let underlyingSinkDict be underlyingSink, converted to an IDL value of type UnderlyingSink.
We cannot declare the underlyingSink argument as having the UnderlyingSink type directly, because doing so would lose the reference to the original object. We need to retain the object so we can invoke the various methods on it.
-
If underlyingSinkDict["type"] exists, throw a RangeError exception.
This is to allow us to add new potential types in the future, without backward-compatibility concerns.
-
Perform ! InitializeWritableStream(this).
-
Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy).
-
Let highWaterMark be ? ExtractHighWaterMark(strategy, 1).
-
Perform ? SetUpWritableStreamDefaultControllerFromUnderlyingSink(this, underlyingSink, underlyingSinkDict, highWaterMark, sizeAlgorithm).
locked
getter steps are:
-
Return ! IsWritableStreamLocked(this).
abort(reason)
method steps are:
-
If ! IsWritableStreamLocked(this) is true, return a promise rejected with a
TypeError
exception. -
Return ! WritableStreamAbort(this, reason).
close()
method steps are:
-
If ! IsWritableStreamLocked(this) is true, return a promise rejected with a
TypeError
exception. -
If ! WritableStreamCloseQueuedOrInFlight(this) is true, return a promise rejected with a
TypeError
exception. -
Return ! WritableStreamClose(this).
getWriter()
method steps are:
-
Return ? AcquireWritableStreamDefaultWriter(this).
5.2.5. Transfer via postMessage()
destination.postMessage(ws, { transfer: [ws] });
-
Sends a WritableStream to another frame, window, or worker.
The transferred stream can be used exactly like the original. The original will become locked and no longer directly usable.
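For example, a sketch of transferring a writable stream to a dedicated worker and writing to it from there; the worker file name and the message handling are assumptions made for this illustration:

// main.js
const worker = new Worker("worker.js"); // hypothetical worker script
const ws = new WritableStream({
  write(chunk) {
    console.log("chunk written on the main thread:", chunk);
  }
});
worker.postMessage(ws, { transfer: [ws] });

// worker.js
self.onmessage = async event => {
  const writableStream = event.data;
  const writer = writableStream.getWriter();
  await writer.write("hello from the worker");
  await writer.close();
};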
WritableStream
objects are transferable objects. Their transfer steps, given value
and dataHolder, are:
-
If ! IsWritableStreamLocked(value) is true, throw a "
DataCloneError
"DOMException
. -
Let port1 be a new
MessagePort
in the current Realm. -
Let port2 be a new
MessagePort
in the current Realm. -
Entangle port1 and port2.
-
Let readable be a new
ReadableStream
in the current Realm. -
Perform ! SetUpCrossRealmTransformReadable(readable, port1).
-
Let promise be ! ReadableStreamPipeTo(readable, value, false, false, false).
-
Set promise.[[PromiseIsHandled]] to true.
-
Set dataHolder.[[port]] to ! StructuredSerializeWithTransfer(port2, « port2 »).
-
Let deserializedRecord be ! StructuredDeserializeWithTransfer(dataHolder.[[port]], the current Realm).
-
Let port be deserializedRecord.[[Deserialized]].
-
Perform ! SetUpCrossRealmTransformWritable(value, port).
5.3. The WritableStreamDefaultWriter
class
The WritableStreamDefaultWriter
class represents a writable stream writer designed to be
vended by a WritableStream
instance.
5.3.1. Interface definition
The Web IDL definition for the WritableStreamDefaultWriter
class is given as follows:
[Exposed=*]
interface WritableStreamDefaultWriter {
  constructor(WritableStream stream);

  readonly attribute Promise<undefined> closed;
  readonly attribute unrestricted double? desiredSize;
  readonly attribute Promise<undefined> ready;

  Promise<undefined> abort(optional any reason);
  Promise<undefined> close();
  undefined releaseLock();
  Promise<undefined> write(optional any chunk);
};
5.3.2. Internal slots
Instances of WritableStreamDefaultWriter
are created with the internal slots described in the
following table:
Internal Slot | Description (non-normative)
---|---
[[closedPromise]] | A promise returned by the writer’s closed getter
[[readyPromise]] | A promise returned by the writer’s ready getter
[[stream]] | A WritableStream instance that owns this writer
5.3.3. Constructor, methods, and properties
writer = new
WritableStreamDefaultWriter
(stream)-
This is equivalent to calling stream.getWriter().
await writer.
closed
-
Returns a promise that will be fulfilled when the stream becomes closed, or rejected if the stream ever errors or the writer’s lock is released before the stream finishes closing.
desiredSize = writer.
desiredSize
-
Returns the desired size to fill the stream’s internal queue. It can be negative, if the queue is over-full. A producer can use this information to determine the right amount of data to write.
It will be null if the stream cannot be successfully written to (due to either being errored, or having an abort queued up). It will return zero if the stream is closed. And the getter will throw an exception if invoked when the writer’s lock is released.
await writer.
ready
-
Returns a promise that will be fulfilled when the desired size to fill the stream’s internal queue transitions from non-positive to positive, signaling that it is no longer applying backpressure. Once the desired size dips back to zero or below, the getter will return a new promise that stays pending until the next transition.
If the stream becomes errored or aborted, or the writer’s lock is released, the returned promise will become rejected.
await writer.abort([ reason ])
-
If the writer is active, behaves the same as stream.abort(reason).
await writer.close()
-
If the writer is active, behaves the same as stream.close().
writer.releaseLock()
-
Releases the writer’s lock on the corresponding stream. After the lock is released, the writer is no longer active. If the associated stream is errored when the lock is released, the writer will appear errored in the same way from now on; otherwise, the writer will appear closed.
Note that the lock can still be released even if some ongoing writes have not yet finished (i.e. even if the promises returned from previous calls to
write()
have not yet settled). It’s not necessary to hold the lock on the writer for the duration of the write; the lock instead simply prevents other producers from writing in an interleaved manner. await writer.
write
(chunk)-
Writes the given chunk to the writable stream, by waiting until any previous writes have finished successfully, and then sending the chunk to the underlying sink’s
write()
method. It will return a promise that fulfills with undefined upon a successful write, or rejects if the write fails or stream becomes errored before the writing process is initiated.Note that what "success" means is up to the underlying sink; it might indicate simply that the chunk has been accepted, and not necessarily that it is safely saved to its ultimate destination.
If chunk is mutable, producers are advised to avoid mutating it after passing it to write(), until after the promise returned by write() settles. This ensures that the underlying sink receives and processes the same value that was passed in.
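A brief sketch of these writer members in combination (writableStream is assumed to come from elsewhere):

const writer = writableStream.getWriter();

console.log(writer.desiredSize); // how much room is left in the internal queue

writer.write("a chunk")
  .then(() => console.log("the underlying sink accepted the chunk"))
  .catch(e => console.error("the write (or the stream) failed", e));

// The lock can be released even though that write may not have settled yet;
// releasing only prevents further interleaved writing through this writer.
writer.releaseLock();

const anotherWriter = writableStream.getWriter(); // a new producer can now take over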
new WritableStreamDefaultWriter(stream)
constructor steps are:
-
Perform ? SetUpWritableStreamDefaultWriter(this, stream).
closed
getter steps are:
-
Return this.[[closedPromise]].
desiredSize
getter steps are:
-
If this.[[stream]] is undefined, throw a
TypeError
exception. -
Return ! WritableStreamDefaultWriterGetDesiredSize(this).
ready
getter
steps are:
-
Return this.[[readyPromise]].
abort(reason)
method steps are:
-
If this.[[stream]] is undefined, return a promise rejected with a
TypeError
exception. -
Return ! WritableStreamDefaultWriterAbort(this, reason).
close()
method
steps are:
-
Let stream be this.[[stream]].
-
If stream is undefined, return a promise rejected with a
TypeError
exception. -
If ! WritableStreamCloseQueuedOrInFlight(stream) is true, return a promise rejected with a
TypeError
exception. -
Return ! WritableStreamDefaultWriterClose(this).
releaseLock()
method steps are:
-
Let stream be this.[[stream]].
-
If stream is undefined, return.
-
Assert: stream.[[writer]] is not undefined.
-
Perform ! WritableStreamDefaultWriterRelease(this).
write(chunk)
method steps are:
-
If this.[[stream]] is undefined, return a promise rejected with a
TypeError
exception. -
Return ! WritableStreamDefaultWriterWrite(this, chunk).
5.4. The WritableStreamDefaultController
class
The WritableStreamDefaultController
class has methods that allow control of a
WritableStream
’s state. When constructing a WritableStream
, the underlying sink is
given a corresponding WritableStreamDefaultController
instance to manipulate.
5.4.1. Interface definition
The Web IDL definition for the WritableStreamDefaultController
class is given as follows:
[Exposed=*]
interface WritableStreamDefaultController {
  readonly attribute AbortSignal signal;
  undefined error(optional any e);
};
5.4.2. Internal slots
Instances of WritableStreamDefaultController
are created with the internal slots described in
the following table:
Internal Slot | Description (non-normative)
---|---
[[abortAlgorithm]] | A promise-returning algorithm, taking one argument (the abort reason), which communicates a requested abort to the underlying sink
[[abortController]] | An AbortController that can be used to abort the pending write or close operation when the stream is aborted
[[closeAlgorithm]] | A promise-returning algorithm which communicates a requested close to the underlying sink
[[queue]] | A list representing the stream’s internal queue of chunks
[[queueTotalSize]] | The total size of all the chunks stored in [[queue]] (see § 8.1 Queue-with-sizes)
[[started]] | A boolean flag indicating whether the underlying sink has finished starting
[[strategyHWM]] | A number supplied by the creator of the stream as part of the stream’s queuing strategy, indicating the point at which the stream will apply backpressure to its underlying sink
[[strategySizeAlgorithm]] | An algorithm to calculate the size of enqueued chunks, as part of the stream’s queuing strategy
[[stream]] | The WritableStream instance controlled
[[writeAlgorithm]] | A promise-returning algorithm, taking one argument (the chunk to write), which writes data to the underlying sink
The close sentinel is a unique value enqueued into [[queue]], in lieu of a chunk, to signal that the stream is closed. It is only used internally, and is never exposed to web developers.
5.4.3. Methods and properties
controller.
signal
-
An AbortSignal that can be used to abort the pending write or close operation when the stream is aborted.
controller.
error
(e)-
Closes the controlled writable stream, making all future interactions with it fail with the given error e.
This method is rarely used, since usually it suffices to return a rejected promise from one of the underlying sink’s methods. However, it can be useful for suddenly shutting down a stream in response to an event outside the normal lifecycle of interactions with the underlying sink.
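For illustration, a minimal sketch of an underlying sink that uses both members: signal to let a pending write react when the stream is aborted, and error() to tear the stream down in response to an out-of-band failure. The someEventSource object and the sendOverNetwork() helper are assumptions made for this sketch, not part of this standard.

const ws = new WritableStream({
  start(controller) {
    // Out-of-band failure: error the stream so all future interactions fail.
    someEventSource.addEventListener("failure", e => controller.error(e.reason));
  },
  write(chunk, controller) {
    // Pass the controller's AbortSignal to an abortable lower-level API so a
    // pending write can be interrupted when the stream is aborted.
    return sendOverNetwork(chunk, { signal: controller.signal });
  }
});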
signal
getter steps are:
-
Return this.[[abortController]]’s signal.
error(e)
method steps are:
-
Let state be this.[[stream]].[[state]].
-
If state is not "
writable
", return. -
Perform ! WritableStreamDefaultControllerError(this, e).
5.4.4. Internal methods
The following are internal methods implemented by each WritableStreamDefaultController
instance.
The writable stream implementation will call into these.
The reason these are in method form, instead of as abstract operations, is to make it clear that the writable stream implementation is decoupled from the controller implementation, and could in the future be expanded with other controllers, as long as those controllers implemented such internal methods. A similar scenario is seen for readable streams (see § 4.9.2 Interfacing with controllers), where there actually are multiple controller types and as such the counterpart internal methods are used polymorphically.
-
Let result be the result of performing this.[[abortAlgorithm]], passing reason.
-
Perform ! WritableStreamDefaultControllerClearAlgorithms(this).
-
Return result.
-
Perform ! ResetQueue(this).
5.5. Abstract operations
5.5.1. Working with writable streams
The following abstract operations operate on WritableStream
instances at a higher level.
-
Let writer be a new
WritableStreamDefaultWriter
. -
Perform ? SetUpWritableStreamDefaultWriter(writer, stream).
-
Return writer.
-
Assert: ! IsNonNegativeNumber(highWaterMark) is true.
-
Let stream be a new
WritableStream
. -
Perform ! InitializeWritableStream(stream).
-
Let controller be a new
WritableStreamDefaultController
. -
Perform ? SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm).
-
Return stream.
This abstract operation will throw an exception if and only if the supplied startAlgorithm throws.
-
Set stream.[[state]] to "
writable
". -
Set stream.[[storedError]], stream.[[writer]], stream.[[controller]], stream.[[inFlightWriteRequest]], stream.[[closeRequest]], stream.[[inFlightCloseRequest]], and stream.[[pendingAbortRequest]] to undefined.
-
Set stream.[[writeRequests]] to a new empty list.
-
Set stream.[[backpressure]] to false.
-
If stream.[[writer]] is undefined, return false.
-
Return true.
-
If ! IsWritableStreamLocked(stream) is true, throw a
TypeError
exception. -
Set writer.[[stream]] to stream.
-
Set stream.[[writer]] to writer.
-
Let state be stream.[[state]].
-
If state is "
writable
",-
If ! WritableStreamCloseQueuedOrInFlight(stream) is false and stream.[[backpressure]] is true, set writer.[[readyPromise]] to a new promise.
-
Otherwise, set writer.[[readyPromise]] to a promise resolved with undefined.
-
Set writer.[[closedPromise]] to a new promise.
-
-
Otherwise, if state is "
erroring
",-
Set writer.[[readyPromise]] to a promise rejected with stream.[[storedError]].
-
Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
-
Set writer.[[closedPromise]] to a new promise.
-
-
Otherwise, if state is "
closed
",-
Set writer.[[readyPromise]] to a promise resolved with undefined.
-
Set writer.[[closedPromise]] to a promise resolved with undefined.
-
-
Otherwise,
-
Assert: state is "
errored
". -
Let storedError be stream.[[storedError]].
-
Set writer.[[readyPromise]] to a promise rejected with storedError.
-
Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
-
Set writer.[[closedPromise]] to a promise rejected with storedError.
-
Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
-
-
If stream.[[state]] is "
closed
" or "errored
", return a promise resolved with undefined. -
Signal abort on stream.[[controller]].[[abortController]] with reason.
-
Let state be stream.[[state]].
-
If state is "
closed
" or "errored
", return a promise resolved with undefined.We re-check the state because signaling abort runs author code and that might have changed the state.
-
If stream.[[pendingAbortRequest]] is not undefined, return stream.[[pendingAbortRequest]]’s promise.
-
Assert: state is "
writable
" or "erroring
". -
Let wasAlreadyErroring be false.
-
If state is "
erroring
",-
Set wasAlreadyErroring to true.
-
Set reason to undefined.
-
-
Let promise be a new promise.
-
Set stream.[[pendingAbortRequest]] to a new pending abort request whose promise is promise, reason is reason, and was already erroring is wasAlreadyErroring.
-
If wasAlreadyErroring is false, perform ! WritableStreamStartErroring(stream, reason).
-
Return promise.
-
Let state be stream.[[state]].
-
If state is "
closed
" or "errored
", return a promise rejected with aTypeError
exception. -
Assert: state is "
writable
" or "erroring
". -
Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false.
-
Let promise be a new promise.
-
Set stream.[[closeRequest]] to promise.
-
Let writer be stream.[[writer]].
-
If writer is not undefined, and stream.[[backpressure]] is true, and state is "
writable
", resolve writer.[[readyPromise]] with undefined. -
Perform ! WritableStreamDefaultControllerClose(stream.[[controller]]).
-
Return promise.
5.5.2. Interfacing with controllers
To allow future flexibility to add different writable stream behaviors (similar to the distinction
between default readable streams and readable byte streams), much of the internal state of a
writable stream is encapsulated by the WritableStreamDefaultController
class.
Each controller class defines two internal methods, which are called by the WritableStream
algorithms:
- [[AbortSteps]](reason)
- The controller’s steps that run in reaction to the stream being aborted, used to clean up the state stored in the controller and inform the underlying sink.
- [[ErrorSteps]]()
- The controller’s steps that run in reaction to the stream being errored, used to clean up the state stored in the controller.
(These are defined as internal methods, instead of as abstract operations, so that they can be
called polymorphically by the WritableStream
algorithms, without having to branch on which type
of controller is present. This is a bit theoretical for now, given that only
WritableStreamDefaultController
exists so far.)
The rest of this section concerns abstract operations that go in the other direction: they are used
by the controller implementation to affect its associated WritableStream
object. This
translates internal state changes of the controller into developer-facing results visible through
the WritableStream
’s public API.
-
Assert: ! IsWritableStreamLocked(stream) is true.
-
Assert: stream.[[state]] is "
writable
". -
Let promise be a new promise.
-
Append promise to stream.[[writeRequests]].
-
Return promise.
-
If stream.[[closeRequest]] is undefined and stream.[[inFlightCloseRequest]] is undefined, return false.
-
Return true.
-
Let state be stream.[[state]].
-
If state is "
writable
",-
Perform ! WritableStreamStartErroring(stream, error).
-
Return.
-
-
Assert: state is "
erroring
". -
Perform ! WritableStreamFinishErroring(stream).
-
Assert: stream.[[state]] is "
erroring
". -
Assert: ! WritableStreamHasOperationMarkedInFlight(stream) is false.
-
Set stream.[[state]] to "
errored
". -
Perform ! stream.[[controller]].[[ErrorSteps]]().
-
Let storedError be stream.[[storedError]].
-
For each writeRequest of stream.[[writeRequests]]:
-
Reject writeRequest with storedError.
-
-
Set stream.[[writeRequests]] to an empty list.
-
If stream.[[pendingAbortRequest]] is undefined,
-
Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
-
Return.
-
-
Let abortRequest be stream.[[pendingAbortRequest]].
-
Set stream.[[pendingAbortRequest]] to undefined.
-
If abortRequest’s was already erroring is true,
-
Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
-
Return.
-
Let promise be ! stream.[[controller]].[[AbortSteps]](abortRequest’s reason).
-
Upon fulfillment of promise,
-
Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
-
Upon rejection of promise with reason reason,
-
Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
-
Assert: stream.[[inFlightCloseRequest]] is not undefined.
-
Resolve stream.[[inFlightCloseRequest]] with undefined.
-
Set stream.[[inFlightCloseRequest]] to undefined.
-
Let state be stream.[[state]].
-
Assert: stream.[[state]] is "
writable
" or "erroring
". -
If state is "
erroring
",-
Set stream.[[storedError]] to undefined.
-
If stream.[[pendingAbortRequest]] is not undefined,
-
Resolve stream.[[pendingAbortRequest]]’s promise with undefined.
-
Set stream.[[pendingAbortRequest]] to undefined.
-
-
-
Set stream.[[state]] to "
closed
". -
Let writer be stream.[[writer]].
-
If writer is not undefined, resolve writer.[[closedPromise]] with undefined.
-
Assert: stream.[[pendingAbortRequest]] is undefined.
-
Assert: stream.[[storedError]] is undefined.
-
Assert: stream.[[inFlightCloseRequest]] is not undefined.
-
Reject stream.[[inFlightCloseRequest]] with error.
-
Set stream.[[inFlightCloseRequest]] to undefined.
-
Assert: stream.[[state]] is "
writable
" or "erroring
". -
If stream.[[pendingAbortRequest]] is not undefined,
-
Reject stream.[[pendingAbortRequest]]’s promise with error.
-
Set stream.[[pendingAbortRequest]] to undefined.
-
-
Perform ! WritableStreamDealWithRejection(stream, error).
-
Assert: stream.[[inFlightWriteRequest]] is not undefined.
-
Resolve stream.[[inFlightWriteRequest]] with undefined.
-
Set stream.[[inFlightWriteRequest]] to undefined.
-
Assert: stream.[[inFlightWriteRequest]] is not undefined.
-
Reject stream.[[inFlightWriteRequest]] with error.
-
Set stream.[[inFlightWriteRequest]] to undefined.
-
Assert: stream.[[state]] is "
writable
" or "erroring
". -
Perform ! WritableStreamDealWithRejection(stream, error).
-
If stream.[[inFlightWriteRequest]] is undefined and stream.[[inFlightCloseRequest]] is undefined, return false.
-
Return true.
-
Assert: stream.[[inFlightCloseRequest]] is undefined.
-
Assert: stream.[[closeRequest]] is not undefined.
-
Set stream.[[inFlightCloseRequest]] to stream.[[closeRequest]].
-
Set stream.[[closeRequest]] to undefined.
-
Assert: stream.[[inFlightWriteRequest]] is undefined.
-
Assert: stream.[[writeRequests]] is not empty.
-
Let writeRequest be stream.[[writeRequests]][0].
-
Remove writeRequest from stream.[[writeRequests]].
-
Set stream.[[inFlightWriteRequest]] to writeRequest.
-
Assert: stream.[[state]] is "
errored
". -
If stream.[[closeRequest]] is not undefined,
-
Assert: stream.[[inFlightCloseRequest]] is undefined.
-
Reject stream.[[closeRequest]] with stream.[[storedError]].
-
Set stream.[[closeRequest]] to undefined.
-
-
Let writer be stream.[[writer]].
-
If writer is not undefined,
-
Reject writer.[[closedPromise]] with stream.[[storedError]].
-
Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
-
-
Assert: stream.[[storedError]] is undefined.
-
Assert: stream.[[state]] is "
writable
". -
Let controller be stream.[[controller]].
-
Assert: controller is not undefined.
-
Set stream.[[state]] to "
erroring
". -
Set stream.[[storedError]] to reason.
-
Let writer be stream.[[writer]].
-
If writer is not undefined, perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason).
-
If ! WritableStreamHasOperationMarkedInFlight(stream) is false and controller.[[started]] is true, perform ! WritableStreamFinishErroring(stream).
-
Assert: stream.[[state]] is "
writable
". -
Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false.
-
Let writer be stream.[[writer]].
-
If writer is not undefined and backpressure is not stream.[[backpressure]],
-
If backpressure is true, set writer.[[readyPromise]] to a new promise.
-
Otherwise,
-
Assert: backpressure is false.
-
Resolve writer.[[readyPromise]] with undefined.
-
-
-
Set stream.[[backpressure]] to backpressure.
5.5.3. Writers
The following abstract operations support the implementation and manipulation of
WritableStreamDefaultWriter
instances.
-
Let stream be writer.[[stream]].
-
Assert: stream is not undefined.
-
Return ! WritableStreamAbort(stream, reason).
-
Let stream be writer.[[stream]].
-
Assert: stream is not undefined.
-
Return ! WritableStreamClose(stream).
-
Let stream be writer.[[stream]].
-
Assert: stream is not undefined.
-
Let state be stream.[[state]].
-
If ! WritableStreamCloseQueuedOrInFlight(stream) is true or state is "
closed
", return a promise resolved with undefined. -
If state is "
errored
", return a promise rejected with stream.[[storedError]]. -
Assert: state is "
writable
" or "erroring
". -
Return ! WritableStreamDefaultWriterClose(writer).
This abstract operation helps implement the error propagation semantics of
ReadableStream
’s pipeTo()
.
-
If writer.[[closedPromise]].[[PromiseState]] is "
pending
", reject writer.[[closedPromise]] with error. -
Otherwise, set writer.[[closedPromise]] to a promise rejected with error.
-
Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
-
If writer.[[readyPromise]].[[PromiseState]] is "
pending
", reject writer.[[readyPromise]] with error. -
Otherwise, set writer.[[readyPromise]] to a promise rejected with error.
-
Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
-
Let stream be writer.[[stream]].
-
Let state be stream.[[state]].
-
If state is "
errored
" or "erroring
", return null. -
If state is "
closed
", return 0. -
Return ! WritableStreamDefaultControllerGetDesiredSize(stream.[[controller]]).
-
Let stream be writer.[[stream]].
-
Assert: stream is not undefined.
-
Assert: stream.[[writer]] is writer.
-
Let releasedError be a new
TypeError
. -
Perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError).
-
Perform ! WritableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError).
-
Set stream.[[writer]] to undefined.
-
Set writer.[[stream]] to undefined.
-
Let stream be writer.[[stream]].
-
Assert: stream is not undefined.
-
Let controller be stream.[[controller]].
-
Let chunkSize be ! WritableStreamDefaultControllerGetChunkSize(controller, chunk).
-
If stream is not equal to writer.[[stream]], return a promise rejected with a
TypeError
exception. -
Let state be stream.[[state]].
-
If state is "
errored
", return a promise rejected with stream.[[storedError]]. -
If ! WritableStreamCloseQueuedOrInFlight(stream) is true or state is "
closed
", return a promise rejected with aTypeError
exception indicating that the stream is closing or closed. -
If state is "
erroring
", return a promise rejected with stream.[[storedError]]. -
Assert: state is "
writable
". -
Let promise be ! WritableStreamAddWriteRequest(stream).
-
Perform ! WritableStreamDefaultControllerWrite(controller, chunk, chunkSize).
-
Return promise.
5.5.4. Default controllers
The following abstract operations support the implementation of the
WritableStreamDefaultController
class.
-
Assert: stream implements
WritableStream
. -
Assert: stream.[[controller]] is undefined.
-
Set controller.[[stream]] to stream.
-
Set stream.[[controller]] to controller.
-
Perform ! ResetQueue(controller).
-
Set controller.[[abortController]] to a new
AbortController
. -
Set controller.[[started]] to false.
-
Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm.
-
Set controller.[[strategyHWM]] to highWaterMark.
-
Set controller.[[writeAlgorithm]] to writeAlgorithm.
-
Set controller.[[closeAlgorithm]] to closeAlgorithm.
-
Set controller.[[abortAlgorithm]] to abortAlgorithm.
-
Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
-
Perform ! WritableStreamUpdateBackpressure(stream, backpressure).
-
Let startResult be the result of performing startAlgorithm. (This may throw an exception.)
-
Let startPromise be a promise resolved with startResult.
-
Upon fulfillment of startPromise,
-
Assert: stream.[[state]] is "
writable
" or "erroring
". -
Set controller.[[started]] to true.
-
Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
-
-
Upon rejection of startPromise with reason r,
-
Assert: stream.[[state]] is "
writable
" or "erroring
". -
Set controller.[[started]] to true.
-
Perform ! WritableStreamDealWithRejection(stream, r).
-
-
Let controller be a new
WritableStreamDefaultController
. -
Let startAlgorithm be an algorithm that returns undefined.
-
Let writeAlgorithm be an algorithm that returns a promise resolved with undefined.
-
Let closeAlgorithm be an algorithm that returns a promise resolved with undefined.
-
Let abortAlgorithm be an algorithm that returns a promise resolved with undefined.
-
If underlyingSinkDict["
start
"] exists, then set startAlgorithm to an algorithm which returns the result of invoking underlyingSinkDict["start
"] with argument list « controller », exception behavior "rethrow
", and callback this value underlyingSink. -
If underlyingSinkDict["
write
"] exists, then set writeAlgorithm to an algorithm which takes an argument chunk and returns the result of invoking underlyingSinkDict["write
"] with argument list « chunk, controller » and callback this value underlyingSink. -
If underlyingSinkDict["
close
"] exists, then set closeAlgorithm to an algorithm which returns the result of invoking underlyingSinkDict["close
"] with argument list «» and callback this value underlyingSink. -
If underlyingSinkDict["
abort
"] exists, then set abortAlgorithm to an algorithm which takes an argument reason and returns the result of invoking underlyingSinkDict["abort
"] with argument list « reason » and callback this value underlyingSink. -
Perform ? SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm).
-
Let stream be controller.[[stream]].
-
If controller.[[started]] is false, return.
-
If stream.[[inFlightWriteRequest]] is not undefined, return.
-
Let state be stream.[[state]].
-
Assert: state is not "
closed
" or "errored
". -
If state is "
erroring
",-
Perform ! WritableStreamFinishErroring(stream).
-
Return.
-
-
If controller.[[queue]] is empty, return.
-
Let value be ! PeekQueueValue(controller).
-
If value is the close sentinel, perform ! WritableStreamDefaultControllerProcessClose(controller).
-
Otherwise, perform ! WritableStreamDefaultControllerProcessWrite(controller, value).
WritableStream
itself is still referenced.
This is observable using weak references. See tc39/proposal-weakrefs#31 for more detail.
It performs the following steps:
-
Set controller.[[writeAlgorithm]] to undefined.
-
Set controller.[[closeAlgorithm]] to undefined.
-
Set controller.[[abortAlgorithm]] to undefined.
-
Set controller.[[strategySizeAlgorithm]] to undefined.
This algorithm will be performed multiple times in some edge cases. After the first time it will do nothing.
-
Perform ! EnqueueValueWithSize(controller, close sentinel, 0).
-
Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
-
Let stream be controller.[[stream]].
-
Assert: stream.[[state]] is "
writable
". -
Perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
-
Perform ! WritableStreamStartErroring(stream, error).
-
If controller.[[stream]].[[state]] is "
writable
", perform ! WritableStreamDefaultControllerError(controller, error).
-
Let desiredSize be ! WritableStreamDefaultControllerGetDesiredSize(controller).
-
Return true if desiredSize ≤ 0, or false otherwise.
-
If controller.[[strategySizeAlgorithm]] is undefined, then:
-
Assert: controller.[[stream]].[[state]] is not "
writable
". -
Return 1.
-
-
Let returnValue be the result of performing controller.[[strategySizeAlgorithm]], passing in chunk, and interpreting the result as a completion record.
-
If returnValue is an abrupt completion,
-
Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, returnValue.[[Value]]).
-
Return 1.
-
-
Return returnValue.[[Value]].
-
Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
WritableStreamDefaultControllerProcessClose(controller) performs the following steps:
- Let stream be controller.[[stream]].
- Perform ! WritableStreamMarkCloseRequestInFlight(stream).
- Perform ! DequeueValue(controller).
- Assert: controller.[[queue]] is empty.
- Let sinkClosePromise be the result of performing controller.[[closeAlgorithm]].
- Perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
- Upon fulfillment of sinkClosePromise,
  - Perform ! WritableStreamFinishInFlightClose(stream).
- Upon rejection of sinkClosePromise with reason reason,
  - Perform ! WritableStreamFinishInFlightCloseWithError(stream, reason).
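From a developer's perspective, the promise returned by writer.close() tracks sinkClosePromise. A sketch (flushToDisk is an assumed helper returning a promise, not part of this standard):

const stream = new WritableStream({
  close() {
    return flushToDisk(); // this promise becomes sinkClosePromise
  }
});

const writer = stream.getWriter();
writer.close().then(
  () => console.log("sink close fulfilled; the stream is now closed"),
  e => console.error("sink close rejected; the stream is errored with", e)
);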
WritableStreamDefaultControllerProcessWrite(controller, chunk) performs the following steps:
- Let stream be controller.[[stream]].
- Perform ! WritableStreamMarkFirstWriteRequestInFlight(stream).
- Let sinkWritePromise be the result of performing controller.[[writeAlgorithm]], passing in chunk.
- Upon fulfillment of sinkWritePromise,
  - Perform ! WritableStreamFinishInFlightWrite(stream).
  - Let state be stream.[[state]].
  - Assert: state is "writable" or "erroring".
  - Perform ! DequeueValue(controller).
  - If ! WritableStreamCloseQueuedOrInFlight(stream) is false and state is "writable",
    - Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
    - Perform ! WritableStreamUpdateBackpressure(stream, backpressure).
  - Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
- Upon rejection of sinkWritePromise with reason,
  - If stream.[[state]] is "writable", perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
  - Perform ! WritableStreamFinishInFlightWriteWithError(stream, reason).
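A sketch of the rejection path as it appears to a producer (the failing sink below is an illustrative assumption):

const stream = new WritableStream({
  write(chunk) {
    return Promise.reject(new Error("disk full")); // sinkWritePromise rejects
  }
});

const writer = stream.getWriter();
writer.write("a").catch(e => {
  // The in-flight write is rejected with the sink's reason...
});
writer.write("b").catch(e => {
  // ...and queued writes are rejected with the same stored error once the
  // stream finishes erroring.
});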
WritableStreamDefaultControllerWrite(controller, chunk, chunkSize) performs the following steps:
- Let enqueueResult be EnqueueValueWithSize(controller, chunk, chunkSize).
- If enqueueResult is an abrupt completion,
  - Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, enqueueResult.[[Value]]).
  - Return.
- Let stream be controller.[[stream]].
- If ! WritableStreamCloseQueuedOrInFlight(stream) is false and stream.[[state]] is "writable",
  - Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
  - Perform ! WritableStreamUpdateBackpressure(stream, backpressure).
- Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
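The backpressure update performed here is what producers observe through writer.ready. A minimal sketch of a producer that respects it (produceChunk is a hypothetical helper, and the loop bound is arbitrary):

async function produceAll(stream) {
  const writer = stream.getWriter();
  for (let i = 0; i < 10; i++) {
    await writer.ready;            // waits until any backpressure is relieved
    writer.write(produceChunk(i)); // hypothetical chunk producer
  }
  await writer.close();
}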
6. Transform streams
6.1. Using transform streams
The following example pipes readableStream through transformStream and on to writableStream, reporting success or failure once the pipe completes:

readableStream
  .pipeThrough(transformStream)
  .pipeTo(writableStream)
  .then(() => console.log("All data successfully transformed!"))
  .catch(e => console.error("Something went wrong!", e));
You can also use the readable and writable properties of a transform stream directly to access the usual interfaces of a readable stream and writable stream. In this example we supply data to the writable side of the stream using its writer interface. The readable side is then piped to anotherWritableStream.

const writer = transformStream.writable.getWriter();
writer.write("input chunk");
transformStream.readable.pipeTo(anotherWritableStream);
The fetch() API accepts a readable stream request body, but it can be more convenient to write data for uploading via a writable stream interface. Using an identity transform stream addresses this:

const { writable, readable } = new TransformStream();
fetch("...", { body: readable }).then(response => /* ... */);

const writer = writable.getWriter();
writer.write(new Uint8Array([0x73, 0x74, 0x72, 0x65, 0x61, 0x6D, 0x73, 0x21]));
writer.close();
Another use of identity transform streams is to add additional buffering to a pipe. In this example we add extra buffering between readableStream and writableStream.

const writableStrategy = new ByteLengthQueuingStrategy({ highWaterMark: 1024 * 1024 });
readableStream
  .pipeThrough(new TransformStream(undefined, writableStrategy))
  .pipeTo(writableStream);
6.2. The TransformStream class
The TransformStream class is a concrete instance of the general transform stream concept.
6.2.1. Interface definition
The Web IDL definition for the TransformStream class is given as follows:

[Exposed=*, Transferable]
interface TransformStream {
  constructor(optional object transformer,
              optional QueuingStrategy writableStrategy = {},
              optional QueuingStrategy readableStrategy = {});

  readonly attribute ReadableStream readable;
  readonly attribute WritableStream writable;
};
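For example, a TransformStream that upper-cases incoming string chunks could be constructed as follows (the transformer body is illustrative, not mandated by this standard):

const upperCaser = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(String(chunk).toUpperCase());
  }
});

// upperCaser.readable and upperCaser.writable expose the two sides, e.g.
// readableStream.pipeThrough(upperCaser).pipeTo(writableStream).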
6.2.2. Internal slots
Instances of TransformStream are created with the internal slots described in the following table:

Internal Slot | Description (non-normative)
---|---
[[backpressure]] | Whether there was backpressure on [[readable]] the last time it was observed
[[backpressureChangePromise]] | A promise which is fulfilled and replaced every time the value of [[backpressure]] changes
[[controller]] | A TransformStreamDefaultController created with the ability to control [[readable]] and [[writable]]
[[Detached]] | A boolean flag set to true when the stream is transferred
[[readable]] | The ReadableStream instance controlled by this object
[[writable]] | The WritableStream instance controlled by this object
6.2.3. The transformer API
The TransformStream() constructor accepts as its first argument a JavaScript object representing the transformer. Such objects can contain any of the following methods:

dictionary Transformer {
  TransformerStartCallback start;
  TransformerTransformCallback transform;
  TransformerFlushCallback flush;
  TransformerCancelCallback cancel;
  any readableType;
  any writableType;
};

callback TransformerStartCallback = any (TransformStreamDefaultController controller);
callback TransformerFlushCallback = Promise<undefined> (TransformStreamDefaultController controller);
callback TransformerTransformCallback = Promise<undefined> (any chunk, TransformStreamDefaultController controller);
callback TransformerCancelCallback = Promise<undefined> (any reason);
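As a sketch of how these members fit together (the specific chunks enqueued are illustrative assumptions), a transformer supplying all four callbacks might look like:

const transformer = {
  start(controller) {
    controller.enqueue("header\n"); // enqueue a prefix chunk up front
  },
  transform(chunk, controller) {
    controller.enqueue(String(chunk).toUpperCase());
  },
  flush(controller) {
    controller.enqueue("footer\n"); // enqueue a suffix chunk before closing
  },
  cancel(reason) {
    // release any underlying resources when the readable side is cancelled
  }
};

const ts = new TransformStream(transformer);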
start(controller), of type TransformerStartCallback
A function that is called immediately during creation of the TransformStream.
Typically this is used to enqueue prefix chunks, using controller.enqueue(). Those chunks will be read from the