blob: 3b072d4d45cbf7729319a7d42f22386b1b106f06 [file] [log] [blame]
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "third_party/blink/renderer/modules/webtransport/incoming_stream.h"
#include <string.h>
#include <utility>
#include "third_party/blink/renderer/bindings/core/v8/script_function.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_array_buffer.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_iterator_result_value.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_throw_dom_exception.h"
#include "third_party/blink/renderer/bindings/modules/v8/v8_stream_abort_info.h"
#include "third_party/blink/renderer/core/execution_context/execution_context.h"
#include "third_party/blink/renderer/core/streams/readable_stream.h"
#include "third_party/blink/renderer/core/streams/readable_stream_default_controller_with_script_scope.h"
#include "third_party/blink/renderer/core/streams/readable_stream_generic_reader.h"
#include "third_party/blink/renderer/core/streams/stream_promise_resolver.h"
#include "third_party/blink/renderer/core/streams/underlying_source_base.h"
#include "third_party/blink/renderer/core/typed_arrays/array_buffer/array_buffer_contents.h"
#include "third_party/blink/renderer/core/typed_arrays/dom_array_buffer.h"
#include "third_party/blink/renderer/core/typed_arrays/dom_typed_array.h"
#include "third_party/blink/renderer/platform/bindings/exception_code.h"
#include "third_party/blink/renderer/platform/bindings/exception_state.h"
#include "third_party/blink/renderer/platform/bindings/script_state.h"
#include "third_party/blink/renderer/platform/bindings/trace_wrapper_v8_reference.h"
#include "third_party/blink/renderer/platform/heap/heap.h"
#include "third_party/blink/renderer/platform/heap/persistent.h"
#include "third_party/blink/renderer/platform/wtf/assertions.h"
#include "third_party/blink/renderer/platform/wtf/functional.h"
#include "v8/include/v8.h"
namespace blink {
// An implementation of UnderlyingSourceBase that forwards all operations to the
// IncomingStream object that created it.
class IncomingStream::UnderlyingSource final : public UnderlyingSourceBase {
public:
explicit UnderlyingSource(ScriptState* script_state, IncomingStream* stream)
: UnderlyingSourceBase(script_state), incoming_stream_(stream) {}
ScriptPromise Start(ScriptState* script_state) override {
incoming_stream_->controller_ = Controller();
return ScriptPromise::CastUndefined(script_state);
}
ScriptPromise pull(ScriptState* script_state) override {
incoming_stream_->ReadFromPipeAndEnqueue();
return ScriptPromise::CastUndefined(script_state);
}
ScriptPromise Cancel(ScriptState* script_state, ScriptValue reason) override {
incoming_stream_->AbortAndReset();
return ScriptPromise::CastUndefined(script_state);
}
void Trace(Visitor* visitor) const override {
visitor->Trace(incoming_stream_);
UnderlyingSourceBase::Trace(visitor);
}
private:
const Member<IncomingStream> incoming_stream_;
};
IncomingStream::IncomingStream(ScriptState* script_state,
base::OnceClosure on_abort,
mojo::ScopedDataPipeConsumerHandle handle)
: script_state_(script_state),
on_abort_(std::move(on_abort)),
data_pipe_(std::move(handle)),
read_watcher_(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::MANUAL),
close_watcher_(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC) {}
IncomingStream::~IncomingStream() = default;
void IncomingStream::Init() {
DVLOG(1) << "IncomingStream::Init() this=" << this;
read_watcher_.Watch(data_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
WTF::BindRepeating(&IncomingStream::OnHandleReady,
WrapWeakPersistent(this)));
close_watcher_.Watch(data_pipe_.get(), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
WTF::BindRepeating(&IncomingStream::OnPeerClosed,
WrapWeakPersistent(this)));
// This object cannot be garbage collected as long as
// |reading_aborted_resolver_| is set and the ExecutionContext has not been
// destroyed, so it is guaranteed that that conditions in the
// ScriptPromiseResolver destructor will be satisfied.
reading_aborted_resolver_ =
MakeGarbageCollected<ScriptPromiseResolver>(script_state_);
reading_aborted_ = reading_aborted_resolver_->Promise();
readable_ = ReadableStream::CreateWithCountQueueingStrategy(
script_state_,
MakeGarbageCollected<UnderlyingSource>(script_state_, this), 1);
}
void IncomingStream::OnIncomingStreamClosed(bool fin_received) {
DVLOG(1) << "IncomingStream::OnIncomingStreamClosed(" << fin_received
<< ") this=" << this;
DCHECK_NE(state_, State::kClosed);
state_ = State::kClosed;
DCHECK(!fin_received_.has_value());
fin_received_ = fin_received;
// Wait until HandlePipeClosed() has also been called before processing the
// close.
if (is_pipe_closed_) {
// We need a JavaScript scope to be entered in order to resolve the
// |reading_aborted_| promise.
ScriptState::Scope scope(script_state_);
ProcessClose();
}
}
void IncomingStream::AbortReading(StreamAbortInfo*) {
DVLOG(1) << "IncomingStream::abortReading() this=" << this;
CloseAbortAndReset();
}
void IncomingStream::Reset() {
DVLOG(1) << "IncomingStream::Reset() this=" << this;
// We no longer need to call |on_abort_|.
on_abort_.Reset();
ErrorStreamAbortAndReset(CreateAbortException(IsLocalAbort(false)));
}
void IncomingStream::ContextDestroyed() {
DVLOG(1) << "IncomingStream::ContextDestroyed() this=" << this;
ResetPipe();
}
void IncomingStream::Trace(Visitor* visitor) const {
visitor->Trace(script_state_);
visitor->Trace(readable_);
visitor->Trace(controller_);
visitor->Trace(reading_aborted_);
visitor->Trace(reading_aborted_resolver_);
}
void IncomingStream::OnHandleReady(MojoResult result,
const mojo::HandleSignalsState&) {
DVLOG(1) << "IncomingStream::OnHandleReady() this=" << this
<< " result=" << result;
switch (result) {
case MOJO_RESULT_OK:
ReadFromPipeAndEnqueue();
break;
case MOJO_RESULT_FAILED_PRECONDITION:
// Will be handled by |close_watcher_|.
break;
default:
NOTREACHED();
}
}
void IncomingStream::OnPeerClosed(MojoResult result,
const mojo::HandleSignalsState&) {
DVLOG(1) << "IncomingStream::OnPeerClosed() this=" << this
<< " result=" << result;
switch (result) {
case MOJO_RESULT_OK:
HandlePipeClosed();
break;
default:
NOTREACHED();
}
}
void IncomingStream::HandlePipeClosed() {
DVLOG(1) << "IncomingStream::HandlePipeClosed() this=" << this;
DCHECK(!is_pipe_closed_);
is_pipe_closed_ = true;
// Reset the pipe immediately to prevent being called in a loop.
ResetPipe();
// Wait until OnIncomingStreamClosed() has also been called before processing
// the close.
if (fin_received_.has_value()) {
// We need a JavaScript scope to be entered in order to resolve the
// |reading_aborted_| promise.
ScriptState::Scope scope(script_state_);
ProcessClose();
}
}
void IncomingStream::ProcessClose() {
DVLOG(1) << "IncomingStream::ProcessClose() this=" << this;
DCHECK(fin_received_.has_value());
if (fin_received_.value()) {
CloseAbortAndReset();
}
ErrorStreamAbortAndReset(CreateAbortException(IsLocalAbort(false)));
}
void IncomingStream::ReadFromPipeAndEnqueue() {
DVLOG(1) << "IncomingStream::ReadFromPipeAndEnqueue() this=" << this
<< " in_two_phase_read_=" << in_two_phase_read_
<< " read_pending_=" << read_pending_;
// Protect against re-entrancy.
if (in_two_phase_read_) {
read_pending_ = true;
return;
}
DCHECK(!read_pending_);
const void* buffer = nullptr;
uint32_t buffer_num_bytes = 0;
auto result = data_pipe_->BeginReadData(&buffer, &buffer_num_bytes,
MOJO_BEGIN_READ_DATA_FLAG_NONE);
switch (result) {
case MOJO_RESULT_OK: {
in_two_phase_read_ = true;
// EnqueueBytes() may re-enter this method via pull().
EnqueueBytes(buffer, buffer_num_bytes);
data_pipe_->EndReadData(buffer_num_bytes);
in_two_phase_read_ = false;
if (read_pending_) {
read_pending_ = false;
// pull() will not be called when another pull() is in progress, so the
// maximum recursion depth is 1.
ReadFromPipeAndEnqueue();
}
break;
}
case MOJO_RESULT_SHOULD_WAIT:
read_watcher_.ArmOrNotify();
return;
case MOJO_RESULT_FAILED_PRECONDITION:
// This will be handled by close_watcher_.
return;
default:
NOTREACHED() << "Unexpected result: " << result;
return;
}
}
void IncomingStream::EnqueueBytes(const void* source, uint32_t byte_length) {
DVLOG(1) << "IncomingStream::EnqueueBytes() this=" << this;
auto* buffer =
DOMUint8Array::Create(static_cast<const uint8_t*>(source), byte_length);
controller_->Enqueue(buffer);
}
ScriptValue IncomingStream::CreateAbortException(IsLocalAbort is_local_abort) {
DVLOG(1) << "IncomingStream::CreateAbortException() this=" << this
<< " is_local_abort=" << static_cast<bool>(is_local_abort);
DOMExceptionCode code = is_local_abort ? DOMExceptionCode::kAbortError
: DOMExceptionCode::kNetworkError;
String message =
String::Format("The stream was aborted %s",
is_local_abort ? "locally" : "by the remote server");
return ScriptValue(script_state_->GetIsolate(),
V8ThrowDOMException::CreateOrEmpty(
script_state_->GetIsolate(), code, message));
}
void IncomingStream::CloseAbortAndReset() {
DVLOG(1) << "IncomingStream::CloseAbortAndReset() this=" << this;
if (controller_) {
controller_->Close();
controller_ = nullptr;
}
AbortAndReset();
}
void IncomingStream::ErrorStreamAbortAndReset(ScriptValue exception) {
DVLOG(1) << "IncomingStream::ErrorStreamAbortAndReset() this=" << this;
if (controller_) {
controller_->Error(exception);
controller_ = nullptr;
}
AbortAndReset();
}
void IncomingStream::AbortAndReset() {
DVLOG(1) << "IncomingStream::AbortAndReset() this=" << this;
if (reading_aborted_resolver_) {
// TODO(ricea): Set errorCode on the StreamAbortInfo.
reading_aborted_resolver_->Resolve(StreamAbortInfo::Create());
reading_aborted_resolver_ = nullptr;
}
state_ = State::kAborted;
if (on_abort_) {
// Cause QuicTransport to drop its reference to us.
std::move(on_abort_).Run();
}
ResetPipe();
}
void IncomingStream::ResetPipe() {
DVLOG(1) << "IncomingStream::ResetPipe() this=" << this;
read_watcher_.Cancel();
close_watcher_.Cancel();
data_pipe_.reset();
}
void IncomingStream::Dispose() {
DVLOG(1) << "IncomingStream::Dispose() this=" << this;
ResetPipe();
}
} // namespace blink