blob: 6473b2186e2a30e0a1ba3faeb0c028ebd8949600 [file] [log] [blame]
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "third_party/blink/renderer/core/streams/readable_stream.h"
#include "base/stl_util.h"
#include "third_party/blink/renderer/bindings/core/v8/native_value_traits_impl.h"
#include "third_party/blink/renderer/bindings/core/v8/readable_stream_default_reader_or_readable_stream_byob_reader.h"
#include "third_party/blink/renderer/bindings/core/v8/script_function.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_abort_signal.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_iterator_result_value.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_readable_stream.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_readable_stream_get_reader_options.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_readable_writable_pair.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_stream_pipe_options.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_throw_dom_exception.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_underlying_source.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_writable_stream.h"
#include "third_party/blink/renderer/core/dom/abort_signal.h"
#include "third_party/blink/renderer/core/execution_context/execution_context.h"
#include "third_party/blink/renderer/core/frame/web_feature.h"
#include "third_party/blink/renderer/core/streams/miscellaneous_operations.h"
#include "third_party/blink/renderer/core/streams/promise_handler.h"
#include "third_party/blink/renderer/core/streams/readable_byte_stream_controller.h"
#include "third_party/blink/renderer/core/streams/readable_stream_controller.h"
#include "third_party/blink/renderer/core/streams/readable_stream_default_controller.h"
#include "third_party/blink/renderer/core/streams/readable_stream_generic_reader.h"
#include "third_party/blink/renderer/core/streams/readable_stream_transferring_optimizer.h"
#include "third_party/blink/renderer/core/streams/stream_algorithms.h"
#include "third_party/blink/renderer/core/streams/stream_promise_resolver.h"
#include "third_party/blink/renderer/core/streams/transferable_streams.h"
#include "third_party/blink/renderer/core/streams/underlying_source_base.h"
#include "third_party/blink/renderer/core/streams/writable_stream.h"
#include "third_party/blink/renderer/core/streams/writable_stream_default_controller.h"
#include "third_party/blink/renderer/core/streams/writable_stream_default_writer.h"
#include "third_party/blink/renderer/core/streams/writable_stream_transferring_optimizer.h"
#include "third_party/blink/renderer/platform/bindings/exception_code.h"
#include "third_party/blink/renderer/platform/bindings/exception_state.h"
#include "third_party/blink/renderer/platform/bindings/script_state.h"
#include "third_party/blink/renderer/platform/bindings/v8_binding.h"
#include "third_party/blink/renderer/platform/heap/garbage_collected.h"
#include "third_party/blink/renderer/platform/heap/heap_allocator.h"
#include "third_party/blink/renderer/platform/heap/persistent.h"
#include "third_party/blink/renderer/platform/instrumentation/use_counter.h"
#include "third_party/blink/renderer/platform/wtf/assertions.h"
#include "third_party/blink/renderer/platform/wtf/deque.h"
namespace blink {
ReadableStream::PipeOptions::PipeOptions()
: prevent_close_(false), prevent_abort_(false), prevent_cancel_(false) {}
ReadableStream::PipeOptions::PipeOptions(const StreamPipeOptions* options)
: prevent_close_(options->hasPreventClose() ? options->preventClose()
: false),
prevent_abort_(options->hasPreventAbort() ? options->preventAbort()
: false),
prevent_cancel_(options->hasPreventCancel() ? options->preventCancel()
: false),
signal_(options->hasSignal() ? options->signal() : nullptr) {}
void ReadableStream::PipeOptions::Trace(Visitor* visitor) const {
visitor->Trace(signal_);
}
bool ReadableStream::PipeOptions::GetBoolean(ScriptState* script_state,
v8::Local<v8::Object> dictionary,
const char* property_name,
ExceptionState& exception_state) {
auto* isolate = script_state->GetIsolate();
v8::TryCatch block(isolate);
v8::Local<v8::Value> property_value;
if (!dictionary
->Get(script_state->GetContext(),
V8AtomicString(isolate, property_name))
.ToLocal(&property_value)) {
exception_state.RethrowV8Exception(block.Exception());
return false;
}
return property_value->ToBoolean(isolate)->Value();
}
// PipeToEngine implements PipeTo(). All standard steps in this class come from
// https://streams.spec.whatwg.org/#readable-stream-pipe-to
//
// This implementation is simple but suboptimal because it uses V8 promises to
// drive its asynchronous state machine, allocating a lot of temporary V8
// objects as a result.
//
// TODO(ricea): Create internal versions of ReadableStreamDefaultReader::Read()
// and WritableStreamDefaultWriter::Write() to bypass promise creation and so
// reduce the number of allocations on the hot path.
class ReadableStream::PipeToEngine final
: public GarbageCollected<PipeToEngine> {
public:
PipeToEngine(ScriptState* script_state, PipeOptions* pipe_options)
: script_state_(script_state), pipe_options_(pipe_options) {}
// This is the main entrypoint for ReadableStreamPipeTo().
ScriptPromise Start(ReadableStream* readable, WritableStream* destination) {
// 1. Assert: ! IsReadableStream(source) is true.
DCHECK(readable);
// 2. Assert: ! IsWritableStream(dest) is true.
DCHECK(destination);
// Not relevant to C++ implementation:
// 3. Assert: Type(preventClose) is Boolean, Type(preventAbort) is Boolean,
// and Type(preventCancel) is Boolean.
// TODO(ricea): Implement |signal|.
// 4. Assert: signal is undefined or signal is an instance of the
// AbortSignal interface.
// 5. Assert: ! IsReadableStreamLocked(source) is false.
DCHECK(!ReadableStream::IsLocked(readable));
// 6. Assert: ! IsWritableStreamLocked(dest) is false.
DCHECK(!WritableStream::IsLocked(destination));
auto* isolate = script_state_->GetIsolate();
ExceptionState exception_state(isolate, ExceptionState::kUnknownContext, "",
"");
// 7. If !
// IsReadableByteStreamController(source.[[readableStreamController]]) is
// true, let reader be either ! AcquireReadableStreamBYOBReader(source)
// or ! AcquireReadableStreamDefaultReader(source), at the user agent’s
// discretion.
// 8. Otherwise, let reader be ! AcquireReadableStreamDefaultReader(source).
reader_ = ReadableStream::AcquireDefaultReader(script_state_, readable,
false, exception_state);
DCHECK(!exception_state.HadException());
// 9. Let writer be ! AcquireWritableStreamDefaultWriter(dest).
writer_ = WritableStream::AcquireDefaultWriter(script_state_, destination,
exception_state);
DCHECK(!exception_state.HadException());
// 10. Let shuttingDown be false.
DCHECK(!is_shutting_down_);
// 11. Let promise be a new promise.
promise_ = MakeGarbageCollected<StreamPromiseResolver>(script_state_);
// 12. If signal is not undefined,
if (auto* signal = pipe_options_->Signal()) {
// b. If signal’s aborted flag is set, perform abortAlgorithm and
// return promise.
if (signal->aborted()) {
AbortAlgorithm();
return promise_->GetScriptPromise(script_state_);
}
// c. Add abortAlgorithm to signal.
signal->AddAlgorithm(
WTF::Bind(&PipeToEngine::AbortAlgorithm, WrapWeakPersistent(this)));
}
// 13. In parallel ...
// The rest of the algorithm is described in terms of a series of
// constraints rather than as explicit steps.
if (CheckInitialState()) {
// Need to detect closing and error when we are not reading. This
// corresponds to the following conditions from the standard:
// 1. Errors must be propagated forward: if source.[[state]] is or
// becomes "errored", ...
// and
// 3. Closing must be propagated forward: if source.[[state]] is or
// becomes "closed", ...
ThenPromise(reader_->ClosedPromise()->V8Promise(isolate),
&PipeToEngine::OnReaderClosed, &PipeToEngine::ReadableError);
// Need to detect error when we are not writing. This corresponds to this
// condition from the standard:
// 2. Errors must be propagated backward: if dest.[[state]] is or
// becomes "errored", ...
// We do not need to detect closure of the writable end of the pipe,
// because we have it locked and so it can only be closed by us.
ThenPromise(writer_->ClosedPromise()->V8Promise(isolate), nullptr,
&PipeToEngine::WritableError);
// Start the main read / write loop.
HandleNextEvent(Undefined());
}
// 14. Return promise.
return promise_->GetScriptPromise(script_state_);
}
void Trace(Visitor* visitor) const {
visitor->Trace(script_state_);
visitor->Trace(pipe_options_);
visitor->Trace(reader_);
visitor->Trace(writer_);
visitor->Trace(promise_);
visitor->Trace(last_write_);
visitor->Trace(shutdown_error_);
}
private:
// The implementation uses method pointers to maximise code reuse.
// |Action| represents an action that can be passed to the "Shutdown with an
// action" operation. Each Action is implemented as a method which delegates
// to some abstract operation, inferring the arguments from the state of
// |this|.
using Action = v8::Local<v8::Promise> (PipeToEngine::*)();
// This implementation uses ThenPromise() 7 times. Instead of creating a dozen
// separate subclasses of ScriptFunction, we use a single implementation and
// pass a method pointer at runtime to control the behaviour. Most
// PromiseReaction methods don't need to return a value, but because some do,
// the rest have to return undefined so that they can have the same method
// signature. Similarly, many of the methods ignore the argument that is
// passed to them.
using PromiseReaction =
v8::Local<v8::Value> (PipeToEngine::*)(v8::Local<v8::Value>);
class WrappedPromiseReaction final : public PromiseHandlerWithValue {
public:
WrappedPromiseReaction(ScriptState* script_state,
PipeToEngine* instance,
PromiseReaction method)
: PromiseHandlerWithValue(script_state),
instance_(instance),
method_(method) {}
v8::Local<v8::Value> CallWithLocal(v8::Local<v8::Value> value) override {
return (instance_->*method_)(value);
}
void Trace(Visitor* visitor) const override {
visitor->Trace(instance_);
ScriptFunction::Trace(visitor);
}
private:
Member<PipeToEngine> instance_;
PromiseReaction method_;
};
// Checks the state of the streams and executes the shutdown handlers if
// necessary. Returns true if piping can continue.
bool CheckInitialState() {
auto* isolate = script_state_->GetIsolate();
const auto state = Readable()->state_;
// Both streams can be errored or closed. To perform the right action the
// order of the checks must match the standard: "the following conditions
// must be applied in order." This method only checks the initial state;
// detection of state changes elsewhere is done through checking promise
// reactions.
// a. Errors must be propagated forward: if source.[[state]] is or
// becomes "errored",
if (state == kErrored) {
ReadableError(Readable()->GetStoredError(isolate));
return false;
}
// 2. Errors must be propagated backward: if dest.[[state]] is or becomes
// "errored",
if (Destination()->IsErrored()) {
WritableError(Destination()->GetStoredError(isolate));
return false;
}
// 3. Closing must be propagated forward: if source.[[state]] is or
// becomes "closed", then
if (state == kClosed) {
ReadableClosed();
return false;
}
// 4. Closing must be propagated backward: if !
// WritableStreamCloseQueuedOrInFlight(dest) is true or dest.[[state]]
// is "closed",
if (Destination()->IsClosingOrClosed()) {
WritableStartedClosed();
return false;
}
return true;
}
void AbortAlgorithm() {
// a. Let abortAlgorithm be the following steps:
// i. Let error be a new "AbortError" DOMException.
v8::Local<v8::Value> error = V8ThrowDOMException::CreateOrEmpty(
script_state_->GetIsolate(), DOMExceptionCode::kAbortError,
"Pipe aborted.");
// Steps ii. to iv. are implemented in AbortAlgorithmAction.
// v. Shutdown with an action consisting of getting a promise to wait for
// all of the actions in actions, and with error.
ShutdownWithAction(&PipeToEngine::AbortAlgorithmAction, error);
}
v8::Local<v8::Promise> AbortAlgorithmAction() {
v8::Local<v8::Value> error =
shutdown_error_.NewLocal(script_state_->GetIsolate());
// ii. Let actions be an empty ordered set.
HeapVector<ScriptPromise> actions;
// This method runs later than the equivalent steps in the standard. This
// means that it is safe to do the checks of the state of the destination
// and source synchronously, simplifying the logic.
// iii. If preventAbort is false, append the following action to actions:
// 1. If dest.[[state]] is "writable", return !
// WritableStreamAbort(dest, error).
// 2. Otherwise, return a promise resolved with undefined.
if (!pipe_options_->PreventAbort() && Destination()->IsWritable()) {
actions.push_back(ScriptPromise(
script_state_,
WritableStream::Abort(script_state_, Destination(), error)));
}
// iv. If preventCancel is false, append the following action action to
// actions:
// 1. If source.[[state]] is "readable", return !
// ReadableStreamCancel(source, error).
// 2. Otherwise, return a promise resolved with undefined.
if (!pipe_options_->PreventCancel() && IsReadable(Readable())) {
actions.push_back(ScriptPromise(
script_state_,
ReadableStream::Cancel(script_state_, Readable(), error)));
}
return ScriptPromise::All(script_state_, actions)
.V8Value()
.As<v8::Promise>();
}
// HandleNextEvent() has an unused argument and return value because it is a
// PromiseReaction. HandleNextEvent() and ReadFulfilled() call each other
// asynchronously in a loop until the pipe completes.
v8::Local<v8::Value> HandleNextEvent(v8::Local<v8::Value>) {
DCHECK(!is_reading_);
if (is_shutting_down_) {
return Undefined();
}
base::Optional<double> desired_size = writer_->GetDesiredSizeInternal();
if (!desired_size.has_value()) {
// This can happen if abort() is queued but not yet started when
// pipeTo() is called. In that case [[storedError]] is not set yet, and
// we need to wait until it is before we can cancel the pipe. Once
// [[storedError]] has been set, the rejection handler set on the writer
// closed promise above will detect it, so all we need to do here is
// nothing.
return Undefined();
}
if (desired_size.value() <= 0) {
// Need to wait for backpressure to go away.
ThenPromise(
writer_->ReadyPromise()->V8Promise(script_state_->GetIsolate()),
&PipeToEngine::HandleNextEvent, &PipeToEngine::WritableError);
return Undefined();
}
is_reading_ = true;
ThenPromise(ReadableStreamDefaultReader::Read(script_state_, reader_)
->V8Promise(script_state_->GetIsolate()),
&PipeToEngine::ReadFulfilled, &PipeToEngine::ReadRejected);
return Undefined();
}
v8::Local<v8::Value> ReadFulfilled(v8::Local<v8::Value> result) {
is_reading_ = false;
DCHECK(result->IsObject());
auto* isolate = script_state_->GetIsolate();
v8::Local<v8::Value> value;
bool done = false;
bool unpack_succeeded =
V8UnpackIteratorResult(script_state_, result.As<v8::Object>(), &done)
.ToLocal(&value);
DCHECK(unpack_succeeded);
if (done) {
ReadableClosed();
return Undefined();
}
const auto write =
WritableStreamDefaultWriter::Write(script_state_, writer_, value);
last_write_.Set(isolate, write);
ThenPromise(write, nullptr, &PipeToEngine::WritableError);
HandleNextEvent(Undefined());
return Undefined();
}
v8::Local<v8::Value> ReadRejected(v8::Local<v8::Value>) {
is_reading_ = false;
ReadableError(Readable()->GetStoredError(script_state_->GetIsolate()));
return Undefined();
}
// If read() is in progress, then wait for it to tell us that the stream is
// closed so that we write all the data before shutdown.
v8::Local<v8::Value> OnReaderClosed(v8::Local<v8::Value>) {
if (!is_reading_) {
ReadableClosed();
}
return Undefined();
}
// 1. Errors must be propagated forward: if source.[[state]] is or
// becomes "errored", then
v8::Local<v8::Value> ReadableError(v8::Local<v8::Value> error) {
// This function can be called during shutdown when the lock is released.
// Exit early in that case.
if (is_shutting_down_) {
return Undefined();
}
// a. If preventAbort is false, shutdown with an action of !
// WritableStreamAbort(dest, source.[[storedError]]) and with
// source.[[storedError]].
DCHECK(error->SameValue(
Readable()->GetStoredError(script_state_->GetIsolate())));
if (!pipe_options_->PreventAbort()) {
ShutdownWithAction(&PipeToEngine::WritableStreamAbortAction, error);
} else {
// b. Otherwise, shutdown with source.[[storedError]].
Shutdown(error);
}
return Undefined();
}
// 2. Errors must be propagated backward: if dest.[[state]] is or becomes
// "errored", then
v8::Local<v8::Value> WritableError(v8::Local<v8::Value> error) {
// This function can be called during shutdown when the lock is released.
// Exit early in that case.
if (is_shutting_down_) {
return Undefined();
}
// a. If preventCancel is false, shutdown with an action of !
// ReadableStreamCancel(source, dest.[[storedError]]) and with
// dest.[[storedError]].
DCHECK(error->SameValue(
Destination()->GetStoredError(script_state_->GetIsolate())));
if (!pipe_options_->PreventCancel()) {
ShutdownWithAction(&PipeToEngine::ReadableStreamCancelAction, error);
} else {
// b. Otherwise, shutdown with dest.[[storedError]].
Shutdown(error);
}
return Undefined();
}
// 3. Closing must be propagated forward: if source.[[state]] is or
// becomes "closed", then
void ReadableClosed() {
// a. If preventClose is false, shutdown with an action of !
// WritableStreamDefaultWriterCloseWithErrorPropagation(writer).
if (!pipe_options_->PreventClose()) {
ShutdownWithAction(
&PipeToEngine::
WritableStreamDefaultWriterCloseWithErrorPropagationAction,
v8::MaybeLocal<v8::Value>());
} else {
// b. Otherwise, shutdown.
Shutdown(v8::MaybeLocal<v8::Value>());
}
}
// 4. Closing must be propagated backward: if !
// WritableStreamCloseQueuedOrInFlight(dest) is true or dest.[[state]] is
// "closed", then
void WritableStartedClosed() {
// a. Assert: no chunks have been read or written.
// This is trivially true because this method is only called from
// CheckInitialState().
// b. Let destClosed be a new TypeError.
const auto dest_closed = v8::Exception::TypeError(
V8String(script_state_->GetIsolate(), "Destination stream closed"));
// c. If preventCancel is false, shutdown with an action of !
// ReadableStreamCancel(source, destClosed) and with destClosed.
if (!pipe_options_->PreventCancel()) {
ShutdownWithAction(&PipeToEngine::ReadableStreamCancelAction,
dest_closed);
} else {
// d. Otherwise, shutdown with destClosed.
Shutdown(dest_closed);
}
}
// * Shutdown with an action: if any of the above requirements ask to shutdown
// with an action |action|, optionally with an error |originalError|, then:
void ShutdownWithAction(Action action,
v8::MaybeLocal<v8::Value> original_error) {
// a. If shuttingDown is true, abort these substeps.
if (is_shutting_down_) {
return;
}
// b. Set shuttingDown to true.
is_shutting_down_ = true;
// Store the action in case we need to call it asynchronously. This is safe
// because the |is_shutting_down_| guard flag ensures that we can only reach
// this assignment once.
shutdown_action_ = action;
// Store |original_error| as |shutdown_error_| if it was supplied.
v8::Local<v8::Value> original_error_local;
if (original_error.ToLocal(&original_error_local)) {
shutdown_error_.Set(script_state_->GetIsolate(), original_error_local);
}
v8::Local<v8::Promise> p;
// c. If dest.[[state]] is "writable" and !
// WritableStreamCloseQueuedOrInFlight(dest) is false,
if (ShouldWriteQueuedChunks()) {
// i. If any chunks have been read but not yet written, write them to
// dest.
// ii. Wait until every chunk that has been read has been written
// (i.e. the corresponding promises have settled).
p = ThenPromise(WriteQueuedChunks(), &PipeToEngine::InvokeShutdownAction);
} else {
// d. Let p be the result of performing action.
p = InvokeShutdownAction();
}
// e. Upon fulfillment of p, finalize, passing along originalError if it
// was given.
// f. Upon rejection of p with reason newError, finalize with newError.
ThenPromise(p, &PipeToEngine::FinalizeWithOriginalErrorIfSet,
&PipeToEngine::FinalizeWithNewError);
}
// * Shutdown: if any of the above requirements or steps ask to shutdown,
// optionally with an error error, then:
void Shutdown(v8::MaybeLocal<v8::Value> error_maybe) {
// a. If shuttingDown is true, abort these substeps.
if (is_shutting_down_) {
return;
}
// b. Set shuttingDown to true.
is_shutting_down_ = true;
// c. If dest.[[state]] is "writable" and !
// WritableStreamCloseQueuedOrInFlight(dest) is false,
if (ShouldWriteQueuedChunks()) {
// Need to stash the value of |error_maybe| since we are calling
// Finalize() asynchronously.
v8::Local<v8::Value> error;
if (error_maybe.ToLocal(&error)) {
shutdown_error_.Set(script_state_->GetIsolate(), error);
}
// i. If any chunks have been read but not yet written, write them to
// dest.
// ii. Wait until every chunk that has been read has been written
// (i.e. the corresponding promises have settled).
// d. Finalize, passing along error if it was given.
ThenPromise(WriteQueuedChunks(),
&PipeToEngine::FinalizeWithOriginalErrorIfSet);
} else {
// d. Finalize, passing along error if it was given.
Finalize(error_maybe);
}
}
// Calls Finalize(), using the stored shutdown error rather than the value
// that was passed.
v8::Local<v8::Value> FinalizeWithOriginalErrorIfSet(v8::Local<v8::Value>) {
v8::MaybeLocal<v8::Value> error_maybe;
if (!shutdown_error_.IsEmpty()) {
error_maybe = shutdown_error_.NewLocal(script_state_->GetIsolate());
}
Finalize(error_maybe);
return Undefined();
}
// Calls Finalize(), using the value that was passed as the error.
v8::Local<v8::Value> FinalizeWithNewError(v8::Local<v8::Value> new_error) {
Finalize(new_error);
return Undefined();
}
// * Finalize: both forms of shutdown will eventually ask to finalize,
// optionally with an error error, which means to perform the following
// steps:
void Finalize(v8::MaybeLocal<v8::Value> error_maybe) {
// a. Perform ! WritableStreamDefaultWriterRelease(writer).
WritableStreamDefaultWriter::Release(script_state_, writer_);
// b. Perform ! ReadableStreamReaderGenericRelease(reader).
ReadableStreamGenericReader::GenericRelease(script_state_, reader_);
// TODO(ricea): Implement signal.
// c. If signal is not undefined, remove abortAlgorithm from signal.
v8::Local<v8::Value> error;
if (error_maybe.ToLocal(&error)) {
// d. If error was given, reject promise with error.
promise_->Reject(script_state_, error);
} else {
// e. Otherwise, resolve promise with undefined.
promise_->ResolveWithUndefined(script_state_);
}
}
bool ShouldWriteQueuedChunks() const {
// "If dest.[[state]] is "writable" and !
// WritableStreamCloseQueuedOrInFlight(dest) is false"
return Destination()->IsWritable() &&
!WritableStream::CloseQueuedOrInFlight(Destination());
}
v8::Local<v8::Promise> WriteQueuedChunks() {
if (!last_write_.IsEmpty()) {
// "Wait until every chunk that has been read has been written (i.e.
// the corresponding promises have settled)"
// This implies that we behave the same whether the promise fulfills or
// rejects. IgnoreErrors() will convert a rejection into a successful
// resolution.
return ThenPromise(last_write_.NewLocal(script_state_->GetIsolate()),
nullptr, &PipeToEngine::IgnoreErrors);
}
return PromiseResolveWithUndefined(script_state_);
}
v8::Local<v8::Value> IgnoreErrors(v8::Local<v8::Value>) {
return Undefined();
}
// InvokeShutdownAction(), version for calling directly.
v8::Local<v8::Promise> InvokeShutdownAction() {
return (this->*shutdown_action_)();
}
// InvokeShutdownAction(), version for use as a PromiseReaction.
v8::Local<v8::Value> InvokeShutdownAction(v8::Local<v8::Value>) {
return InvokeShutdownAction();
}
v8::Local<v8::Value> ShutdownError() const {
DCHECK(!shutdown_error_.IsEmpty());
return shutdown_error_.NewLocal(script_state_->GetIsolate());
}
v8::Local<v8::Promise> WritableStreamAbortAction() {
return WritableStream::Abort(script_state_, Destination(), ShutdownError());
}
v8::Local<v8::Promise> ReadableStreamCancelAction() {
return ReadableStream::Cancel(script_state_, Readable(), ShutdownError());
}
v8::Local<v8::Promise>
WritableStreamDefaultWriterCloseWithErrorPropagationAction() {
return WritableStreamDefaultWriter::CloseWithErrorPropagation(script_state_,
writer_);
}
// Reduces the visual noise when we are returning an undefined value.
v8::Local<v8::Value> Undefined() {
return v8::Undefined(script_state_->GetIsolate());
}
WritableStream* Destination() { return writer_->OwnerWritableStream(); }
const WritableStream* Destination() const {
return writer_->OwnerWritableStream();
}
ReadableStream* Readable() { return reader_->owner_readable_stream_; }
// Performs promise.then(on_fulfilled, on_rejected). It behaves like
// StreamPromiseThen(). Only the types are different.
v8::Local<v8::Promise> ThenPromise(v8::Local<v8::Promise> promise,
PromiseReaction on_fulfilled,
PromiseReaction on_rejected = nullptr) {
return StreamThenPromise(
script_state_->GetContext(), promise,
on_fulfilled ? MakeGarbageCollected<WrappedPromiseReaction>(
script_state_, this, on_fulfilled)
: nullptr,
on_rejected ? MakeGarbageCollected<WrappedPromiseReaction>(
script_state_, this, on_rejected)
: nullptr);
}
Member<ScriptState> script_state_;
Member<PipeOptions> pipe_options_;
Member<ReadableStreamDefaultReader> reader_;
Member<WritableStreamDefaultWriter> writer_;
Member<StreamPromiseResolver> promise_;
TraceWrapperV8Reference<v8::Promise> last_write_;
Action shutdown_action_;
TraceWrapperV8Reference<v8::Value> shutdown_error_;
bool is_shutting_down_ = false;
bool is_reading_ = false;
DISALLOW_COPY_AND_ASSIGN(PipeToEngine);
};
class ReadableStream::TeeEngine final : public GarbageCollected<TeeEngine> {
public:
TeeEngine() = default;
// Create the streams and start copying data.
void Start(ScriptState*, ReadableStream*, ExceptionState&);
// Branch1() and Branch2() are null until Start() is called.
ReadableStream* Branch1() const { return branch_[0]; }
ReadableStream* Branch2() const { return branch_[1]; }
void Trace(Visitor* visitor) const {
visitor->Trace(stream_);
visitor->Trace(reader_);
visitor->Trace(reason_[0]);
visitor->Trace(reason_[1]);
visitor->Trace(branch_[0]);
visitor->Trace(branch_[1]);
visitor->Trace(controller_[0]);
visitor->Trace(controller_[1]);
visitor->Trace(cancel_promise_);
}
private:
class PullAlgorithm;
class CancelAlgorithm;
Member<ReadableStream> stream_;
Member<ReadableStreamDefaultReader> reader_;
Member<StreamPromiseResolver> cancel_promise_;
bool closed_ = false;
// The standard contains a number of pairs of variables with one for each
// stream. These are implemented as arrays here. While they are 1-indexed in
// the standard, they are 0-indexed here; ie. "canceled_[0]" here corresponds
// to "canceled1" in the standard.
bool canceled_[2] = {false, false};
TraceWrapperV8Reference<v8::Value> reason_[2];
Member<ReadableStream> branch_[2];
Member<ReadableStreamDefaultController> controller_[2];
DISALLOW_COPY_AND_ASSIGN(TeeEngine);
};
class ReadableStream::TeeEngine::PullAlgorithm final : public StreamAlgorithm {
public:
explicit PullAlgorithm(TeeEngine* engine) : engine_(engine) {}
v8::Local<v8::Promise> Run(ScriptState* script_state,
int,
v8::Local<v8::Value>[]) override {
// https://streams.spec.whatwg.org/#readable-stream-tee
// 12. Let pullAlgorithm be the following steps:
// a. Return the result of transforming ! ReadableStreamDefaultReaderRead(
// reader) with a fulfillment handler which takes the argument result
// and performs the following steps:
return StreamThenPromise(
script_state->GetContext(),
ReadableStreamDefaultReader::Read(script_state, engine_->reader_)
->V8Promise(script_state->GetIsolate()),
MakeGarbageCollected<ResolveFunction>(script_state, engine_));
}
void Trace(Visitor* visitor) const override {
visitor->Trace(engine_);
StreamAlgorithm::Trace(visitor);
}
private:
class ResolveFunction final : public PromiseHandler {
public:
ResolveFunction(ScriptState* script_state, TeeEngine* engine)
: PromiseHandler(script_state), engine_(engine) {}
void CallWithLocal(v8::Local<v8::Value> result) override {
// i. If closed is true, return.
if (engine_->closed_) {
return;
}
// ii. Assert: Type(result) is Object.
DCHECK(result->IsObject());
auto* script_state = GetScriptState();
auto* isolate = script_state->GetIsolate();
// iii. Let done be ! Get(result, "done").
// vi. Let value be ! Get(result, "value").
// The precise order of operations is not important here, because |result|
// is guaranteed to have own properties of "value" and "done" and so the
// "Get" operations cannot have side-effects.
v8::Local<v8::Value> value;
bool done = false;
bool unpack_succeeded =
V8UnpackIteratorResult(script_state, result.As<v8::Object>(), &done)
.ToLocal(&value);
CHECK(unpack_succeeded);
// vi. Assert: Type(done) is Boolean.
// v. If done is true,
if (done) {
// 1. If canceled1 is false,
// a. Perform ! ReadableStreamDefaultControllerClose(branch1.
// [[readableStreamController]]).
// 2. If canceled2 is false,
// b. Perform ! ReadableStreamDefaultControllerClose(branch2.
// [[readableStreamController]]).
for (int branch = 0; branch < 2; ++branch) {
if (!engine_->canceled_[branch] &&
ReadableStreamDefaultController::CanCloseOrEnqueue(
engine_->controller_[branch])) {
ReadableStreamDefaultController::Close(
script_state, engine_->controller_[branch]);
}
}
// TODO(ricea): Implement https://github.com/whatwg/streams/pull/1045 so
// this step can be numbered correctly.
// Resolve |cancelPromise| with undefined.
engine_->cancel_promise_->ResolveWithUndefined(script_state);
// 3. Set closed to true.
engine_->closed_ = true;
// 4. Return.
return;
}
ExceptionState exception_state(isolate, ExceptionState::kUnknownContext,
"", "");
// vii. Let value1 and value2 be value.
// viii. If canceled2 is false and cloneForBranch2 is true, set value2 to
// ? StructuredDeserialize(? StructuredSerialize(value2), the
// current Realm Record).
// TODO(ricea): Support cloneForBranch2
// ix. If canceled1 is false, perform ?
// ReadableStreamDefaultControllerEnqueue(branch1.
// [[readableStreamController]], value1).
// x. If canceled2 is false, perform ?
// ReadableStreamDefaultControllerEnqueue(branch2.
// [[readableStreamController]], value2).
for (int branch = 0; branch < 2; ++branch) {
if (!engine_->canceled_[branch] &&
ReadableStreamDefaultController::CanCloseOrEnqueue(
engine_->controller_[branch])) {
ReadableStreamDefaultController::Enqueue(script_state,
engine_->controller_[branch],
value, exception_state);
if (exception_state.HadException()) {
// Instead of returning a rejection, which is inconvenient here,
// call ControllerError(). The only difference this makes is that
// it happens synchronously, but that should not be observable.
ReadableStreamDefaultController::Error(
script_state, engine_->controller_[branch],
exception_state.GetException());
exception_state.ClearException();
return;
}
}
}
}
void Trace(Visitor* visitor) const override {
visitor->Trace(engine_);
PromiseHandler::Trace(visitor);
}
private:
Member<TeeEngine> engine_;
};
Member<TeeEngine> engine_;
};
class ReadableStream::TeeEngine::CancelAlgorithm final
: public StreamAlgorithm {
public:
CancelAlgorithm(TeeEngine* engine, int branch)
: engine_(engine), branch_(branch) {
DCHECK(branch == 0 || branch == 1);
}
v8::Local<v8::Promise> Run(ScriptState* script_state,
int argc,
v8::Local<v8::Value> argv[]) override {
// https://streams.spec.whatwg.org/#readable-stream-tee
// This implements both cancel1Algorithm and cancel2Algorithm as they are
// identical except for the index they operate on. Standard comments are
// from cancel1Algorithm.
// 13. Let cancel1Algorithm be the following steps, taking a reason
// argument:
auto* isolate = script_state->GetIsolate();
// a. Set canceled1 to true.
engine_->canceled_[branch_] = true;
DCHECK_EQ(argc, 1);
// b. Set reason1 to reason.
engine_->reason_[branch_].Set(isolate, argv[0]);
const int other_branch = 1 - branch_;
// c. If canceled2 is true,
if (engine_->canceled_[other_branch]) {
// i. Let compositeReason be ! CreateArrayFromList(« reason1, reason2 »).
v8::Local<v8::Value> reason[] = {engine_->reason_[0].NewLocal(isolate),
engine_->reason_[1].NewLocal(isolate)};
v8::Local<v8::Value> composite_reason =
v8::Array::New(script_state->GetIsolate(), reason, 2);
// ii. Let cancelResult be ! ReadableStreamCancel(stream,
// compositeReason).
auto cancel_result = ReadableStream::Cancel(
script_state, engine_->stream_, composite_reason);
// iii. Resolve cancelPromise with cancelResult.
engine_->cancel_promise_->Resolve(script_state, cancel_result);
}
return engine_->cancel_promise_->V8Promise(isolate);
}
void Trace(Visitor* visitor) const override {
visitor->Trace(engine_);
StreamAlgorithm::Trace(visitor);
}
private:
Member<TeeEngine> engine_;
const int branch_;
};
void ReadableStream::TeeEngine::Start(ScriptState* script_state,
ReadableStream* stream,
ExceptionState& exception_state) {
// https://streams.spec.whatwg.org/#readable-stream-tee
// 1. Assert: ! IsReadableStream(stream) is true.
DCHECK(stream);
// TODO(ricea): 2. Assert: Type(cloneForBranch2) is Boolean.
stream_ = stream;
// 3. Let reader be ? AcquireReadableStreamDefaultReader(stream).
reader_ = ReadableStream::AcquireDefaultReader(script_state, stream, false,
exception_state);
if (exception_state.HadException()) {
return;
}
// These steps are performed by the constructor:
// 4. Let closed be false.
DCHECK(!closed_);
// 5. Let canceled1 be false.
DCHECK(!canceled_[0]);
// 6. Let canceled2 be false.
DCHECK(!canceled_[1]);
// 7. Let reason1 be undefined.
DCHECK(reason_[0].IsEmpty());
// 8. Let reason2 be undefined.
DCHECK(reason_[1].IsEmpty());
// 9. Let branch1 be undefined.
DCHECK(!branch_[0]);
// 10. Let branch2 be undefined.
DCHECK(!branch_[1]);
// 11. Let cancelPromise be a new promise.
cancel_promise_ = MakeGarbageCollected<StreamPromiseResolver>(script_state);
// 12. Let pullAlgorithm be the following steps:
// (steps are defined in PullAlgorithm::Run()).
auto* pull_algorithm = MakeGarbageCollected<PullAlgorithm>(this);
// 13. Let cancel1Algorithm be the following steps, taking a reason argument:
// (see CancelAlgorithm::Run()).
auto* cancel1_algorithm = MakeGarbageCollected<CancelAlgorithm>(this, 0);
// 14. Let cancel2Algorithm be the following steps, taking a reason argument:
// (both algorithms share a single implementation).
auto* cancel2_algorithm = MakeGarbageCollected<CancelAlgorithm>(this, 1);
// 15. Let startAlgorithm be an algorithm that returns undefined.
auto* start_algorithm = CreateTrivialStartAlgorithm();
auto* size_algorithm = CreateDefaultSizeAlgorithm();
// 16. Set branch1 to ! CreateReadableStream(startAlgorithm, pullAlgorithm,
// cancel1Algorithm).
branch_[0] = ReadableStream::Create(script_state, start_algorithm,
pull_algorithm, cancel1_algorithm, 1.0,
size_algorithm, exception_state);
if (exception_state.HadException()) {
return;
}
// 17. Set branch2 to ! CreateReadableStream(startAlgorithm, pullAlgorithm,
// cancel2Algorithm).
branch_[1] = ReadableStream::Create(script_state, start_algorithm,
pull_algorithm, cancel2_algorithm, 1.0,
size_algorithm, exception_state);
if (exception_state.HadException()) {
return;
}
for (int branch = 0; branch < 2; ++branch) {
ReadableStreamController* controller =
branch_[branch]->readable_stream_controller_;
// We just created the branches above. It is obvious that the controllers
// are default controllers.
controller_[branch] = To<ReadableStreamDefaultController>(controller);
}
class RejectFunction final : public PromiseHandler {
public:
RejectFunction(ScriptState* script_state, TeeEngine* engine)
: PromiseHandler(script_state), engine_(engine) {}
void CallWithLocal(v8::Local<v8::Value> r) override {
// 18. Upon rejection of reader.[[closedPromise]] with reason r,
// a. Perform ! ReadableStreamDefaultControllerError(branch1.
// [[readableStreamController]], r).
ReadableStreamDefaultController::Error(GetScriptState(),
engine_->controller_[0], r);
// b. Perform ! ReadableStreamDefaultControllerError(branch2.
// [[readableStreamController]], r).
ReadableStreamDefaultController::Error(GetScriptState(),
engine_->controller_[1], r);
// TODO(ricea): Implement https://github.com/whatwg/streams/pull/1045 so
// this step can be numbered correctly.
// Resolve |cancelPromise| with undefined.
engine_->cancel_promise_->ResolveWithUndefined(GetScriptState());
}
void Trace(Visitor* visitor) const override {
visitor->Trace(engine_);
PromiseHandler::Trace(visitor);
}
private:
Member<TeeEngine> engine_;
};
// 18. Upon rejection of reader.[[closedPromise]] with reason r,
StreamThenPromise(
script_state->GetContext(),
reader_->closed_promise_->V8Promise(script_state->GetIsolate()), nullptr,
MakeGarbageCollected<RejectFunction>(script_state, this));
// Step "19. Return « branch1, branch2 »."
// is performed by the caller.
}
ReadableStream* ReadableStream::Create(ScriptState* script_state,
ExceptionState& exception_state) {
return Create(script_state,
ScriptValue(script_state->GetIsolate(),
v8::Undefined(script_state->GetIsolate())),
ScriptValue(script_state->GetIsolate(),
v8::Undefined(script_state->GetIsolate())),
exception_state);
}
ReadableStream* ReadableStream::Create(ScriptState* script_state,
ScriptValue underlying_source,
ExceptionState& exception_state) {
return Create(script_state, underlying_source,
ScriptValue(script_state->GetIsolate(),
v8::Undefined(script_state->GetIsolate())),
exception_state);
}
ReadableStream* ReadableStream::Create(ScriptState* script_state,
ScriptValue underlying_source,
ScriptValue strategy,
ExceptionState& exception_state) {
auto* stream = MakeGarbageCollected<ReadableStream>();
stream->InitInternal(script_state, underlying_source, strategy, false,
exception_state);
if (exception_state.HadException()) {
return nullptr;
}
return stream;
}
ReadableStream* ReadableStream::CreateWithCountQueueingStrategy(
ScriptState* script_state,
UnderlyingSourceBase* underlying_source,
size_t high_water_mark) {
return CreateWithCountQueueingStrategy(script_state, underlying_source,
high_water_mark,
/*optimizer=*/nullptr);
}
ReadableStream* ReadableStream::CreateWithCountQueueingStrategy(
ScriptState* script_state,
UnderlyingSourceBase* underlying_source,
size_t high_water_mark,
std::unique_ptr<ReadableStreamTransferringOptimizer> optimizer) {
auto* isolate = script_state->GetIsolate();
// It's safer to use a workalike rather than a real CountQueuingStrategy
// object. We use the default "size" function as it is implemented in C++ and
// so much faster than calling into JavaScript. Since the create object has a
// null prototype, there is no danger of us finding some other "size" function
// via the prototype chain.
v8::Local<v8::Name> high_water_mark_string =
V8AtomicString(isolate, "highWaterMark");
v8::Local<v8::Value> high_water_mark_value =
v8::Number::New(isolate, high_water_mark);
auto strategy_object =
v8::Object::New(isolate, v8::Null(isolate), &high_water_mark_string,
&high_water_mark_value, 1);
ExceptionState exception_state(script_state->GetIsolate(),
ExceptionState::kConstructionContext,
"ReadableStream");
v8::Local<v8::Value> underlying_source_v8 =
ToV8(underlying_source, script_state);
auto* stream = MakeGarbageCollected<ReadableStream>();
stream->InitInternal(
script_state,
ScriptValue(script_state->GetIsolate(), underlying_source_v8),
ScriptValue(script_state->GetIsolate(), strategy_object), true,
exception_state);
if (exception_state.HadException()) {
exception_state.ClearException();
DLOG(WARNING)
<< "Ignoring an exception in CreateWithCountQueuingStrategy().";
}
stream->transferring_optimizer_ = std::move(optimizer);
return stream;
}
ReadableStream* ReadableStream::Create(ScriptState* script_state,
StreamStartAlgorithm* start_algorithm,
StreamAlgorithm* pull_algorithm,
StreamAlgorithm* cancel_algorithm,
double high_water_mark,
StrategySizeAlgorithm* size_algorithm,
ExceptionState& exception_state) {
// https://streams.spec.whatwg.org/#create-readable-stream
// All arguments are compulsory in this implementation, so the first two steps
// are skipped:
// 1. If highWaterMark was not passed, set it to 1.
// 2. If sizeAlgorithm was not passed, set it to an algorithm that returns 1.
// 3. Assert: ! IsNonNegativeNumber(highWaterMark) is true.
DCHECK_GE(high_water_mark, 0);
// 4. Let stream be ObjectCreate(the original value of ReadableStream's
// prototype property).
auto* stream = MakeGarbageCollected<ReadableStream>();
// 5. Perform ! InitializeReadableStream(stream).
Initialize(stream);
// 6. Let controller be ObjectCreate(the original value of
// ReadableStreamDefaultController's prototype property).
auto* controller = MakeGarbageCollected<ReadableStreamDefaultController>();
// 7. Perform ? SetUpReadableStreamDefaultController(stream, controller,
// startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark,
// sizeAlgorithm).
ReadableStreamDefaultController::SetUp(
script_state, stream, controller, start_algorithm, pull_algorithm,
cancel_algorithm, high_water_mark, size_algorithm, exception_state);
if (exception_state.HadException()) {
return nullptr;
}
// 8. Return stream.
return stream;
}
ReadableStream::ReadableStream() = default;
ReadableStream::~ReadableStream() = default;
bool ReadableStream::locked() const {
// https://streams.spec.whatwg.org/#rs-locked
// 2. Return ! IsReadableStreamLocked(this).
return IsLocked(this);
}
ScriptPromise ReadableStream::cancel(ScriptState* script_state,
ExceptionState& exception_state) {
return cancel(script_state,
ScriptValue(script_state->GetIsolate(),
v8::Undefined(script_state->GetIsolate())),
exception_state);
}
ScriptPromise ReadableStream::cancel(ScriptState* script_state,
ScriptValue reason,
ExceptionState& exception_state) {
// https://streams.spec.whatwg.org/#rs-cancel
// 2. If ! IsReadableStreamLocked(this) is true, return a promise rejected
// with a TypeError exception.
if (IsLocked(this)) {
exception_state.ThrowTypeError("Cannot cancel a locked stream");
return ScriptPromise();
}
// 3. Return ! ReadableStreamCancel(this, reason).
v8::Local<v8::Promise> result = Cancel(script_state, this, reason.V8Value());
return ScriptPromise(script_state, result);
}
void ReadableStream::getReader(
ScriptState* script_state,
ReadableStreamDefaultReaderOrReadableStreamBYOBReader& return_value,
ExceptionState& exception_state) {
// https://streams.spec.whatwg.org/#rs-get-reader
// 1. If options["mode"] does not exist, return ?
// AcquireReadableStreamDefaultReader(this).
return_value.SetReadableStreamDefaultReader(
AcquireDefaultReader(script_state, this, true, exception_state));
}
void ReadableStream::getReader(
ScriptState* script_state,
ReadableStreamGetReaderOptions* options,
ReadableStreamDefaultReaderOrReadableStreamBYOBReader& return_value,
ExceptionState& exception_state) {
// https://streams.spec.whatwg.org/#rs-get-reader
if (options->hasMode()) {
DCHECK_EQ(options->mode(), "byob");
UseCounter::Count(ExecutionContext::From(script_state),
WebFeature::kReadableStreamBYOBReader);
return_value.SetReadableStreamBYOBReader(
AcquireBYOBReader(script_state, this, exception_state));
} else {
getReader(script_state, return_value, exception_state);
}
}
ReadableStreamDefaultReader* ReadableStream::GetDefaultReaderForTesting(
ScriptState* script_state,
ExceptionState& exception_state) {
ReadableStreamDefaultReaderOrReadableStreamBYOBReader return_value;
getReader(script_state, return_value, exception_state);
return return_value.GetAsReadableStreamDefaultReader();
}
ReadableStream* ReadableStream::pipeThrough(ScriptState* script_state,
ReadableWritablePair* transform,
ExceptionState& exception_state) {
return pipeThrough(script_state, transform, StreamPipeOptions::Create(),
exception_state);
}
// https://streams.spec.whatwg.org/#rs-pipe-through
ReadableStream* ReadableStream::pipeThrough(ScriptState* script_state,
ReadableWritablePair* transform,
const StreamPipeOptions* options,
ExceptionState& exception_state) {
// https://streams.spec.whatwg.org/#rs-pipe-through
DCHECK(transform->hasReadable());
ReadableStream* readable_stream = transform->readable();
DCHECK(transform->hasWritable());
WritableStream* writable_stream = transform->writable();
// 1. If ! IsReadableStreamLocked(this) is true, throw a TypeError exception.
if (IsLocked(this)) {
exception_state.ThrowTypeError("Cannot pipe a locked stream");
return nullptr;
}
// 2. If ! IsWritableStreamLocked(transform["writable"]) is true, throw a
// TypeError exception.
if (WritableStream::IsLocked(writable_stream)) {
exception_state.ThrowTypeError("parameter 1's 'writable' is locked");
return nullptr;
}
// 3. Let signal be options["signal"] if it exists, or undefined otherwise.
auto* pipe_options = MakeGarbageCollected<PipeOptions>(options);
// 4. Let promise be ! ReadableStreamPipeTo(this, transform["writable"],
// options["preventClose"], options["preventAbort"],
// options["preventCancel"], signal).
ScriptPromise promise =
PipeTo(script_state, this, writable_stream, pipe_options);
// 5. Set promise.[[PromiseIsHandled]] to true.
promise.MarkAsHandled();
// 6. Return transform["readable"].
return readable_stream;
}
ScriptPromise ReadableStream::pipeTo(ScriptState* script_state,
WritableStream* destination,
ExceptionState& exception_state) {
return pipeTo(script_state, destination, StreamPipeOptions::Create(),
exception_state);
}
ScriptPromise ReadableStream::pipeTo(ScriptState* script_state,
WritableStream* destination,
const StreamPipeOptions* options,
ExceptionState& exception_state) {
// https://streams.spec.whatwg.org/#rs-pipe-to
// 1. If ! IsReadableStreamLocked(this) is true, return a promise rejected
// with a TypeError exception.
if (IsLocked(this)) {
exception_state.ThrowTypeError("Cannot pipe a locked stream");
return ScriptPromise();
}
// 2. If ! IsWritableStreamLocked(destination) is true, return a promise
// rejected with a TypeError exception.
if (WritableStream::IsLocked(destination)) {
exception_state.ThrowTypeError("Cannot pipe to a locked stream");
return ScriptPromise();
}
// 3. Let signal be options["signal"] if it exists, or undefined otherwise.
auto* pipe_options = MakeGarbageCollected<PipeOptions>(options);
// 4. Return ! ReadableStreamPipeTo(this, destination,
// options["preventClose"], options["preventAbort"],
// options["preventCancel"], signal).
return PipeTo(script_state, this, destination, pipe_options);
}
HeapVector<Member<ReadableStream>> ReadableStream::tee(
ScriptState* script_state,
ExceptionState& exception_state) {
return CallTeeAndReturnBranchArray(script_state, this, exception_state);
}
// Unlike in the standard, this is defined as a separate method from the
// constructor. This prevents problems when garbage collection happens
// re-entrantly during construction.
void ReadableStream::InitInternal(ScriptState* script_state,
ScriptValue raw_underlying_source,
ScriptValue raw_strategy,
bool created_by_ua,
ExceptionState& exception_state) {
if (!created_by_ua) {
// TODO(ricea): Move this to IDL once blink::ReadableStreamOperations is
// no longer using the public constructor.
UseCounter::Count(ExecutionContext::From(script_state),
WebFeature::kReadableStreamConstructor);
}
// https://streams.spec.whatwg.org/#rs-constructor
// 1. Perform ! InitializeReadableStream(this).
Initialize(this);
// The next part of this constructor corresponds to the object conversions
// that are implicit in the definition in the standard.
DCHECK(!raw_underlying_source.IsEmpty());
DCHECK(!raw_strategy.IsEmpty());
auto context = script_state->GetContext();
auto* isolate = script_state->GetIsolate();
v8::Local<v8::Object> underlying_source;
ScriptValueToObject(script_state, raw_underlying_source, &underlying_source,
exception_state);
if (exception_state.HadException()) {
return;
}
// 2. Let size be ? GetV(strategy, "size").
// 3. Let highWaterMark be ? GetV(strategy, "highWaterMark").
StrategyUnpacker strategy_unpacker(script_state, raw_strategy,
exception_state);
if (exception_state.HadException()) {
return;
}
// 4. Let type be ? GetV(underlyingSource, "type").
v8::TryCatch try_catch(isolate);
v8::Local<v8::Value> type;
if (!underlying_source->Get(context, V8AtomicString(isolate, "type"))
.ToLocal(&type)) {
exception_state.RethrowV8Exception(try_catch.Exception());
return;
}
if (!type->IsUndefined()) {
// 5. Let typeString be ? ToString(type).
v8::Local<v8::String> type_string;
if (!type->ToString(context).ToLocal(&type_string)) {
exception_state.RethrowV8Exception(try_catch.Exception());
return;
}
// 6. If typeString is "bytes",
if (type_string == V8AtomicString(isolate, "bytes")) {
UseCounter::Count(ExecutionContext::From(script_state),
WebFeature::kReadableStreamWithByteSource);
UnderlyingSource* underlying_source_dict =
NativeValueTraits<UnderlyingSource>::NativeValue(
script_state->GetIsolate(), raw_underlying_source.V8Value(),
exception_state);
if (!strategy_unpacker.IsSizeUndefined()) {
exception_state.ThrowRangeError(
"Cannot create byte stream with size() defined on the strategy");
return;
}
double high_water_mark =
strategy_unpacker.GetHighWaterMark(script_state, 0, exception_state);
if (exception_state.HadException()) {
return;
}
ReadableByteStreamController::SetUpFromUnderlyingSource(
script_state, this, underlying_source, underlying_source_dict,
high_water_mark, exception_state);
return;
}
// 8. Otherwise, throw a RangeError exception.
else {
exception_state.ThrowRangeError("Invalid type is specified");
return;
}
}
// 7. Otherwise, if type is undefined,
// a. Let sizeAlgorithm be ? MakeSizeAlgorithmFromSizeFunction(size).
auto* size_algorithm =
strategy_unpacker.MakeSizeAlgorithm(script_state, exception_state);
if (exception_state.HadException()) {
return;
}
DCHECK(size_algorithm);
// b. If highWaterMark is undefined, let highWaterMark be 1.
// c. Set highWaterMark to ? ValidateAndNormalizeHighWaterMark(
// highWaterMark).
double high_water_mark =
strategy_unpacker.GetHighWaterMark(script_state, 1, exception_state);
if (exception_state.HadException()) {
return;
}
// 4. Perform ? SetUpReadableStreamDefaultControllerFromUnderlyingSource
// (this, underlyingSource, highWaterMark, sizeAlgorithm).
ReadableStreamDefaultController::SetUpFromUnderlyingSource(
script_state, this, underlying_source, high_water_mark, size_algorithm,
exception_state);
}
//
// Readable stream abstract operations
//
ReadableStreamDefaultReader* ReadableStream::AcquireDefaultReader(
ScriptState* script_state,
ReadableStream* stream,
bool for_author_code,
ExceptionState& exception_state) {
// https://streams.spec.whatwg.org/#acquire-readable-stream-reader
// for_author_code is compulsory in this implementation
// 1. Let reader by a new ReadableStreamDefaultReader.
// 2. Perform ? SetUpReadableStreamReader(reader, stream).
auto* reader = MakeGarbageCollected<ReadableStreamDefaultReader>(
script_state, stream, exception_state);
if (exception_state.HadException()) {
return nullptr;
}
reader->for_author_code_ = for_author_code;
// 3. Return reader.
return reader;
}
ReadableStreamBYOBReader* ReadableStream::AcquireBYOBReader(
ScriptState* script_state,
ReadableStream* stream,
ExceptionState& exception_state) {
// https://streams.spec.whatwg.org/#acquire-readable-stream-byob-reader
// 1. Let reader be a new ReadableStreamBYOBReader.
// 2. Perform ? SetUpBYOBReader(reader, stream).
auto* reader = MakeGarbageCollected<ReadableStreamBYOBReader>(
script_state, stream, exception_state);
if (exception_state.HadException()) {
return nullptr;
}
// 3. Return reader.
return reader;
}
void ReadableStream::Initialize(ReadableStream* stream) {
// Fields are initialised by the constructor, so we only check that they were
// initialised correctly.
// https://streams.spec.whatwg.org/#initialize-readable-stream
// 1. Set stream.[[state]] to "readable".
CHECK_EQ(stream->state_, kReadable);
// 2. Set stream.[[reader]] and stream.[[storedError]] to undefined.
DCHECK(!stream->reader_);
DCHECK(stream->stored_error_.IsEmpty());
// 3. Set stream.[[disturbed]] to false.
DCHECK(!stream->is_disturbed_);
}
// TODO(domenic): cloneForBranch2 argument from spec not supported yet
void ReadableStream::Tee(ScriptState* script_state,
ReadableStream** branch1,
ReadableStream** branch2,
ExceptionState& exception_state) {
auto* engine = MakeGarbageCollected<TeeEngine>();
engine->Start(script_state, this, exception_state);
if (exception_state.HadException()) {
return;
}
// Instead of returning a List like ReadableStreamTee in the standard, the
// branches are returned via output parameters.
*branch1 = engine->Branch1();
*branch2 = engine->Branch2();
}
void ReadableStream::LockAndDisturb(ScriptState* script_state) {
if (reader_) {
return;
}
ReadableStreamGenericReader* reader = GetReaderNotForAuthorCode(script_state);
DCHECK(reader);
is_disturbed_ = true;
}
void ReadableStream::Serialize(ScriptState* script_state,
MessagePort* port,
ExceptionState& exception_state) {
// https://streams.spec.whatwg.org/#rs-transfer
// 1. If ! IsReadableStreamLocked(value) is true, throw a "DataCloneError"
// DOMException.
if (IsLocked(this)) {
exception_state.ThrowTypeError("Cannot transfer a locked stream");
return;
}
// Done by SerializedScriptValue::TransferReadableStream():
// 2. Let port1 be a new MessagePort in the current Realm.
// 3. Let port2 be a new MessagePort in the current Realm.
// 4. Entangle port1 and port2.
// 5. Let writable be a new WritableStream in the current Realm.
// 6. Perform ! SetUpCrossRealmTransformWritable(writable, port1).
auto* writable = CreateCrossRealmTransformWritable(
script_state, port, /*optimizer=*/nullptr, exception_state);
if (exception_state.HadException()) {
return;
}
// 7. Let promise be ! ReadableStreamPipeTo(value, writable, false, false,
// false).
auto promise =
PipeTo(script_state, this, writable, MakeGarbageCollected<PipeOptions>());
// 8. Set promise.[[PromiseIsHandled]] to true.
promise.MarkAsHandled();
// This step is done in a roundabout way by the caller:
// 9. Set dataHolder.[[port]] to ! StructuredSerializeWithTransfer(port2,
// « port2 »).
}
ReadableStream* ReadableStream::Deserialize(
ScriptState* script_state,
MessagePort* port,
std::unique_ptr<ReadableStreamTransferringOptimizer> optimizer,
ExceptionState& exception_state) {
// We need to execute JavaScript to call "Then" on v8::Promises. We will not
// run author code.
v8::Isolate::AllowJavascriptExecutionScope allow_js(
script_state->GetIsolate());
// https://streams.spec.whatwg.org/#rs-transfer
// These steps are done by V8ScriptValueDeserializer::ReadDOMObject().
// 1. Let deserializedRecord be !
// StructuredDeserializeWithTransfer(dataHolder.[[port]], the current
// Realm).
// 2. Let port be deserializedRecord.[[Deserialized]].
// 3. Perform ! SetUpCrossRealmTransformReadable(value, port).
// In the standard |value| contains an uninitialized ReadableStream. In the
// implementation, we create the stream here.
auto* readable = CreateCrossRealmTransformReadable(
script_state, port, std::move(optimizer), exception_state);
if (exception_state.HadException()) {
return nullptr;
}
return readable;
}
ReadableStreamDefaultReader* ReadableStream::GetReaderNotForAuthorCode(
ScriptState* script_state) {
DCHECK(!IsLocked(this));
// Since the stream is not locked, AcquireDefaultReader cannot fail.
NonThrowableExceptionState exception_state(__FILE__, __LINE__);
return AcquireDefaultReader(script_state, this, false, exception_state);
}
ScriptPromise ReadableStream::PipeTo(ScriptState* script_state,
ReadableStream* readable,
WritableStream* destination,
PipeOptions* pipe_options) {
auto* engine = MakeGarbageCollected<PipeToEngine>(script_state, pipe_options);
return engine->Start(readable, destination);
}
v8::Local<v8::Value> ReadableStream::GetStoredError(
v8::Isolate* isolate) const {
return stored_error_.NewLocal(isolate);
}
std::unique_ptr<ReadableStreamTransferringOptimizer>
ReadableStream::TakeTransferringOptimizer() {
return std::move(transferring_optimizer_);
}
void ReadableStream::Trace(Visitor* visitor) const {
visitor->Trace(readable_stream_controller_);
visitor->Trace(reader_);
visitor->Trace(stored_error_);
ScriptWrappable::Trace(visitor);
}
//
// Abstract Operations Used By Controllers
//
void ReadableStream::AddReadIntoRequest(
ScriptState* script_state,
ReadableStream* stream,
ReadableStreamBYOBReader::ReadIntoRequest* readRequest) {
// https://streams.spec.whatwg.org/#readable-stream-add-read-into-request
// 1. Assert: stream.[[reader]] implements ReadableStreamBYOBReader.
DCHECK(stream->reader_->IsBYOBReader());
// 2. Assert: stream.[[state]] is "readable" or "closed".
DCHECK(stream->state_ == kReadable || stream->state_ == kClosed);
// 3. Append readRequest to stream.[[reader]].[[readIntoRequests]].
ReadableStreamGenericReader* reader = stream->reader_;
ReadableStreamBYOBReader* byob_reader = To<ReadableStreamBYOBReader>(reader);
byob_reader->read_into_requests_.push_back(readRequest);
}
StreamPromiseResolver* ReadableStream::AddReadRequest(ScriptState* script_state,
ReadableStream* stream) {
// https://streams.spec.whatwg.org/#readable-stream-add-read-request
// 1. Assert: ! IsReadableStreamDefaultReader(stream.[[reader]]) is true.
ReadableStreamGenericReader* reader = stream->reader_;
ReadableStreamDefaultReader* default_reader =
To<ReadableStreamDefaultReader>(reader);
DCHECK(default_reader);
// 2. Assert: stream.[[state]] is "readable".
CHECK_EQ(stream->state_, kReadable);
// 3. Let promise be a new promise.
auto* promise = MakeGarbageCollected<StreamPromiseResolver>(script_state);
// This implementation stores promises directly in |read_requests_| rather
// than wrapping them in a Record.
// 4. Let readRequest be Record {[[promise]]: promise}.
// 5. Append readRequest as the last element of stream.[[reader]].
// [[readRequests]].
default_reader->read_requests_.push_back(promise);
// 6. Return promise.
return promise;
}
v8::Local<v8::Promise> ReadableStream::Cancel(ScriptState* script_state,
ReadableStream* stream,
v8::Local<v8::Value> reason) {
// https://streams.spec.whatwg.org/#readable-stream-cancel
// 1. Set stream.[[disturbed]] to true.
stream->is_disturbed_ = true;
// 2. If stream.[[state]] is "closed", return a promise resolved with
// undefined.
const auto state = stream->state_;
if (state == kClosed) {
return PromiseResolveWithUndefined(script_state);
}
// 3. If stream.[[state]] is "errored", return a promise rejected with stream.
// [[storedError]].
if (state == kErrored) {
return PromiseReject(script_state,
stream->GetStoredError(script_state->GetIsolate()));
}
// 4. Perform ! ReadableStreamClose(stream).
Close(script_state, stream);
// 5. Let sourceCancelPromise be ! stream.[[readableStreamController]].
// [[CancelSteps]](reason).
v8::Local<v8::Promise> source_cancel_promise =
stream->readable_stream_controller_->CancelSteps(script_state, reason);
class ReturnUndefinedFunction final : public PromiseHandler {
public:
explicit ReturnUndefinedFunction(ScriptState* script_state)
: PromiseHandler(script_state) {}
// The method does nothing; the default value of undefined is returned to
// JavaScript.
void CallWithLocal(v8::Local<v8::Value>) override {}
};
// 6. Return the result of transforming sourceCancelPromise with a
// fulfillment handler that returns undefined.
return StreamThenPromise(
script_state->GetContext(), source_cancel_promise,
MakeGarbageCollected<ReturnUndefinedFunction>(script_state));
}
void ReadableStream::Close(ScriptState* script_state, ReadableStream* stream) {
// https://streams.spec.whatwg.org/#readable-stream-close
// 1. Assert: stream.[[state]] is "readable".
CHECK_EQ(stream->state_, kReadable);
// 2. Set stream.[[state]] to "closed".
stream->state_ = kClosed;
// 3. Let reader be stream.[[reader]].
ReadableStreamGenericReader* reader = stream->reader_;
// 4. If reader is undefined, return.
if (!reader) {
return;
}
// 5. If ! IsReadableStreamDefaultReader(reader) is true,
if (reader->IsDefaultReader()) {
// a. Repeat for each readRequest that is an element of reader.
// [[readRequests]],
HeapDeque<Member<StreamPromiseResolver>> requests;
requests.Swap(To<ReadableStreamDefaultReader>(reader)->read_requests_);
bool for_author_code =
To<ReadableStreamDefaultReader>(reader)->for_author_code_;
for (StreamPromiseResolver* promise : requests) {
// i. Resolve readRequest.[[promise]] with !
// ReadableStreamCreateReadResult(undefined, true, reader.
// [[forAuthorCode]]).
promise->Resolve(
script_state,
CreateReadResult(script_state,
v8::Undefined(script_state->GetIsolate()), true,
for_author_code));
}
// b. Set reader.[[readRequests]] to an empty List.
// This is not required since we've already called Swap().
}
// 6. Resolve reader.[[closedPromise]] with undefined.
reader->closed_promise_->ResolveWithUndefined(script_state);
}
v8::Local<v8::Value> ReadableStream::CreateReadResult(
ScriptState* script_state,
v8::Local<v8::Value> value,
bool done,
bool for_author_code) {
// https://streams.spec.whatwg.org/#readable-stream-create-read-result
auto* isolate = script_state->GetIsolate();
auto context = script_state->GetContext();
auto value_string = V8AtomicString(isolate, "value");
auto done_string = V8AtomicString(isolate, "done");
auto done_value = v8::Boolean::New(isolate, done);
// 1. Let prototype be null.
// 2. If forAuthorCode is true, set prototype to %ObjectPrototype%.
// This implementation doesn't use a |prototype| variable, instead using
// different code paths depending on the value of |for_author_code|.
if (for_author_code) {
// 4. Let obj be ObjectCreate(prototype).
auto obj = v8::Object::New(isolate);
// 5. Perform CreateDataProperty(obj, "value", value).
obj->CreateDataProperty(context, value_string, value).Check();
// 6. Perform CreateDataProperty(obj, "done", done).
obj->CreateDataProperty(context, done_string, done_value).Check();
// 7. Return obj.
return obj;
}
// When |for_author_code| is false, we can perform all the steps in a single
// call to V8.
// 4. Let obj be ObjectCreate(prototype).
// 5. Perform CreateDataProperty(obj, "value", value).
// 6. Perform CreateDataProperty(obj, "done", done).
// 7. Return obj.
// TODO(ricea): Is it possible to use this optimised API in both cases?
v8::Local<v8::Name> names[2] = {value_string, done_string};
v8::Local<v8::Value> values[2] = {value, done_value};
static_assert(base::size(names) == base::size(values),
"names and values arrays must be the same size");
return v8::Object::New(isolate, v8::Null(isolate), names, values,
base::size(names));
}
void ReadableStream::Error(ScriptState* script_state,
ReadableStream* stream,
v8::Local<v8::Value> e) {
// https://streams.spec.whatwg.org/#readable-stream-error
// 1. Assert: stream.[[state]] is "readable".
CHECK_EQ(stream->state_, kReadable);
auto* isolate = script_state->GetIsolate();
// 2. Set stream.[[state]] to "errored".
stream->state_ = kErrored;
// 3. Set stream.[[storedError]] to e.
stream->stored_error_.Set(isolate, e);
// 4. Let reader be stream.[[reader]].
ReadableStreamGenericReader* reader = stream->reader_;
// 5. If reader is undefined, return.
if (!reader) {
return;
}
// 6. If ! IsReadableStreamDefaultReader(reader) is true,
// a. Repeat for each readRequest that is an element of reader.
// [[readRequests]],
if (reader->IsDefaultReader()) {
ReadableStreamDefaultReader* default_reader =
To<ReadableStreamDefaultReader>(reader);
for (StreamPromiseResolver* promise : default_reader->read_requests_) {
// i. Reject readRequest.[[promise]] with e.
promise->Reject(script_state, e);
}
// b. Set reader.[[readRequests]] to a new empty List.
default_reader->read_requests_.clear();
}
// 7. Otherwise,
else {
// a. Assert: reader implements ReadableStreamBYOBReader.
DCHECK(reader->IsBYOBReader());
// b. For each readIntoRequest of reader.[[readIntoRequests]],
ReadableStreamBYOBReader* byob_reader =
To<ReadableStreamBYOBReader>(reader);
for (ReadableStreamBYOBReader::ReadIntoRequest* request :
byob_reader->read_into_requests_) {
// i. Perform readIntoRequests' error steps, given e.
request->ErrorSteps(script_state, e);
}
// c. Set reader.[[readIntoRequests]] to a new empty list.
byob_reader->read_into_requests_.clear();
}
// 8. Reject reader.[[closedPromise]] with e.
reader->closed_promise_->Reject(script_state, e);
// 9. Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
reader->closed_promise_->MarkAsHandled(isolate);
}
void ReadableStream::FulfillReadIntoRequest(ScriptState* script_state,
ReadableStream* stream,
DOMArrayBufferView* chunk,
bool done) {
// https://streams.spec.whatwg.org/#readable-stream-fulfill-read-into-request
// 1. Assert: ! ReadableStreamHasBYOBReader(stream) is true.
DCHECK(HasBYOBReader(stream));
// 2. Let reader be stream.[[reader]].
ReadableStreamGenericReader* reader = stream->reader_;
ReadableStreamBYOBReader* byob_reader = To<ReadableStreamBYOBReader>(reader);
// 3. Assert: reader.[[readIntoRequests]] is not empty.
DCHECK(!byob_reader->read_into_requests_.IsEmpty());
// 4. Let readIntoRequest be reader.[[readIntoRequests]][0].
ReadableStreamBYOBReader::ReadIntoRequest* read_into_request =
byob_reader->read_into_requests_[0];
// 5. Remove readIntoRequest from reader.[[readIntoRequests]].
byob_reader->read_into_requests_.pop_front();
// 6. If done is true, perform readIntoRequest’s close steps, given chunk.
if (done) {
read_into_request->CloseSteps(script_state, chunk);
} else {
// 7. Otherwise, perform readIntoRequest’s chunk steps, given chunk.
read_into_request->ChunkSteps(script_state, chunk);
}
}
void ReadableStream::FulfillReadRequest(ScriptState* script_state,
ReadableStream* stream,
v8::Local<v8::Value> chunk,
bool done) {
// https://streams.spec.whatwg.org/#readable-stream-fulfill-read-request
// 1. Assert: ! ReadableStreamHasDefaultReader(stream) is true.
DCHECK(HasDefaultReader(stream));
// 2. Let reader be stream.[[reader]].
ReadableStreamGenericReader* reader = stream->reader_;
ReadableStreamDefaultReader* default_reader =
To<ReadableStreamDefaultReader>(reader);
// 3. Let readRequest be the first element of reader.[[readRequests]].
StreamPromiseResolver* read_request = default_reader->read_requests_.front();
// 4. Remove readIntoRequest from reader.[[readIntoRequests]], shifting all
// other elements downward (so that the second becomes the first, and so
// on).
default_reader->read_requests_.pop_front();
// 5. Resolve readIntoRequest.[[promise]] with !
// ReadableStreamCreateReadResult(chunk, done, reader.[[forAuthorCode]]).
read_request->Resolve(script_state, ReadableStream::CreateReadResult(
script_state, chunk, done,
default_reader->for_author_code_));
}
int ReadableStream::GetNumReadIntoRequests(const ReadableStream* stream) {
// https://streams.spec.whatwg.org/#readable-stream-get-num-read-into-requests
// 1. Assert: ! ReadableStreamHasBYOBReader(stream) is true.
DCHECK(HasBYOBReader(stream));
// 2. Return stream.[[reader]].[[readIntoRequests]]'s size.
ReadableStreamGenericReader* reader = stream->reader_;
return To<ReadableStreamBYOBReader>(reader)->read_into_requests_.size();
}
int ReadableStream::GetNumReadRequests(const ReadableStream* stream) {
// https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests
// 1. Assert: ! ReadableStreamHasDefaultReader(stream) is true.
DCHECK(HasDefaultReader(stream));
// 2. Return the number of elements in stream.[[reader]].[[readRequests]].
ReadableStreamGenericReader* reader = stream->reader_;
return To<ReadableStreamDefaultReader>(reader)->read_requests_.size();
}
bool ReadableStream::HasBYOBReader(const ReadableStream* stream) {
// https://streams.spec.whatwg.org/#readable-stream-has-byob-reader
// 1. Let reader be stream.[[reader]].
ReadableStreamGenericReader* reader = stream->reader_;
// 2. If reader is undefined, return false.
if (!reader) {
return false;
}
// 3. If reader implements ReadableStreamBYOBReader, return true.
// 4. Return false.
return reader->IsBYOBReader();
}
bool ReadableStream::HasDefaultReader(const ReadableStream* stream) {
// https://streams.spec.whatwg.org/#readable-stream-has-default-reader
// 1. Let reader be stream.[[reader]].
ReadableStreamGenericReader* reader = stream->reader_;
// 2. If reader is undefined, return false.
if (!reader) {
return false;
}
// 3. If reader implements ReadableStreamDefaultReader, return true.
// 4. Return false.
return reader->IsDefaultReader();
}
//
// TODO(ricea): Functions for transferable streams.
//
HeapVector<Member<ReadableStream>> ReadableStream::CallTeeAndReturnBranchArray(
ScriptState* script_state,
ReadableStream* readable,
ExceptionState& exception_state) {
// https://streams.spec.whatwg.org/#rs-tee
ReadableStream* branch1 = nullptr;
ReadableStream* branch2 = nullptr;
// 2. Let branches be ? ReadableStreamTee(this, false).
readable->Tee(script_state, &branch1, &branch2, exception_state);
if (!branch1 || !branch2)
return HeapVector<Member<ReadableStream>>();
DCHECK(!exception_state.HadException());
// 3. Return ! CreateArrayFromList(branches).
return HeapVector<Member<ReadableStream>>({branch1, branch2});
}
} // namespace blink