| // Copyright 2019 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "third_party/blink/renderer/core/streams/writable_stream.h" |
| |
| #include "third_party/blink/renderer/bindings/core/v8/script_value.h" |
| #include "third_party/blink/renderer/bindings/core/v8/to_v8_for_core.h" |
| #include "third_party/blink/renderer/bindings/core/v8/v8_queuing_strategy_init.h" |
| #include "third_party/blink/renderer/core/streams/count_queuing_strategy.h" |
| #include "third_party/blink/renderer/core/streams/miscellaneous_operations.h" |
| #include "third_party/blink/renderer/core/streams/promise_handler.h" |
| #include "third_party/blink/renderer/core/streams/readable_stream.h" |
| #include "third_party/blink/renderer/core/streams/readable_stream_transferring_optimizer.h" |
| #include "third_party/blink/renderer/core/streams/stream_promise_resolver.h" |
| #include "third_party/blink/renderer/core/streams/transferable_streams.h" |
| #include "third_party/blink/renderer/core/streams/underlying_sink_base.h" |
| #include "third_party/blink/renderer/core/streams/writable_stream_default_controller.h" |
| #include "third_party/blink/renderer/core/streams/writable_stream_default_writer.h" |
| #include "third_party/blink/renderer/core/streams/writable_stream_transferring_optimizer.h" |
| #include "third_party/blink/renderer/platform/bindings/exception_state.h" |
| #include "third_party/blink/renderer/platform/bindings/script_state.h" |
| #include "third_party/blink/renderer/platform/bindings/to_v8.h" |
| #include "third_party/blink/renderer/platform/bindings/v8_binding.h" |
| #include "third_party/blink/renderer/platform/heap/heap.h" |
| #include "third_party/blink/renderer/platform/heap/visitor.h" |
| #include "third_party/blink/renderer/platform/wtf/assertions.h" |
| |
| // Implementation of WritableStream for Blink. See |
| // https://streams.spec.whatwg.org/#ws. The implementation closely follows the |
| // standard, except where required for performance or integration with Blink. |
| // In particular, classes, methods and abstract operations are implemented in |
| // the same order as in the standard, to simplify side-by-side reading. |
| |
| namespace blink { |
| |
| // The PendingAbortRequest type corresponds to the Record {[[promise]], |
| // [[reason]], [[wasAlreadyErroring]]} from the standard. |
| class WritableStream::PendingAbortRequest final |
| : public GarbageCollected<PendingAbortRequest> { |
| public: |
| PendingAbortRequest(v8::Isolate* isolate, |
| StreamPromiseResolver* promise, |
| v8::Local<v8::Value> reason, |
| bool was_already_erroring) |
| : promise_(promise), |
| reason_(isolate, reason), |
| was_already_erroring_(was_already_erroring) {} |
| |
| StreamPromiseResolver* GetPromise() { return promise_; } |
| v8::Local<v8::Value> Reason(v8::Isolate* isolate) { |
| return reason_.NewLocal(isolate); |
| } |
| |
| bool WasAlreadyErroring() { return was_already_erroring_; } |
| |
| void Trace(Visitor* visitor) const { |
| visitor->Trace(promise_); |
| visitor->Trace(reason_); |
| } |
| |
| private: |
| Member<StreamPromiseResolver> promise_; |
| TraceWrapperV8Reference<v8::Value> reason_; |
| const bool was_already_erroring_; |
| |
| DISALLOW_COPY_AND_ASSIGN(PendingAbortRequest); |
| }; |
| |
| WritableStream* WritableStream::Create(ScriptState* script_state, |
| ExceptionState& exception_state) { |
| return Create(script_state, |
| ScriptValue(script_state->GetIsolate(), |
| v8::Undefined(script_state->GetIsolate())), |
| ScriptValue(script_state->GetIsolate(), |
| v8::Undefined(script_state->GetIsolate())), |
| exception_state); |
| } |
| |
| WritableStream* WritableStream::Create(ScriptState* script_state, |
| ScriptValue underlying_sink, |
| ExceptionState& exception_state) { |
| return Create(script_state, underlying_sink, |
| ScriptValue(script_state->GetIsolate(), |
| v8::Undefined(script_state->GetIsolate())), |
| exception_state); |
| } |
| |
| WritableStream* WritableStream::Create(ScriptState* script_state, |
| ScriptValue raw_underlying_sink, |
| ScriptValue raw_strategy, |
| ExceptionState& exception_state) { |
| auto* stream = MakeGarbageCollected<WritableStream>(); |
| stream->InitInternal(script_state, raw_underlying_sink, raw_strategy, |
| exception_state); |
| if (exception_state.HadException()) { |
| return nullptr; |
| } |
| return stream; |
| } |
| |
// Construction and destruction are compiler-generated. Setting up the stream
// is done after construction, by InitInternal() or by the algorithm-based
// Create() in this file.
WritableStream::WritableStream() = default;
WritableStream::~WritableStream() = default;
| |
| ScriptPromise WritableStream::abort(ScriptState* script_state, |
| ExceptionState& exception_state) { |
| return abort(script_state, |
| ScriptValue(script_state->GetIsolate(), |
| v8::Undefined(script_state->GetIsolate())), |
| exception_state); |
| } |
| |
| ScriptPromise WritableStream::abort(ScriptState* script_state, |
| ScriptValue reason, |
| ExceptionState& exception_state) { |
| // https://streams.spec.whatwg.org/#ws-abort |
| // 2. If ! IsWritableStreamLocked(this) is true, return a promise rejected |
| // with a TypeError exception. |
| if (IsLocked(this)) { |
| exception_state.ThrowTypeError("Cannot abort a locked stream"); |
| return ScriptPromise(); |
| } |
| |
| // 3. Return ! WritableStreamAbort(this, reason). |
| return ScriptPromise(script_state, |
| Abort(script_state, this, reason.V8Value())); |
| } |
| |
| ScriptPromise WritableStream::close(ScriptState* script_state, |
| ExceptionState& exception_state) { |
| // https://streams.spec.whatwg.org/#ws-close |
| // 2. If ! IsWritableStreamLocked(this) is true, return a promise rejected |
| // with a TypeError exception. |
| if (IsLocked(this)) { |
| exception_state.ThrowTypeError("Cannot close a locked stream"); |
| return ScriptPromise(); |
| } |
| |
| // 3. If ! WritableStreamCloseQueuedOrInFlight(this) is true, return a promise |
| // rejected with a TypeError exception. |
| if (CloseQueuedOrInFlight(this)) { |
| exception_state.ThrowTypeError("Cannot close a closed or closing stream"); |
| return ScriptPromise(); |
| } |
| |
| return ScriptPromise(script_state, Close(script_state, this)); |
| } |
| |
| WritableStreamDefaultWriter* WritableStream::getWriter( |
| ScriptState* script_state, |
| ExceptionState& exception_state) { |
| // https://streams.spec.whatwg.org/#ws-get-writer |
| // 2. Return ? AcquireWritableStreamDefaultWriter(this). |
| |
| auto* writer = AcquireDefaultWriter(script_state, this, exception_state); |
| if (exception_state.HadException()) { |
| return nullptr; |
| } |
| return writer; |
| } |
| |
| // General Writable Stream Abstract Operations |
| |
| WritableStream* WritableStream::Create(ScriptState* script_state, |
| StreamStartAlgorithm* start_algorithm, |
| StreamAlgorithm* write_algorithm, |
| StreamAlgorithm* close_algorithm, |
| StreamAlgorithm* abort_algorithm, |
| double high_water_mark, |
| StrategySizeAlgorithm* size_algorithm, |
| ExceptionState& exception_state) { |
| DCHECK(size_algorithm); |
| |
| // https://streams.spec.whatwg.org/#create-writable-stream |
| // 3. Assert: ! IsNonNegativeNumber(highWaterMark) is true. |
| DCHECK_GE(high_water_mark, 0); |
| |
| // 4. Let stream be ObjectCreate(the original value of WritableStream's |
| // prototype property). |
| // 5. Perform ! InitializeWritableStream(stream). |
| auto* stream = MakeGarbageCollected<WritableStream>(); |
| |
| // 6. Let controller be ObjectCreate(the original value of |
| // WritableStreamDefaultController's prototype property). |
| auto* controller = MakeGarbageCollected<WritableStreamDefaultController>(); |
| |
| // 7. Perform ? SetUpWritableStreamDefaultController(stream, controller, |
| // startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, |
| // highWaterMark, sizeAlgorithm). |
| WritableStreamDefaultController::SetUp( |
| script_state, stream, controller, start_algorithm, write_algorithm, |
| close_algorithm, abort_algorithm, high_water_mark, size_algorithm, |
| exception_state); |
| |
| // 8. Return stream. |
| return stream; |
| } |
| |
| // static |
| WritableStream* WritableStream::CreateWithCountQueueingStrategy( |
| ScriptState* script_state, |
| UnderlyingSinkBase* underlying_sink, |
| size_t high_water_mark) { |
| return CreateWithCountQueueingStrategy(script_state, underlying_sink, |
| high_water_mark, |
| /*optimizer=*/nullptr); |
| } |
| |
| WritableStream* WritableStream::CreateWithCountQueueingStrategy( |
| ScriptState* script_state, |
| UnderlyingSinkBase* underlying_sink, |
| size_t high_water_mark, |
| std::unique_ptr<WritableStreamTransferringOptimizer> optimizer) { |
| // TODO(crbug.com/902633): This method of constructing a WritableStream |
| // introduces unnecessary trips through V8. Implement algorithms based on an |
| // UnderlyingSinkBase. |
| auto* init = QueuingStrategyInit::Create(); |
| init->setHighWaterMark(static_cast<double>(high_water_mark)); |
| auto* strategy = CountQueuingStrategy::Create(script_state, init); |
| ScriptValue strategy_value = ScriptValue::From(script_state, strategy); |
| if (strategy_value.IsEmpty()) |
| return nullptr; |
| |
| auto underlying_sink_value = ScriptValue::From(script_state, underlying_sink); |
| |
| ExceptionState exception_state(script_state->GetIsolate(), |
| ExceptionState::kConstructionContext, |
| "WritableStream"); |
| auto* stream = MakeGarbageCollected<WritableStream>(); |
| stream->InitInternal(script_state, underlying_sink_value, strategy_value, |
| exception_state); |
| if (exception_state.HadException()) |
| return nullptr; |
| |
| stream->transferring_optimizer_ = std::move(optimizer); |
| return stream; |
| } |
| |
| void WritableStream::Serialize(ScriptState* script_state, |
| MessagePort* port, |
| ExceptionState& exception_state) { |
| // https://streams.spec.whatwg.org/#ws-transfer |
| // 1. If ! IsWritableStreamLocked(value) is true, throw a "DataCloneError" |
| // DOMException. |
| if (IsLocked(this)) { |
| exception_state.ThrowTypeError("Cannot transfer a locked stream"); |
| return; |
| } |
| |
| // Done by SerializedScriptValue::TransferWritableStream(): |
| // 2. Let port1 be a new MessagePort in the current Realm. |
| // 3. Let port2 be a new MessagePort in the current Realm. |
| // 4. Entangle port1 and port2. |
| |
| // 5. Let readable be a new ReadableStream in the current Realm. |
| // 6. Perform ! SetUpCrossRealmTransformReadable(readable, port1). |
| auto* readable = CreateCrossRealmTransformReadable( |
| script_state, port, /*optimizer=*/nullptr, exception_state); |
| if (exception_state.HadException()) { |
| return; |
| } |
| |
| // 7. Let promise be ! ReadableStreamPipeTo(readable, value, false, false, |
| // false). |
| auto promise = ReadableStream::PipeTo( |
| script_state, readable, this, |
| MakeGarbageCollected<ReadableStream::PipeOptions>()); |
| |
| // 8. Set promise.[[PromiseIsHandled]] to true. |
| promise.MarkAsHandled(); |
| |
| // This step is done in a roundabout way by the caller: |
| // 9. Set dataHolder.[[port]] to ! StructuredSerializeWithTransfer(port2, « |
| // port2 »). |
| } |
| |
| WritableStream* WritableStream::Deserialize( |
| ScriptState* script_state, |
| MessagePort* port, |
| std::unique_ptr<WritableStreamTransferringOptimizer> optimizer, |
| ExceptionState& exception_state) { |
| // We need to execute JavaScript to call "Then" on v8::Promises. We will not |
| // run author code. |
| v8::Isolate::AllowJavascriptExecutionScope allow_js( |
| script_state->GetIsolate()); |
| |
| // https://streams.spec.whatwg.org/#ws-transfer |
| // These step is done by V8ScriptValueDeserializer::ReadDOMObject(). |
| // 1. Let deserializedRecord be ! |
| // StructuredDeserializeWithTransfer(dataHolder.[[port]], the current |
| // Realm). |
| // 2. Let port be deserializedRecord.[[Deserialized]]. |
| |
| // 3. Perform ! SetUpCrossRealmTransformWritable(value, port). |
| // In the standard |value| contains an unitialized WritableStream. In the |
| // implementation, we create the stream here. |
| auto* writable = CreateCrossRealmTransformWritable( |
| script_state, port, std::move(optimizer), exception_state); |
| if (exception_state.HadException()) { |
| return nullptr; |
| } |
| return writable; |
| } |
| |
| WritableStreamDefaultWriter* WritableStream::AcquireDefaultWriter( |
| ScriptState* script_state, |
| WritableStream* stream, |
| ExceptionState& exception_state) { |
| // https://streams.spec.whatwg.org/#acquire-writable-stream-default-writer |
| // 1. Return ? Construct(WritableStreamDefaultWriter, « stream »). |
| auto* writer = MakeGarbageCollected<WritableStreamDefaultWriter>( |
| script_state, stream, exception_state); |
| if (exception_state.HadException()) { |
| return nullptr; |
| } |
| return writer; |
| } |
| |
| v8::Local<v8::Promise> WritableStream::Abort(ScriptState* script_state, |
| WritableStream* stream, |
| v8::Local<v8::Value> reason) { |
| // https://streams.spec.whatwg.org/#writable-stream-abort |
| // 1. Let state be stream.[[state]]. |
| const auto state = stream->state_; |
| |
| // 2. If state is "closed" or "errored", return a promise resolved with |
| // undefined. |
| if (state == kClosed || state == kErrored) { |
| return PromiseResolveWithUndefined(script_state); |
| } |
| |
| // 3. If stream.[[pendingAbortRequest]] is not undefined, return |
| // stream.[[pendingAbortRequest]].[[promise]]. |
| auto* isolate = script_state->GetIsolate(); |
| if (stream->pending_abort_request_) { |
| return stream->pending_abort_request_->GetPromise()->V8Promise(isolate); |
| } |
| |
| // 4. Assert: state is "writable" or "erroring". |
| CHECK(state == kWritable || state == kErroring); |
| |
| // 5. Let wasAlreadyErroring be false. |
| // 6. If state is "erroring", |
| // a. Set wasAlreadyErroring to true. |
| // b. Set reason to undefined. |
| const bool was_already_erroring = state == kErroring; |
| if (was_already_erroring) { |
| reason = v8::Undefined(isolate); |
| } |
| |
| // 7. Let promise be a new promise. |
| auto* promise = MakeGarbageCollected<StreamPromiseResolver>(script_state); |
| |
| // 8. Set stream.[[pendingAbortRequest]] to Record {[[promise]]: promise, |
| // [[reason]]: reason, [[wasAlreadyErroring]]: wasAlreadyErroring}. |
| stream->pending_abort_request_ = MakeGarbageCollected<PendingAbortRequest>( |
| isolate, promise, reason, was_already_erroring); |
| |
| // 9. If wasAlreadyErroring is false, perform ! WritableStreamStartErroring( |
| // stream, reason). |
| if (!was_already_erroring) { |
| StartErroring(script_state, stream, reason); |
| } |
| |
| // 10. Return promise. |
| return promise->V8Promise(isolate); |
| } |
| |
| // Writable Stream Abstract Operations Used by Controllers |
| |
| v8::Local<v8::Promise> WritableStream::AddWriteRequest( |
| ScriptState* script_state, |
| WritableStream* stream) { |
| // https://streams.spec.whatwg.org/#writable-stream-add-write-request |
| // 1. Assert: ! IsWritableStreamLocked(stream) is true. |
| DCHECK(IsLocked(stream)); |
| |
| // 2. Assert: stream.[[state]] is "writable". |
| CHECK_EQ(stream->state_, kWritable); |
| |
| // 3. Let promise be a new promise. |
| auto* promise = MakeGarbageCollected<StreamPromiseResolver>(script_state); |
| |
| // 4. Append promise as the last element of stream.[[writeRequests]] |
| stream->write_requests_.push_back(promise); |
| |
| // 5. Return promise. |
| return promise->V8Promise(script_state->GetIsolate()); |
| } |
| |
| v8::Local<v8::Promise> WritableStream::Close(ScriptState* script_state, |
| WritableStream* stream) { |
| // https://streams.spec.whatwg.org/#writable-stream-close |
| // 1. Let state be stream.[[state]]. |
| const auto state = stream->GetState(); |
| |
| // 2. If state is "closed" or "errored", return a promise rejected with a |
| // TypeError exception. |
| if (state == kClosed || state == kErrored) { |
| return PromiseReject(script_state, |
| CreateCannotActionOnStateStreamException( |
| script_state->GetIsolate(), "close", state)); |
| } |
| |
| // 3. Assert: state is "writable" or "erroring". |
| CHECK(state == kWritable || state == kErroring); |
| |
| // 4. Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false. |
| CHECK(!CloseQueuedOrInFlight(stream)); |
| |
| // 5. Let promise be a new promise. |
| auto* promise = MakeGarbageCollected<StreamPromiseResolver>(script_state); |
| |
| // 6. Set stream.[[closeRequest]] to promise. |
| stream->SetCloseRequest(promise); |
| |
| // 7. Let writer be stream.[[writer]]. |
| WritableStreamDefaultWriter* writer = stream->writer_; |
| |
| // 8. If writer is not undefined, and stream.[[backpressure]] is true, and |
| // state is "writable", resolve writer.[[readyPromise]] with undefined. |
| if (writer && stream->HasBackpressure() && state == kWritable) { |
| writer->ReadyPromise()->ResolveWithUndefined(script_state); |
| } |
| |
| // 9. Perform ! WritableStreamDefaultControllerClose( |
| // stream.[[writableStreamController]]). |
| WritableStreamDefaultController::Close(script_state, stream->Controller()); |
| |
| // 10. Return promise. |
| return promise->V8Promise(script_state->GetIsolate()); |
| } |
| |
| bool WritableStream::CloseQueuedOrInFlight(const WritableStream* stream) { |
| // https://streams.spec.whatwg.org/#writable-stream-close-queued-or-in-flight |
| // 1. If stream.[[closeRequest]] is undefined and |
| // stream.[[inFlightCloseRequest]] is undefined, return false. |
| // 2. Return true. |
| return stream->close_request_ || stream->in_flight_close_request_; |
| } |
| |
| void WritableStream::DealWithRejection(ScriptState* script_state, |
| WritableStream* stream, |
| v8::Local<v8::Value> error) { |
| // https://streams.spec.whatwg.org/#writable-stream-deal-with-rejection |
| // 1. Let state be stream.[[state]]. |
| const auto state = stream->state_; |
| |
| // 2. If state is "writable", |
| if (state == kWritable) { |
| // a. Perform ! WritableStreamStartErroring(stream, error). |
| StartErroring(script_state, stream, error); |
| |
| // b. Return. |
| return; |
| } |
| |
| // 3. Assert: state is "erroring". |
| CHECK_EQ(state, kErroring); |
| |
| // 4. Perform ! WritableStreamFinishErroring(stream). |
| FinishErroring(script_state, stream); |
| } |
| |
| void WritableStream::StartErroring(ScriptState* script_state, |
| WritableStream* stream, |
| v8::Local<v8::Value> reason) { |
| // https://streams.spec.whatwg.org/#writable-stream-start-erroring |
| // 1. Assert: stream.[[storedError]] is undefined. |
| DCHECK(stream->stored_error_.IsEmpty()); |
| |
| // 2. Assert: stream.[[state]] is "writable". |
| CHECK_EQ(stream->state_, kWritable); |
| |
| // 3. Let controller be stream.[[writableStreamController]]. |
| WritableStreamDefaultController* controller = |
| stream->writable_stream_controller_; |
| |
| // 4. Assert: controller is not undefined. |
| DCHECK(controller); |
| |
| // 5. Set stream.[[state]] to "erroring". |
| stream->state_ = kErroring; |
| |
| // 6. Set stream.[[storedError]] to reason. |
| stream->stored_error_.Set(script_state->GetIsolate(), reason); |
| |
| // 7. Let writer be stream.[[writer]]. |
| WritableStreamDefaultWriter* writer = stream->writer_; |
| |
| // 8. If writer is not undefined, perform ! |
| // WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason). |
| if (writer) { |
| WritableStreamDefaultWriter::EnsureReadyPromiseRejected(script_state, |
| writer, reason); |
| } |
| |
| // 9. If ! WritableStreamHasOperationMarkedInFlight(stream) is false and |
| // controller.[[started]] is true, perform ! |
| // WritableStreamFinishErroring(stream). |
| if (!HasOperationMarkedInFlight(stream) && controller->Started()) { |
| FinishErroring(script_state, stream); |
| } |
| } |
| |
| void WritableStream::FinishErroring(ScriptState* script_state, |
| WritableStream* stream) { |
| // https://streams.spec.whatwg.org/#writable-stream-finish-erroring |
| // 1. Assert: stream.[[state]] is "erroring". |
| CHECK_EQ(stream->state_, kErroring); |
| |
| // 2. Assert: ! WritableStreamHasOperationMarkedInFlight(stream) is false. |
| DCHECK(!HasOperationMarkedInFlight(stream)); |
| |
| // 3. Set stream.[[state]] to "errored". |
| stream->state_ = kErrored; |
| |
| // 4. Perform ! stream.[[writableStreamController]].[[ErrorSteps]](). |
| stream->writable_stream_controller_->ErrorSteps(); |
| |
| // 5. Let storedError be stream.[[storedError]]. |
| auto* isolate = script_state->GetIsolate(); |
| const auto stored_error = stream->stored_error_.NewLocal(isolate); |
| |
| // 6. Repeat for each writeRequest that is an element of |
| // stream.[[writeRequests]], |
| // a. Reject writeRequest with storedError. |
| RejectPromises(script_state, &stream->write_requests_, stored_error); |
| |
| // 7. Set stream.[[writeRequests]] to an empty List. |
| stream->write_requests_.clear(); |
| |
| // 8. If stream.[[pendingAbortRequest]] is undefined, |
| if (!stream->pending_abort_request_) { |
| // a. Perform ! |
| // WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream). |
| RejectCloseAndClosedPromiseIfNeeded(script_state, stream); |
| |
| // b. Return. |
| return; |
| } |
| |
| // 9. Let abortRequest be stream.[[pendingAbortRequest]]. |
| auto* abort_request = stream->pending_abort_request_.Get(); |
| |
| // 10. Set stream.[[pendingAbortRequest]] to undefined. |
| stream->pending_abort_request_ = nullptr; |
| |
| // 11. If abortRequest.[[wasAlreadyErroring]] is true, |
| if (abort_request->WasAlreadyErroring()) { |
| // a. Reject abortRequest.[[promise]] with storedError. |
| abort_request->GetPromise()->Reject(script_state, stored_error); |
| |
| // b. Perform ! |
| // WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream) |
| RejectCloseAndClosedPromiseIfNeeded(script_state, stream); |
| |
| // c. Return. |
| return; |
| } |
| |
| // 12. Let promise be ! stream.[[writableStreamController]].[[AbortSteps]]( |
| // abortRequest.[[reason]]). |
| auto promise = stream->writable_stream_controller_->AbortSteps( |
| script_state, abort_request->Reason(isolate)); |
| |
| class ResolvePromiseFunction final : public PromiseHandler { |
| public: |
| ResolvePromiseFunction(ScriptState* script_state, |
| WritableStream* stream, |
| StreamPromiseResolver* promise) |
| : PromiseHandler(script_state), stream_(stream), promise_(promise) {} |
| |
| void CallWithLocal(v8::Local<v8::Value>) override { |
| // 13. Upon fulfillment of promise, |
| // a. Resolve abortRequest.[[promise]] with undefined. |
| promise_->ResolveWithUndefined(GetScriptState()); |
| |
| // b. Perform ! |
| // WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream). |
| RejectCloseAndClosedPromiseIfNeeded(GetScriptState(), stream_); |
| } |
| |
| void Trace(Visitor* visitor) const override { |
| visitor->Trace(stream_); |
| visitor->Trace(promise_); |
| PromiseHandler::Trace(visitor); |
| } |
| |
| private: |
| Member<WritableStream> stream_; |
| Member<StreamPromiseResolver> promise_; |
| }; |
| |
| class RejectPromiseFunction final : public PromiseHandler { |
| public: |
| RejectPromiseFunction(ScriptState* script_state, |
| WritableStream* stream, |
| StreamPromiseResolver* promise) |
| : PromiseHandler(script_state), stream_(stream), promise_(promise) {} |
| |
| void CallWithLocal(v8::Local<v8::Value> reason) override { |
| // 14. Upon rejection of promise with reason reason, |
| // a. Reject abortRequest.[[promise]] with reason. |
| promise_->Reject(GetScriptState(), reason); |
| |
| // b. Perform ! |
| // WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream). |
| RejectCloseAndClosedPromiseIfNeeded(GetScriptState(), stream_); |
| } |
| |
| void Trace(Visitor* visitor) const override { |
| visitor->Trace(stream_); |
| visitor->Trace(promise_); |
| PromiseHandler::Trace(visitor); |
| } |
| |
| private: |
| Member<WritableStream> stream_; |
| Member<StreamPromiseResolver> promise_; |
| }; |
| |
| StreamThenPromise(script_state->GetContext(), promise, |
| MakeGarbageCollected<ResolvePromiseFunction>( |
| script_state, stream, abort_request->GetPromise()), |
| MakeGarbageCollected<RejectPromiseFunction>( |
| script_state, stream, abort_request->GetPromise())); |
| } |
| |
| void WritableStream::FinishInFlightWrite(ScriptState* script_state, |
| WritableStream* stream) { |
| // https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-write |
| // 1. Assert: stream.[[inFlightWriteRequest]] is not undefined. |
| DCHECK(stream->in_flight_write_request_); |
| |
| // 2. Resolve stream.[[inFlightWriteRequest]] with undefined. |
| stream->in_flight_write_request_->ResolveWithUndefined(script_state); |
| |
| // 3. Set stream.[[inFlightWriteRequest]] to undefined. |
| stream->in_flight_write_request_ = nullptr; |
| } |
| |
| void WritableStream::FinishInFlightWriteWithError(ScriptState* script_state, |
| WritableStream* stream, |
| v8::Local<v8::Value> error) { |
| // https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-write-with-error |
| // 1. Assert: stream.[[inFlightWriteRequest]] is not undefined. |
| DCHECK(stream->in_flight_write_request_); |
| |
| // 2. Reject stream.[[inFlightWriteRequest]] with error. |
| stream->in_flight_write_request_->Reject(script_state, error); |
| |
| // 3. Set stream.[[inFlightWriteRequest]] to undefined. |
| stream->in_flight_write_request_ = nullptr; |
| |
| // 4. Assert: stream.[[state]] is "writable" or "erroring". |
| const auto state = stream->state_; |
| CHECK(state == kWritable || state == kErroring); |
| |
| // 5. Perform ! WritableStreamDealWithRejection(stream, error). |
| DealWithRejection(script_state, stream, error); |
| } |
| |
| void WritableStream::FinishInFlightClose(ScriptState* script_state, |
| WritableStream* stream) { |
| // https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-close |
| // 1. Assert: stream.[[inFlightCloseRequest]] is not undefined. |
| DCHECK(stream->in_flight_close_request_); |
| |
| // 2. Resolve stream.[[inFlightCloseRequest]] with undefined. |
| stream->in_flight_close_request_->ResolveWithUndefined(script_state); |
| |
| // 3. Set stream.[[inFlightCloseRequest]] to undefined. |
| stream->in_flight_close_request_ = nullptr; |
| |
| // 4. Let state be stream.[[state]]. |
| const auto state = stream->state_; |
| |
| // 5. Assert: stream.[[state]] is "writable" or "erroring". |
| CHECK(state == kWritable || state == kErroring); |
| |
| // 6. If state is "erroring", |
| if (state == kErroring) { |
| // a. Set stream.[[storedError]] to undefined. |
| stream->stored_error_.Clear(); |
| |
| // b. If stream.[[pendingAbortRequest]] is not undefined, |
| if (stream->pending_abort_request_) { |
| // i. Resolve stream.[[pendingAbortRequest]].[[promise]] with |
| // undefined. |
| stream->pending_abort_request_->GetPromise()->ResolveWithUndefined( |
| script_state); |
| |
| // ii. Set stream.[[pendingAbortRequest]] to undefined. |
| stream->pending_abort_request_ = nullptr; |
| } |
| } |
| |
| // 7. Set stream.[[state]] to "closed". |
| stream->state_ = kClosed; |
| |
| // 8. Let writer be stream.[[writer]]. |
| const auto writer = stream->writer_; |
| |
| // 9. If writer is not undefined, resolve writer.[[closedPromise]] with |
| // undefined. |
| if (writer) { |
| writer->ClosedPromise()->ResolveWithUndefined(script_state); |
| } |
| |
| // 10. Assert: stream.[[pendingAbortRequest]] is undefined. |
| DCHECK(!stream->pending_abort_request_); |
| |
| // 11. Assert: stream.[[storedError]] is undefined. |
| DCHECK(stream->stored_error_.IsEmpty()); |
| } |
| |
| void WritableStream::FinishInFlightCloseWithError(ScriptState* script_state, |
| WritableStream* stream, |
| v8::Local<v8::Value> error) { |
| // https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-close-with-error |
| // 1. Assert: stream.[[inFlightCloseRequest]] is not undefined. |
| DCHECK(stream->in_flight_close_request_); |
| |
| // 2. Reject stream.[[inFlightCloseRequest]] with error. |
| stream->in_flight_close_request_->Reject(script_state, error); |
| |
| // 3. Set stream.[[inFlightCloseRequest]] to undefined. |
| stream->in_flight_close_request_ = nullptr; |
| |
| // 4. Assert: stream.[[state]] is "writable" or "erroring". |
| const auto state = stream->state_; |
| CHECK(state == kWritable || state == kErroring); |
| |
| // 5. If stream.[[pendingAbortRequest]] is not undefined, |
| if (stream->pending_abort_request_) { |
| // a. Reject stream.[[pendingAbortRequest]].[[promise]] with error. |
| stream->pending_abort_request_->GetPromise()->Reject(script_state, error); |
| |
| // b. Set stream.[[pendingAbortRequest]] to undefined. |
| stream->pending_abort_request_ = nullptr; |
| } |
| |
| // 6. Perform ! WritableStreamDealWithRejection(stream, error). |
| DealWithRejection(script_state, stream, error); |
| } |
| |
| void WritableStream::MarkCloseRequestInFlight(WritableStream* stream) { |
| // https://streams.spec.whatwg.org/#writable-stream-mark-close-request-in-flight |
| // 1. Assert: stream.[[inFlightCloseRequest]] is undefined. |
| DCHECK(!stream->in_flight_close_request_); |
| |
| // 2. Assert: stream.[[closeRequest]] is not undefined. |
| DCHECK(stream->close_request_); |
| |
| // 3. Set stream.[[inFlightCloseRequest]] to stream.[[closeRequest]]. |
| stream->in_flight_close_request_ = stream->close_request_; |
| |
| // 4. Set stream.[[closeRequest]] to undefined. |
| stream->close_request_ = nullptr; |
| } |
| |
| void WritableStream::MarkFirstWriteRequestInFlight(WritableStream* stream) { |
| // https://streams.spec.whatwg.org/#writable-stream-mark-first-write-request-in-flight |
| // 1. Assert: stream.[[inFlightWriteRequest]] is undefined. |
| DCHECK(!stream->in_flight_write_request_); |
| |
| // 2. Assert: stream.[[writeRequests]] is not empty. |
| DCHECK(!stream->write_requests_.empty()); |
| |
| // 3. Let writeRequest be the first element of stream.[[writeRequests]]. |
| StreamPromiseResolver* write_request = stream->write_requests_.front(); |
| |
| // 4. Remove writeRequest from stream.[[writeRequests]], shifting all other |
| // elements downward (so that the second becomes the first, and so on). |
| stream->write_requests_.pop_front(); |
| |
| // 5. Set stream.[[inFlightWriteRequest]] to writeRequest. |
| stream->in_flight_write_request_ = write_request; |
| } |
| |
| void WritableStream::UpdateBackpressure(ScriptState* script_state, |
| WritableStream* stream, |
| bool backpressure) { |
| // https://streams.spec.whatwg.org/#writable-stream-update-backpressure |
| // 1. Assert: stream.[[state]] is "writable". |
| CHECK_EQ(stream->state_, kWritable); |
| |
| // 2. Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false. |
| CHECK(!CloseQueuedOrInFlight(stream)); |
| |
| // 3. Let writer be stream.[[writer]]. |
| WritableStreamDefaultWriter* writer = stream->writer_; |
| |
| // 4. If writer is not undefined and backpressure is not |
| // stream.[[backpressure]], |
| if (writer && backpressure != stream->has_backpressure_) { |
| // a. If backpressure is true, set writer.[[readyPromise]] to a new |
| // promise. |
| if (backpressure) { |
| writer->SetReadyPromise( |
| MakeGarbageCollected<StreamPromiseResolver>(script_state)); |
| } else { |
| // b. Otherwise, |
| // i. Assert: backpressure is false. |
| DCHECK(!backpressure); |
| |
| // ii. Resolve writer.[[readyPromise]] with undefined. |
| writer->ReadyPromise()->ResolveWithUndefined(script_state); |
| } |
| } |
| |
| // 5. Set stream.[[backpressure]] to backpressure. |
| stream->has_backpressure_ = backpressure; |
| } |
| |
| v8::Local<v8::Value> WritableStream::GetStoredError( |
| v8::Isolate* isolate) const { |
| return stored_error_.NewLocal(isolate); |
| } |
| |
| void WritableStream::SetCloseRequest(StreamPromiseResolver* close_request) { |
| close_request_ = close_request; |
| } |
| |
| void WritableStream::SetController( |
| WritableStreamDefaultController* controller) { |
| writable_stream_controller_ = controller; |
| } |
| |
| void WritableStream::SetWriter(WritableStreamDefaultWriter* writer) { |
| writer_ = writer; |
| } |
| |
| std::unique_ptr<WritableStreamTransferringOptimizer> |
| WritableStream::TakeTransferringOptimizer() { |
| return std::move(transferring_optimizer_); |
| } |
| |
| // static |
| v8::Local<v8::String> WritableStream::CreateCannotActionOnStateStreamMessage( |
| v8::Isolate* isolate, |
| const char* action, |
| const char* state_name) { |
| return V8String(isolate, String::Format("Cannot %s a %s writable stream", |
| action, state_name)); |
| } |
| |
| // static |
| v8::Local<v8::Value> WritableStream::CreateCannotActionOnStateStreamException( |
| v8::Isolate* isolate, |
| const char* action, |
| State state) { |
| const char* state_name = nullptr; |
| switch (state) { |
| case WritableStream::kClosed: |
| state_name = "CLOSED"; |
| break; |
| |
| case WritableStream::kErrored: |
| state_name = "ERRORED"; |
| break; |
| |
| default: |
| NOTREACHED(); |
| } |
| return v8::Exception::TypeError( |
| CreateCannotActionOnStateStreamMessage(isolate, action, state_name)); |
| } |
| |
| void WritableStream::Trace(Visitor* visitor) const { |
| visitor->Trace(close_request_); |
| visitor->Trace(in_flight_write_request_); |
| visitor->Trace(in_flight_close_request_); |
| visitor->Trace(pending_abort_request_); |
| visitor->Trace(stored_error_); |
| visitor->Trace(writable_stream_controller_); |
| visitor->Trace(writer_); |
| visitor->Trace(write_requests_); |
| ScriptWrappable::Trace(visitor); |
| } |
| |
| // This is not implemented inside the constructor in C++, because calling into |
| // JavaScript from the constructor can cause GC problems. |
| void WritableStream::InitInternal(ScriptState* script_state, |
| ScriptValue raw_underlying_sink, |
| ScriptValue raw_strategy, |
| ExceptionState& exception_state) { |
| // The first parts of this constructor implementation correspond to the object |
| // conversions that are implicit in the definition in the standard: |
| // https://streams.spec.whatwg.org/#ws-constructor |
| DCHECK(!raw_underlying_sink.IsEmpty()); |
| DCHECK(!raw_strategy.IsEmpty()); |
| |
| auto context = script_state->GetContext(); |
| auto* isolate = script_state->GetIsolate(); |
| |
| v8::Local<v8::Object> underlying_sink; |
| ScriptValueToObject(script_state, raw_underlying_sink, &underlying_sink, |
| exception_state); |
| if (exception_state.HadException()) { |
| return; |
| } |
| |
| // 2. Let size be ? GetV(strategy, "size"). |
| // 3. Let highWaterMark be ? GetV(strategy, "highWaterMark"). |
| StrategyUnpacker strategy_unpacker(script_state, raw_strategy, |
| exception_state); |
| if (exception_state.HadException()) { |
| return; |
| } |
| |
| // 4. Let type be ? GetV(underlyingSink, "type"). |
| v8::TryCatch try_catch(isolate); |
| v8::Local<v8::Value> type; |
| if (!underlying_sink->Get(context, V8AtomicString(isolate, "type")) |
| .ToLocal(&type)) { |
| exception_state.RethrowV8Exception(try_catch.Exception()); |
| return; |
| } |
| |
| // 5. If type is not undefined, throw a RangeError exception. |
| if (!type->IsUndefined()) { |
| exception_state.ThrowRangeError("Invalid type is specified"); |
| return; |
| } |
| |
| // 6. Let sizeAlgorithm be ? MakeSizeAlgorithmFromSizeFunction(size). |
| auto* size_algorithm = |
| strategy_unpacker.MakeSizeAlgorithm(script_state, exception_state); |
| if (exception_state.HadException()) { |
| return; |
| } |
| DCHECK(size_algorithm); |
| |
| // 7. If highWaterMark is undefined, let highWaterMark be 1. |
| // 8. Set highWaterMark to ? ValidateAndNormalizeHighWaterMark(highWaterMark). |
| double high_water_mark = |
| strategy_unpacker.GetHighWaterMark(script_state, 1, exception_state); |
| if (exception_state.HadException()) { |
| return; |
| } |
| |
| // 9. Perform ? SetUpWritableStreamDefaultControllerFromUnderlyingSink(this, |
| // underlyingSink, highWaterMark, sizeAlgorithm). |
| WritableStreamDefaultController::SetUpFromUnderlyingSink( |
| script_state, this, underlying_sink, high_water_mark, size_algorithm, |
| exception_state); |
| } |
| |
| bool WritableStream::HasOperationMarkedInFlight(const WritableStream* stream) { |
| // https://streams.spec.whatwg.org/#writable-stream-has-operation-marked-in-flight |
| // 1. If stream.[[inFlightWriteRequest]] is undefined and |
| // controller.[[inFlightCloseRequest]] is undefined, return false. |
| // 2. Return true. |
| return stream->in_flight_write_request_ || stream->in_flight_close_request_; |
| } |
| |
| void WritableStream::RejectCloseAndClosedPromiseIfNeeded( |
| ScriptState* script_state, |
| WritableStream* stream) { |
| // https://streams.spec.whatwg.org/#writable-stream-reject-close-and-closed-promise-if-needed |
| // // 1. Assert: stream.[[state]] is "errored". |
| CHECK_EQ(stream->state_, kErrored); |
| |
| auto* isolate = script_state->GetIsolate(); |
| |
| // 2. If stream.[[closeRequest]] is not undefined, |
| if (stream->close_request_) { |
| // a. Assert: stream.[[inFlightCloseRequest]] is undefined. |
| DCHECK(!stream->in_flight_close_request_); |
| |
| // b. Reject stream.[[closeRequest]] with stream.[[storedError]]. |
| stream->close_request_->Reject(script_state, |
| stream->stored_error_.NewLocal(isolate)); |
| |
| // c. Set stream.[[closeRequest]] to undefined. |
| stream->close_request_ = nullptr; |
| } |
| |
| // 3. Let writer be stream.[[writer]]. |
| const auto writer = stream->writer_; |
| |
| // 4. If writer is not undefined, |
| if (writer) { |
| // a. Reject writer.[[closedPromise]] with stream.[[storedError]]. |
| writer->ClosedPromise()->Reject(script_state, |
| stream->stored_error_.NewLocal(isolate)); |
| |
| // b. Set writer.[[closedPromise]].[[PromiseIsHandled]] to true. |
| writer->ClosedPromise()->MarkAsHandled(isolate); |
| } |
| } |
| |
| // TODO(ricea): Functions for transferable streams. |
| |
| // Utility functions (not from the standard). |
| |
| void WritableStream::RejectPromises(ScriptState* script_state, |
| PromiseQueue* queue, |
| v8::Local<v8::Value> e) { |
| for (auto promise : *queue) { |
| promise->Reject(script_state, e); |
| } |
| } |
| |
| } // namespace blink |