blob: c793e966ff2afcb9fb3f461c0acc64935882d3bd [file] [log] [blame]
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// Functions for transferable streams. See design doc
// https://docs.google.com/document/d/1_KuZzg5c3pncLJPFa8SuVm23AP4tft6mzPCL5at3I9M/edit
#include "third_party/blink/renderer/core/streams/transferable_streams.h"
#include "base/stl_util.h"
#include "third_party/blink/renderer/bindings/core/v8/readable_stream_default_reader_or_readable_stream_byob_reader.h"
#include "third_party/blink/renderer/bindings/core/v8/script_function.h"
#include "third_party/blink/renderer/bindings/core/v8/to_v8_for_core.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_dom_exception.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_iterator_result_value.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_post_message_options.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_readable_stream_default_controller.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_throw_dom_exception.h"
#include "third_party/blink/renderer/core/dom/dom_exception.h"
#include "third_party/blink/renderer/core/dom/events/native_event_listener.h"
#include "third_party/blink/renderer/core/events/message_event.h"
#include "third_party/blink/renderer/core/messaging/message_port.h"
#include "third_party/blink/renderer/core/streams/miscellaneous_operations.h"
#include "third_party/blink/renderer/core/streams/promise_handler.h"
#include "third_party/blink/renderer/core/streams/readable_stream.h"
#include "third_party/blink/renderer/core/streams/readable_stream_default_controller.h"
#include "third_party/blink/renderer/core/streams/readable_stream_default_controller_with_script_scope.h"
#include "third_party/blink/renderer/core/streams/readable_stream_transferring_optimizer.h"
#include "third_party/blink/renderer/core/streams/stream_algorithms.h"
#include "third_party/blink/renderer/core/streams/stream_promise_resolver.h"
#include "third_party/blink/renderer/core/streams/underlying_source_base.h"
#include "third_party/blink/renderer/core/streams/writable_stream.h"
#include "third_party/blink/renderer/core/streams/writable_stream_default_controller.h"
#include "third_party/blink/renderer/core/streams/writable_stream_transferring_optimizer.h"
#include "third_party/blink/renderer/platform/bindings/exception_state.h"
#include "third_party/blink/renderer/platform/bindings/script_state.h"
#include "third_party/blink/renderer/platform/bindings/v8_binding.h"
#include "third_party/blink/renderer/platform/bindings/v8_throw_exception.h"
#include "third_party/blink/renderer/platform/heap/heap.h"
#include "third_party/blink/renderer/platform/heap/visitor.h"
#include "third_party/blink/renderer/platform/wtf/assertions.h"
#include "v8/include/v8.h"
// See the design doc at
// https://docs.google.com/document/d/1_KuZzg5c3pncLJPFa8SuVm23AP4tft6mzPCL5at3I9M/edit
// for explanation of how transferable streams are constructed from the "cross
// realm identity transform" implemented in this file.
// The peer (the other end of the MessagePort) is untrusted as it may be
// compromised. This means we have to be very careful in unpacking the messages
// from the peer. LOG(WARNING) is used for cases where a message from the peer
// appears to be invalid. If this appears during ordinary testing it indicates a
// bug.
//
// The -vmodule=transferable_streams=3 command-line argument can be used for
// debugging of the protocol.
namespace blink {
namespace {
template <typename T, typename... Args>
NewScriptFunction* CreateFunction(ScriptState* script_state, Args&&... args) {
return MakeGarbageCollected<NewScriptFunction>(
script_state, MakeGarbageCollected<T>(std::forward<Args>(args)...));
}
// These are the types of messages that are sent between peers.
enum class MessageType { kPull, kChunk, kClose, kError };
// Creates a JavaScript object with a null prototype structured like {key1:
// value2, key2: value2}. This is used to create objects to be serialized by
// postMessage.
v8::Local<v8::Object> CreateKeyValueObject(v8::Isolate* isolate,
const char* key1,
v8::Local<v8::Value> value1,
const char* key2,
v8::Local<v8::Value> value2) {
v8::Local<v8::Name> names[] = {V8AtomicString(isolate, key1),
V8AtomicString(isolate, key2)};
v8::Local<v8::Value> values[] = {value1, value2};
static_assert(base::size(names) == base::size(values),
"names and values arrays must be the same size");
return v8::Object::New(isolate, v8::Null(isolate), names, values,
base::size(names));
}
// Unpacks an object created by CreateKeyValueObject(). |value1| and |value2|
// are out parameters. Returns false on failure.
bool UnpackKeyValueObject(ScriptState* script_state,
v8::Local<v8::Object> object,
const char* key1,
v8::Local<v8::Value>* value1,
const char* key2,
v8::Local<v8::Value>* value2) {
auto* isolate = script_state->GetIsolate();
v8::TryCatch try_catch(isolate);
auto context = script_state->GetContext();
if (!object->Get(context, V8AtomicString(isolate, key1)).ToLocal(value1)) {
DLOG(WARNING) << "Error reading key: '" << key1 << "'";
return false;
}
if (!object->Get(context, V8AtomicString(isolate, key2)).ToLocal(value2)) {
DLOG(WARNING) << "Error reading key: '" << key2 << "'";
return false;
}
return true;
}
// Sends a message with type |type| and contents |value| over |port|. The type
// is packed as a number with key "t", and the value is packed with key "v".
void PackAndPostMessage(ScriptState* script_state,
MessagePort* port,
MessageType type,
v8::Local<v8::Value> value,
ExceptionState& exception_state) {
DVLOG(3) << "PackAndPostMessage sending message type "
<< static_cast<int>(type);
auto* isolate = script_state->GetIsolate();
// https://streams.spec.whatwg.org/#abstract-opdef-packandpostmessage
// 1. Let message be OrdinaryObjectCreate(null).
// 2. Perform ! CreateDataProperty(message, "type", type).
// 3. Perform ! CreateDataProperty(message, "value", value).
v8::Local<v8::Object> packed = CreateKeyValueObject(
isolate, "t", v8::Number::New(isolate, static_cast<int>(type)), "v",
value);
// 4. Let targetPort be the port with which port is entangled, if any;
// otherwise let it be null.
// 5. Let options be «[ "transfer" → « » ]».
// 6. Run the message port post message steps providing targetPort, message,
// and options.
port->postMessage(script_state, ScriptValue(isolate, packed),
PostMessageOptions::Create(), exception_state);
}
// Sends a kError message to the remote side, disregarding failure.
void CrossRealmTransformSendError(ScriptState* script_state,
MessagePort* port,
v8::Local<v8::Value> error) {
ExceptionState exception_state(script_state->GetIsolate(),
ExceptionState::kUnknownContext, "", "");
// https://streams.spec.whatwg.org/#abstract-opdef-crossrealmtransformsenderror
// 1. Perform PackAndPostMessage(port, "error", error), discarding the result.
PackAndPostMessage(script_state, port, MessageType::kError, error,
exception_state);
if (exception_state.HadException()) {
DLOG(WARNING) << "Disregarding exception while sending error";
exception_state.ClearException();
}
}
// Same as PackAndPostMessage(), except that it attempts to handle exceptions by
// sending a kError message to the remote side. Any error from sending the
// kError message is ignored.
//
// The calling convention differs slightly from the standard to minimize
// verbosity at the calling sites. The function returns true for a normal
// completion and false for an abrupt completion.When there's an abrupt
// completion result.[[Value]] is stored into |error|.
bool PackAndPostMessageHandlingError(ScriptState* script_state,
MessagePort* port,
MessageType type,
v8::Local<v8::Value> value,
v8::Local<v8::Value>* error) {
ExceptionState exception_state(script_state->GetIsolate(),
ExceptionState::kUnknownContext, "", "");
// https://streams.spec.whatwg.org/#abstract-opdef-packandpostmessagehandlingerror
// 1. Let result be PackAndPostMessage(port, type, value).
PackAndPostMessage(script_state, port, type, value, exception_state);
// 2. If result is an abrupt completion,
if (exception_state.HadException()) {
// 1. Perform ! CrossRealmTransformSendError(port, result.[[Value]]).
// 3. Return result as a completion record.
*error = exception_state.GetException();
CrossRealmTransformSendError(script_state, port, *error);
exception_state.ClearException();
return false;
}
return true;
}
// Base class for CrossRealmTransformWritable and CrossRealmTransformReadable.
// Contains common methods that are used when handling MessagePort events.
class CrossRealmTransformStream
: public GarbageCollected<CrossRealmTransformStream> {
public:
// Neither of the subclasses require finalization, so no destructor.
virtual ScriptState* GetScriptState() const = 0;
virtual MessagePort* GetMessagePort() const = 0;
// HandleMessage() is called by CrossRealmTransformMessageListener to handle
// an incoming message from the MessagePort.
virtual void HandleMessage(MessageType type, v8::Local<v8::Value> value) = 0;
// HandleError() is called by CrossRealmTransformErrorListener when an error
// event is fired on the message port. It should error the stream.
virtual void HandleError(v8::Local<v8::Value> error) = 0;
virtual void Trace(Visitor*) const {}
};
// Handles MessageEvents from the MessagePort.
class CrossRealmTransformMessageListener final : public NativeEventListener {
public:
explicit CrossRealmTransformMessageListener(CrossRealmTransformStream* target)
: target_(target) {}
void Invoke(ExecutionContext*, Event* event) override {
// TODO(ricea): Find a way to guarantee this cast is safe.
MessageEvent* message = static_cast<MessageEvent*>(event);
ScriptState* script_state = target_->GetScriptState();
// The deserializer code called by message->data() looks up the ScriptState
// from the current context, so we need to make sure it is set.
ScriptState::Scope scope(script_state);
// Common to
// https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable
// and
// https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable.
// 1. Let data be the data of the message.
v8::Local<v8::Value> data = message->data(script_state).V8Value();
// 2. Assert: Type(data) is Object.
// In the world of the standard, this is guaranteed to be true. In the real
// world, the data could come from a compromised renderer and be malicious.
if (!data->IsObject()) {
DLOG(WARNING) << "Invalid message from peer ignored (not object)";
return;
}
// 3. Let type be ! Get(data, "type").
// 4. Let value be ! Get(data, "value").
v8::Local<v8::Value> type;
v8::Local<v8::Value> value;
if (!UnpackKeyValueObject(script_state, data.As<v8::Object>(), "t", &type,
"v", &value)) {
DLOG(WARNING) << "Invalid message from peer ignored";
return;
}
// 5. Assert: Type(type) is String
// This implementation uses numbers for types rather than strings.
if (!type->IsNumber()) {
DLOG(WARNING) << "Invalid message from peer ignored (type is not number)";
return;
}
int type_value = type.As<v8::Number>()->Value();
DVLOG(3) << "MessageListener saw message type " << type_value;
target_->HandleMessage(static_cast<MessageType>(type_value), value);
}
void Trace(Visitor* visitor) const override {
visitor->Trace(target_);
NativeEventListener::Trace(visitor);
}
private:
const Member<CrossRealmTransformStream> target_;
};
// Handles "error" events from the MessagePort.
class CrossRealmTransformErrorListener final : public NativeEventListener {
public:
explicit CrossRealmTransformErrorListener(CrossRealmTransformStream* target)
: target_(target) {}
void Invoke(ExecutionContext*, Event*) override {
ScriptState* script_state = target_->GetScriptState();
// Need to enter a script scope to manipulate JavaScript objects.
ScriptState::Scope scope(script_state);
// Common to
// https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable
// and
// https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable.
// 1. Let error be a new "DataCloneError" DOMException.
v8::Local<v8::Value> error = V8ThrowDOMException::CreateOrEmpty(
script_state->GetIsolate(), DOMExceptionCode::kDataCloneError,
"chunk could not be cloned");
// 2. Perform ! CrossRealmTransformSendError(port, error).
auto* message_port = target_->GetMessagePort();
CrossRealmTransformSendError(script_state, message_port, error);
// 4. Disentangle port.
message_port->close();
DVLOG(3) << "ErrorListener saw messageerror";
target_->HandleError(error);
}
void Trace(Visitor* visitor) const override {
visitor->Trace(target_);
NativeEventListener::Trace(visitor);
}
private:
const Member<CrossRealmTransformStream> target_;
};
// Class for data associated with the writable side of the cross realm transform
// stream.
class CrossRealmTransformWritable final : public CrossRealmTransformStream {
public:
CrossRealmTransformWritable(ScriptState* script_state, MessagePort* port)
: script_state_(script_state),
message_port_(port),
backpressure_promise_(
MakeGarbageCollected<StreamPromiseResolver>(script_state)) {}
WritableStream* CreateWritableStream(ExceptionState&);
ScriptState* GetScriptState() const override { return script_state_; }
MessagePort* GetMessagePort() const override { return message_port_; }
void HandleMessage(MessageType type, v8::Local<v8::Value> value) override;
void HandleError(v8::Local<v8::Value> error) override;
void Trace(Visitor* visitor) const override {
visitor->Trace(script_state_);
visitor->Trace(message_port_);
visitor->Trace(backpressure_promise_);
visitor->Trace(controller_);
CrossRealmTransformStream::Trace(visitor);
}
private:
class WriteAlgorithm;
class CloseAlgorithm;
class AbortAlgorithm;
const Member<ScriptState> script_state_;
const Member<MessagePort> message_port_;
Member<StreamPromiseResolver> backpressure_promise_;
Member<WritableStreamDefaultController> controller_;
};
class CrossRealmTransformWritable::WriteAlgorithm final
: public StreamAlgorithm {
public:
explicit WriteAlgorithm(CrossRealmTransformWritable* writable)
: writable_(writable) {}
// Sends the chunk to the readable side, possibly after waiting for
// backpressure.
v8::Local<v8::Promise> Run(ScriptState* script_state,
int argc,
v8::Local<v8::Value> argv[]) override {
// https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable
// 8. Let writeAlgorithm be the following steps, taking a chunk argument:
DCHECK_EQ(argc, 1);
auto chunk = argv[0];
// 1. If backpressurePromise is undefined, set backpressurePromise to a
// promise resolved with undefined.
// As an optimization for the common case, we call DoWrite() synchronously
// instead. The difference is not observable because the result is only
// visible asynchronously anyway. This avoids doing an extra allocation and
// creating a TraceWrappertV8Reference.
if (!writable_->backpressure_promise_) {
return DoWrite(script_state, chunk);
}
auto* isolate = script_state->GetIsolate();
// 2. Return the result of reacting to backpressurePromise with the
// following fulfillment steps:
return StreamThenPromise(
script_state->GetContext(),
writable_->backpressure_promise_->V8Promise(isolate),
MakeGarbageCollected<DoWriteOnResolve>(script_state, chunk, this));
}
void Trace(Visitor* visitor) const override {
visitor->Trace(writable_);
StreamAlgorithm::Trace(visitor);
}
private:
// A promise handler which calls DoWrite() when the promise resolves.
class DoWriteOnResolve final : public PromiseHandlerWithValue {
public:
DoWriteOnResolve(ScriptState* script_state,
v8::Local<v8::Value> chunk,
WriteAlgorithm* target)
: PromiseHandlerWithValue(script_state),
chunk_(script_state->GetIsolate(), chunk),
target_(target) {}
v8::Local<v8::Value> CallWithLocal(v8::Local<v8::Value>) override {
ScriptState* script_state = GetScriptState();
return target_->DoWrite(script_state,
chunk_.NewLocal(script_state->GetIsolate()));
}
void Trace(Visitor* visitor) const override {
visitor->Trace(chunk_);
visitor->Trace(target_);
PromiseHandlerWithValue::Trace(visitor);
}
private:
const TraceWrapperV8Reference<v8::Value> chunk_;
const Member<WriteAlgorithm> target_;
};
// Sends a chunk over the message port to the readable side.
v8::Local<v8::Promise> DoWrite(ScriptState* script_state,
v8::Local<v8::Value> chunk) {
// https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable
// 8. Let writeAlgorithm be the following steps, taking a chunk argument:
// 2. Return the result of reacting to backpressurePromise with the
// following fulfillment steps:
// 1. Set backpressurePromise to a new promise.
writable_->backpressure_promise_ =
MakeGarbageCollected<StreamPromiseResolver>(script_state);
v8::Local<v8::Value> error;
// 2. Let result be PackAndPostMessageHandlingError(port, "chunk",
// chunk).
bool success =
PackAndPostMessageHandlingError(script_state, writable_->message_port_,
MessageType::kChunk, chunk, &error);
// 3. If result is an abrupt completion,
if (!success) {
// 1. Disentangle port.
writable_->message_port_->close();
// 2. Return a promise rejected with result.[[Value]].
return PromiseReject(script_state, error);
}
// 4. Otherwise, return a promise resolved with undefined.
return PromiseResolveWithUndefined(script_state);
}
const Member<CrossRealmTransformWritable> writable_;
};
class CrossRealmTransformWritable::CloseAlgorithm final
: public StreamAlgorithm {
public:
explicit CloseAlgorithm(CrossRealmTransformWritable* writable)
: writable_(writable) {}
// Sends a close message to the readable side and closes the message port.
v8::Local<v8::Promise> Run(ScriptState* script_state,
int argc,
v8::Local<v8::Value> argv[]) override {
DCHECK_EQ(argc, 0);
// https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable
// 9. Let closeAlgorithm be the folowing steps:
v8::Local<v8::Value> error;
// 1. Perform ! PackAndPostMessage(port, "close", undefined).
// In the standard, this can't fail. However, in the implementation failure
// is possible, so we have to handle it.
bool success = PackAndPostMessageHandlingError(
script_state, writable_->message_port_, MessageType::kClose,
v8::Undefined(script_state->GetIsolate()), &error);
// 2. Disentangle port.
writable_->message_port_->close();
// Error the stream if an error occurred.
if (!success) {
return PromiseReject(script_state, error);
}
// 3. Return a promise resolved with undefined.
return PromiseResolveWithUndefined(script_state);
}
void Trace(Visitor* visitor) const override {
visitor->Trace(writable_);
StreamAlgorithm::Trace(visitor);
}
private:
const Member<CrossRealmTransformWritable> writable_;
};
class CrossRealmTransformWritable::AbortAlgorithm final
: public StreamAlgorithm {
public:
explicit AbortAlgorithm(CrossRealmTransformWritable* writable)
: writable_(writable) {}
// Sends an abort message to the readable side and closes the message port.
v8::Local<v8::Promise> Run(ScriptState* script_state,
int argc,
v8::Local<v8::Value> argv[]) override {
// https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable
// 10. Let abortAlgorithm be the following steps, taking a reason argument:
DCHECK_EQ(argc, 1);
auto reason = argv[0];
v8::Local<v8::Value> error;
// 1. Let result be PackAndPostMessageHandlingError(port, "error",
// reason).
bool success =
PackAndPostMessageHandlingError(script_state, writable_->message_port_,
MessageType::kError, reason, &error);
// 2. Disentangle port.
writable_->message_port_->close();
// 3. If result is an abrupt completion, return a promise rejected with
// result.[[Value]].
if (!success) {
return PromiseReject(script_state, error);
}
// 4. Otherwise, return a promise resolved with undefined.
return PromiseResolveWithUndefined(script_state);
}
void Trace(Visitor* visitor) const override {
visitor->Trace(writable_);
StreamAlgorithm::Trace(visitor);
}
private:
const Member<CrossRealmTransformWritable> writable_;
};
WritableStream* CrossRealmTransformWritable::CreateWritableStream(
ExceptionState& exception_state) {
DCHECK(!controller_) << "CreateWritableStream() can only be called once";
// https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable
// The order of operations is significantly different from the standard, but
// functionally equivalent.
// 3. Let backpressurePromise be a new promise.
// |backpressure_promise_| is initialized by the constructor.
// 4. Add a handler for port’s message event with the following steps:
// 6. Enable port’s port message queue.
message_port_->setOnmessage(
MakeGarbageCollected<CrossRealmTransformMessageListener>(this));
// 5. Add a handler for port’s messageerror event with the following steps:
message_port_->setOnmessageerror(
MakeGarbageCollected<CrossRealmTransformErrorListener>(this));
// 1. Perform ! InitializeWritableStream(stream).
// 2. Let controller be a new WritableStreamDefaultController.
// 7. Let startAlgorithm be an algorithm that returns undefined.
// 11. Let sizeAlgorithm be an algorithm that returns 1.
// 12. Perform ! SetUpWritableStreamDefaultController(stream, controller,
// startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, 1,
// sizeAlgorithm).
auto* stream =
WritableStream::Create(script_state_, CreateTrivialStartAlgorithm(),
MakeGarbageCollected<WriteAlgorithm>(this),
MakeGarbageCollected<CloseAlgorithm>(this),
MakeGarbageCollected<AbortAlgorithm>(this), 1,
CreateDefaultSizeAlgorithm(), exception_state);
if (exception_state.HadException()) {
return nullptr;
}
controller_ = stream->Controller();
return stream;
}
void CrossRealmTransformWritable::HandleMessage(MessageType type,
v8::Local<v8::Value> value) {
// https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable
// 4. Add a handler for port’s message event with the following steps:
// The initial steps are done by CrossRealmTransformMessageListener
switch (type) {
// 6. If type is "pull",
case MessageType::kPull:
// 1. If backpressurePromise is not undefined,
if (backpressure_promise_) {
// 1. Resolve backpressurePromise with undefined.
backpressure_promise_->ResolveWithUndefined(script_state_);
// 2. Set backpressurePromise to undefined.
backpressure_promise_ = nullptr;
}
return;
// 7. Otherwise if type is "error",
case MessageType::kError:
// 1. Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller,
// value).
WritableStreamDefaultController::ErrorIfNeeded(script_state_, controller_,
value);
// 2. If backpressurePromise is not undefined,
if (backpressure_promise_) {
// 1. Resolve backpressurePromise with undefined.
// 2. Set backpressurePromise to undefined.
backpressure_promise_->ResolveWithUndefined(script_state_);
backpressure_promise_ = nullptr;
}
return;
default:
DLOG(WARNING) << "Invalid message from peer ignored (invalid type): "
<< static_cast<int>(type);
return;
}
}
void CrossRealmTransformWritable::HandleError(v8::Local<v8::Value> error) {
// https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable
// 5. Add a handler for port’s messageerror event with the following steps:
// The first two steps, and the last step, are performed by
// CrossRealmTransformErrorListener.
// 3. Perform ! WritableStreamDefaultControllerError(controller, error).
// TODO(ricea): Fix the standard to say ErrorIfNeeded and update the above
// line once that is done.
WritableStreamDefaultController::ErrorIfNeeded(script_state_, controller_,
error);
}
// Class for data associated with the readable side of the cross realm transform
// stream.
class CrossRealmTransformReadable final : public CrossRealmTransformStream {
public:
CrossRealmTransformReadable(ScriptState* script_state, MessagePort* port)
: script_state_(script_state), message_port_(port) {}
ReadableStream* CreateReadableStream(ExceptionState&);
ScriptState* GetScriptState() const override { return script_state_; }
MessagePort* GetMessagePort() const override { return message_port_; }
void HandleMessage(MessageType type, v8::Local<v8::Value> value) override;
void HandleError(v8::Local<v8::Value> error) override;
void Trace(Visitor* visitor) const override {
visitor->Trace(script_state_);
visitor->Trace(message_port_);
visitor->Trace(controller_);
CrossRealmTransformStream::Trace(visitor);
}
private:
class PullAlgorithm;
class CancelAlgorithm;
const Member<ScriptState> script_state_;
const Member<MessagePort> message_port_;
Member<ReadableStreamDefaultController> controller_;
};
class CrossRealmTransformReadable::PullAlgorithm final
: public StreamAlgorithm {
public:
explicit PullAlgorithm(CrossRealmTransformReadable* readable)
: readable_(readable) {}
// Sends a pull message to the writable side and then waits for backpressure
// to clear.
v8::Local<v8::Promise> Run(ScriptState* script_state,
int argc,
v8::Local<v8::Value> argv[]) override {
DCHECK_EQ(argc, 0);
auto* isolate = script_state->GetIsolate();
// https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable
// 7. Let pullAlgorithm be the following steps:
v8::Local<v8::Value> error;
// 1. Perform ! PackAndPostMessage(port, "pull", undefined).
// In the standard this can't throw an exception, but in the implementation
// it can, so we need to be able to handle it.
bool success = PackAndPostMessageHandlingError(
script_state, readable_->message_port_, MessageType::kPull,
v8::Undefined(isolate), &error);
if (!success) {
readable_->message_port_->close();
return PromiseReject(script_state, error);
}
// 2. Return a promise resolved with undefined.
// The Streams Standard guarantees that PullAlgorithm won't be called again
// until Enqueue() is called.
return PromiseResolveWithUndefined(script_state);
}
void Trace(Visitor* visitor) const override {
visitor->Trace(readable_);
StreamAlgorithm::Trace(visitor);
}
private:
const Member<CrossRealmTransformReadable> readable_;
};
class CrossRealmTransformReadable::CancelAlgorithm final
: public StreamAlgorithm {
public:
explicit CancelAlgorithm(CrossRealmTransformReadable* readable)
: readable_(readable) {}
// Sends a cancel message to the writable side and closes the message port.
v8::Local<v8::Promise> Run(ScriptState* script_state,
int argc,
v8::Local<v8::Value> argv[]) override {
// https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable
// 8. Let cancelAlgorithm be the following steps, taking a reason argument:
DCHECK_EQ(argc, 1);
auto reason = argv[0];
v8::Local<v8::Value> error;
// 1. Let result be PackAndPostMessageHandlingError(port, "error",
// reason).
bool success =
PackAndPostMessageHandlingError(script_state, readable_->message_port_,
MessageType::kError, reason, &error);
// 2. Disentangle port.
readable_->message_port_->close();
// 3. If result is an abrupt completion, return a promise rejected with
// result.[[Value]].
if (!success) {
return PromiseReject(script_state, error);
}
// 4. Otherwise, return a promise resolved with undefined.
return PromiseResolveWithUndefined(script_state);
}
void Trace(Visitor* visitor) const override {
visitor->Trace(readable_);
StreamAlgorithm::Trace(visitor);
}
private:
const Member<CrossRealmTransformReadable> readable_;
};
class ConcatenatingUnderlyingSource final : public UnderlyingSourceBase {
public:
using Constant = NewScriptFunction::Constant;
class PullSource2 final : public NewScriptFunction::Callable {
public:
explicit PullSource2(ConcatenatingUnderlyingSource* source)
: source_(source) {}
ScriptValue Call(ScriptState* script_state, ScriptValue value) override {
return source_->source2_->pull(script_state).AsScriptValue();
}
void Trace(Visitor* visitor) const override {
visitor->Trace(source_);
NewScriptFunction::Callable::Trace(visitor);
}
private:
const Member<ConcatenatingUnderlyingSource> source_;
};
class OnReadingSource1Success final : public NewScriptFunction::Callable {
public:
explicit OnReadingSource1Success(ConcatenatingUnderlyingSource* source)
: source_(source) {}
void Trace(Visitor* visitor) const override {
visitor->Trace(source_);
NewScriptFunction::Callable::Trace(visitor);
}
ScriptValue Call(ScriptState* script_state,
ScriptValue read_result) override {
DCHECK(read_result.IsObject());
bool done = false;
v8::Local<v8::Value> value =
V8UnpackIteratorResult(script_state,
read_result.V8Value().As<v8::Object>(), &done)
.ToLocalChecked();
if (done) {
// We've finished reading `source1_`. Let's start reading `source2_`.
source_->has_finished_reading_stream1_ = true;
ReadableStreamDefaultController* controller =
source_->Controller()->GetOriginalController();
return source_->source2_
->startWrapper(script_state,
ScriptValue::From(script_state, controller))
.Then(CreateFunction<PullSource2>(script_state, source_))
.AsScriptValue();
}
source_->Controller()->Enqueue(value);
return ScriptPromise::CastUndefined(script_state).AsScriptValue();
}
private:
const Member<ConcatenatingUnderlyingSource> source_;
};
class OnReadingSource1Fail final : public NewScriptFunction::Callable {
public:
explicit OnReadingSource1Fail(ConcatenatingUnderlyingSource* source)
: source_(source) {}
void Trace(Visitor* visitor) const override {
visitor->Trace(source_);
NewScriptFunction::Callable::Trace(visitor);
}
ScriptValue Call(ScriptState* script_state, ScriptValue value) override {
ScriptValue reason(script_state->GetIsolate(),
v8::Undefined(script_state->GetIsolate()));
ReadableStream* dummy_stream =
ReadableStream::CreateWithCountQueueingStrategy(
script_state, source_->source2_,
/*high_water_mark=*/0);
ExceptionState exception_state(script_state->GetIsolate(),
ExceptionState::kUnknownContext, "", "");
dummy_stream->cancel(script_state, reason, exception_state);
// We don't care about the result of the cancellation, including
// exceptions.
exception_state.ClearException();
return ScriptPromise::Reject(script_state, value).AsScriptValue();
}
private:
const Member<ConcatenatingUnderlyingSource> source_;
};
ConcatenatingUnderlyingSource(ScriptState* script_state,
ReadableStream* stream1,
UnderlyingSourceBase* source2)
: UnderlyingSourceBase(script_state),
stream1_(stream1),
source2_(source2) {}
ScriptPromise Start(ScriptState* script_state) override {
ExceptionState exception_state(script_state->GetIsolate(),
ExceptionState::kUnknownContext, "", "");
reader_for_stream1_ = ReadableStream::AcquireDefaultReader(
script_state, stream1_, /*for_author_code=*/false, exception_state);
if (exception_state.HadException()) {
return ScriptPromise::Reject(script_state, exception_state);
}
DCHECK(reader_for_stream1_);
return ScriptPromise::CastUndefined(script_state);
}
ScriptPromise pull(ScriptState* script_state) override {
if (has_finished_reading_stream1_) {
return source2_->pull(script_state);
}
ExceptionState exception_state(script_state->GetIsolate(),
ExceptionState::kUnknownContext, "", "");
ScriptPromise read_promise =
reader_for_stream1_->read(script_state, exception_state);
if (exception_state.HadException()) {
return ScriptPromise::Reject(script_state, exception_state);
}
return read_promise.Then(
CreateFunction<OnReadingSource1Success>(script_state, this),
CreateFunction<OnReadingSource1Fail>(script_state, this));
}
ScriptPromise Cancel(ScriptState* script_state, ScriptValue reason) override {
if (has_finished_reading_stream1_) {
return source2_->Cancel(script_state, reason);
}
ExceptionState exception_state(script_state->GetIsolate(),
ExceptionState::kUnknownContext, "", "");
ScriptPromise cancel_promise1 =
reader_for_stream1_->cancel(script_state, reason, exception_state);
if (exception_state.HadException()) {
cancel_promise1 = ScriptPromise::Reject(script_state, exception_state);
}
ReadableStream* dummy_stream =
ReadableStream::CreateWithCountQueueingStrategy(script_state, source2_,
/*high_water_mark=*/0);
ScriptPromise cancel_promise2 =
dummy_stream->cancel(script_state, reason, exception_state);
if (exception_state.HadException()) {
cancel_promise2 = ScriptPromise::Reject(script_state, exception_state);
}
return ScriptPromise::All(script_state, {cancel_promise1, cancel_promise2});
}
void Trace(Visitor* visitor) const override {
visitor->Trace(stream1_);
visitor->Trace(reader_for_stream1_);
visitor->Trace(source2_);
UnderlyingSourceBase::Trace(visitor);
}
private:
Member<ReadableStream> stream1_;
Member<ReadableStreamDefaultReader> reader_for_stream1_;
bool has_finished_reading_stream1_ = false;
Member<UnderlyingSourceBase> source2_;
};
ReadableStream* CrossRealmTransformReadable::CreateReadableStream(
ExceptionState& exception_state) {
DCHECK(!controller_) << "CreateReadableStream can only be called once";
// https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable
// The order of operations is significantly different from the standard, but
// functionally equivalent.
// 3. Add a handler for port’s message event with the following steps:
// 5. Enable port’s port message queue.
message_port_->setOnmessage(
MakeGarbageCollected<CrossRealmTransformMessageListener>(this));
// 4. Add a handler for port’s messageerror event with the following steps:
message_port_->setOnmessageerror(
MakeGarbageCollected<CrossRealmTransformErrorListener>(this));
// 6. Let startAlgorithm be an algorithm that returns undefined.
// 7. Let pullAlgorithm be the following steps:
// 8. Let cancelAlgorithm be the following steps, taking a reason argument:
// 9. Let sizeAlgorithm be an algorithm that returns 1.
// 10. Perform ! SetUpReadableStreamDefaultController(stream, controller,
// startAlgorithm, pullAlgorithm, cancelAlgorithm, 0, sizeAlgorithm).
auto* stream = ReadableStream::Create(
script_state_, CreateTrivialStartAlgorithm(),
MakeGarbageCollected<PullAlgorithm>(this),
MakeGarbageCollected<CancelAlgorithm>(this),
/* highWaterMark = */ 0, CreateDefaultSizeAlgorithm(), exception_state);
if (exception_state.HadException()) {
return nullptr;
}
// The stream is created right above, and the type of the source is not given,
// hence it is guaranteed that the controller is a
// ReadableStreamDefaultController.
controller_ = To<ReadableStreamDefaultController>(stream->GetController());
return stream;
}
void CrossRealmTransformReadable::HandleMessage(MessageType type,
v8::Local<v8::Value> value) {
// https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable
// 3. Add a handler for port’s message event with the following steps:
// The first 5 steps are handled by CrossRealmTransformMessageListener.
switch (type) {
// 6. If type is "chunk",
case MessageType::kChunk:
// 1. Perform ! ReadableStreamDefaultControllerEnqueue(controller,
// value).
// TODO(ricea): Update ReadableStreamDefaultController::Enqueue() to match
// the standard so this extra check is not needed.
if (ReadableStreamDefaultController::CanCloseOrEnqueue(controller_)) {
// This can't throw because we always use the default strategy size
// algorithm, which doesn't throw, and always returns a valid value of
// 1.0.
ReadableStreamDefaultController::Enqueue(script_state_, controller_,
value, ASSERT_NO_EXCEPTION);
}
return;
// 7. Otherwise, if type is "close",
case MessageType::kClose:
// 1. Perform ! ReadableStreamDefaultControllerClose(controller).
// TODO(ricea): Update ReadableStreamDefaultController::Close() to match
// the standard so this extra check is not needed.
if (ReadableStreamDefaultController::CanCloseOrEnqueue(controller_)) {
ReadableStreamDefaultController::Close(script_state_, controller_);
}
// Disentangle port.
message_port_->close();
return;
// 8. Otherwise, if type is "error",
case MessageType::kError:
// 1. Perform ! ReadableStreamDefaultControllerError(controller, value).
ReadableStreamDefaultController::Error(script_state_, controller_, value);
// 2. Disentangle port.
message_port_->close();
return;
default:
DLOG(WARNING) << "Invalid message from peer ignored (invalid type): "
<< static_cast<int>(type);
return;
}
}
void CrossRealmTransformReadable::HandleError(v8::Local<v8::Value> error) {
// https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable
// 4. Add a handler for port’s messageerror event with the following steps:
// The first two steps, and the last step, are performed by
// CrossRealmTransformErrorListener.
// 3. Perform ! ReadableStreamDefaultControllerError(controller, error).
ReadableStreamDefaultController::Error(script_state_, controller_, error);
}
} // namespace
CORE_EXPORT WritableStream* CreateCrossRealmTransformWritable(
ScriptState* script_state,
MessagePort* port,
std::unique_ptr<WritableStreamTransferringOptimizer> optimizer,
ExceptionState& exception_state) {
WritableStream* stream =
MakeGarbageCollected<CrossRealmTransformWritable>(script_state, port)
->CreateWritableStream(exception_state);
if (exception_state.HadException()) {
return nullptr;
}
if (!optimizer) {
return stream;
}
UnderlyingSinkBase* sink =
optimizer->PerformInProcessOptimization(script_state);
if (!sink) {
return stream;
}
stream->close(script_state, exception_state);
if (exception_state.HadException()) {
return nullptr;
}
return WritableStream::CreateWithCountQueueingStrategy(script_state, sink,
/*high_water_mark=*/1);
}
CORE_EXPORT ReadableStream* CreateCrossRealmTransformReadable(
ScriptState* script_state,
MessagePort* port,
std::unique_ptr<ReadableStreamTransferringOptimizer> optimizer,
ExceptionState& exception_state) {
ReadableStream* stream =
MakeGarbageCollected<CrossRealmTransformReadable>(script_state, port)
->CreateReadableStream(exception_state);
if (!optimizer) {
return stream;
}
UnderlyingSourceBase* source2 =
optimizer->PerformInProcessOptimization(script_state);
if (!source2) {
return stream;
}
return ReadableStream::CreateWithCountQueueingStrategy(
script_state,
MakeGarbageCollected<ConcatenatingUnderlyingSource>(script_state, stream,
source2),
/*high_water_mark=*/0);
}
ReadableStream* CreateConcatenatedReadableStream(
ScriptState* script_state,
UnderlyingSourceBase* source1,
UnderlyingSourceBase* source2) {
auto* const stream1 =
ReadableStream::CreateWithCountQueueingStrategy(script_state, source1,
/*high_water_mark=*/0);
return ReadableStream::CreateWithCountQueueingStrategy(
script_state,
MakeGarbageCollected<ConcatenatingUnderlyingSource>(script_state, stream1,
source2),
/*high_water_mark=*/0);
}
} // namespace blink