| // Copyright 2019 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "third_party/blink/renderer/modules/webtransport/quic_transport.h" |
| |
| #include <stdint.h> |
| |
| #include <utility> |
| |
| #include "base/numerics/safe_conversions.h" |
| #include "mojo/public/cpp/bindings/remote.h" |
| #include "third_party/blink/public/common/browser_interface_broker_proxy.h" |
| #include "third_party/blink/public/mojom/webtransport/quic_transport_connector.mojom-blink.h" |
| #include "third_party/blink/public/platform/task_type.h" |
| #include "third_party/blink/renderer/bindings/core/v8/script_promise_resolver.h" |
| #include "third_party/blink/renderer/bindings/core/v8/v8_array_buffer.h" |
| #include "third_party/blink/renderer/bindings/core/v8/v8_array_buffer_view.h" |
| #include "third_party/blink/renderer/bindings/core/v8/v8_throw_dom_exception.h" |
| #include "third_party/blink/renderer/bindings/modules/v8/v8_quic_transport_options.h" |
| #include "third_party/blink/renderer/bindings/modules/v8/v8_rtc_dtls_fingerprint.h" |
| #include "third_party/blink/renderer/bindings/modules/v8/v8_web_transport_close_info.h" |
| #include "third_party/blink/renderer/core/execution_context/execution_context.h" |
| #include "third_party/blink/renderer/core/frame/csp/content_security_policy.h" |
| #include "third_party/blink/renderer/core/frame/web_feature.h" |
| #include "third_party/blink/renderer/core/probe/core_probes.h" |
| #include "third_party/blink/renderer/core/streams/readable_stream.h" |
| #include "third_party/blink/renderer/core/streams/readable_stream_default_controller_with_script_scope.h" |
| #include "third_party/blink/renderer/core/streams/underlying_sink_base.h" |
| #include "third_party/blink/renderer/core/streams/underlying_source_base.h" |
| #include "third_party/blink/renderer/core/streams/writable_stream.h" |
| #include "third_party/blink/renderer/core/typed_arrays/dom_typed_array.h" |
| #include "third_party/blink/renderer/modules/webtransport/bidirectional_stream.h" |
| #include "third_party/blink/renderer/modules/webtransport/receive_stream.h" |
| #include "third_party/blink/renderer/modules/webtransport/send_stream.h" |
| #include "third_party/blink/renderer/modules/webtransport/web_transport_stream.h" |
| #include "third_party/blink/renderer/platform/bindings/exception_code.h" |
| #include "third_party/blink/renderer/platform/bindings/exception_state.h" |
| #include "third_party/blink/renderer/platform/bindings/script_state.h" |
| #include "third_party/blink/renderer/platform/heap/persistent.h" |
| #include "third_party/blink/renderer/platform/heap/visitor.h" |
| #include "third_party/blink/renderer/platform/loader/fetch/unique_identifier.h" |
| #include "third_party/blink/renderer/platform/wtf/functional.h" |
| #include "third_party/blink/renderer/platform/wtf/text/wtf_string.h" |
| #include "third_party/blink/renderer/platform/wtf/vector.h" |
| |
| namespace blink { |
| |
| namespace { |
| |
| // Creates a mojo DataPipe with the options we use for our stream data pipes. On |
| // success, returns true. On failure, throws an exception and returns false. |
| bool CreateStreamDataPipe(mojo::ScopedDataPipeProducerHandle* producer, |
| mojo::ScopedDataPipeConsumerHandle* consumer, |
| ExceptionState& exception_state) { |
| MojoCreateDataPipeOptions options; |
| options.struct_size = sizeof(MojoCreateDataPipeOptions); |
| options.flags = MOJO_CREATE_DATA_PIPE_FLAG_NONE; |
| options.element_num_bytes = 1; |
| // TODO(ricea): Find an appropriate value for capacity_num_bytes. |
| options.capacity_num_bytes = 0; |
| |
| MojoResult result = mojo::CreateDataPipe(&options, *producer, *consumer); |
| if (result != MOJO_RESULT_OK) { |
| // Probably out of resources. |
| exception_state.ThrowDOMException(DOMExceptionCode::kUnknownError, |
| "Insufficient resources."); |
| return false; |
| } |
| |
| return true; |
| } |
| |
| } // namespace |
| |
| // Sends a datagram on write(). |
| class QuicTransport::DatagramUnderlyingSink final : public UnderlyingSinkBase { |
| public: |
| DatagramUnderlyingSink(QuicTransport* quic_transport, int high_water_mark) |
| : quic_transport_(quic_transport), high_water_mark_(high_water_mark) {} |
| |
| ScriptPromise start(ScriptState* script_state, |
| WritableStreamDefaultController*, |
| ExceptionState&) override { |
| return ScriptPromise::CastUndefined(script_state); |
| } |
| |
| ScriptPromise write(ScriptState* script_state, |
| ScriptValue chunk, |
| WritableStreamDefaultController*, |
| ExceptionState& exception_state) override { |
| auto v8chunk = chunk.V8Value(); |
| if (v8chunk->IsArrayBuffer()) { |
| DOMArrayBuffer* data = |
| V8ArrayBuffer::ToImpl(v8chunk.As<v8::ArrayBuffer>()); |
| return SendDatagram( |
| {static_cast<const uint8_t*>(data->Data()), data->ByteLength()}); |
| } |
| |
| auto* isolate = script_state->GetIsolate(); |
| if (v8chunk->IsArrayBufferView()) { |
| NotShared<DOMArrayBufferView> data = |
| ToNotShared<NotShared<DOMArrayBufferView>>(isolate, v8chunk, |
| exception_state); |
| if (exception_state.HadException()) { |
| return ScriptPromise(); |
| } |
| |
| return SendDatagram({static_cast<const uint8_t*>(data->buffer()->Data()) + |
| data->byteOffset(), |
| data->byteLength()}); |
| } |
| |
| exception_state.ThrowTypeError( |
| "Datagram is not an ArrayBuffer or ArrayBufferView type."); |
| return ScriptPromise(); |
| } |
| |
| ScriptPromise close(ScriptState* script_state, ExceptionState&) override { |
| quic_transport_ = nullptr; |
| return ScriptPromise::CastUndefined(script_state); |
| } |
| |
| ScriptPromise abort(ScriptState* script_state, |
| ScriptValue reason, |
| ExceptionState&) override { |
| quic_transport_ = nullptr; |
| return ScriptPromise::CastUndefined(script_state); |
| } |
| |
| void Trace(Visitor* visitor) const override { |
| visitor->Trace(quic_transport_); |
| visitor->Trace(pending_datagrams_); |
| UnderlyingSinkBase::Trace(visitor); |
| } |
| |
| private: |
| ScriptPromise SendDatagram(base::span<const uint8_t> data) { |
| if (!quic_transport_->quic_transport_.is_bound()) { |
| // Silently drop the datagram if we are not connected. |
| // TODO(ricea): Change the behaviour if the standard changes. See |
| // https://github.com/WICG/web-transport/issues/93. |
| return ScriptPromise::CastUndefined(quic_transport_->script_state_); |
| } |
| |
| auto* resolver = MakeGarbageCollected<ScriptPromiseResolver>( |
| quic_transport_->script_state_); |
| pending_datagrams_.push_back(resolver); |
| |
| quic_transport_->quic_transport_->SendDatagram( |
| data, WTF::Bind(&DatagramUnderlyingSink::OnDatagramProcessed, |
| WrapWeakPersistent(this))); |
| if (pending_datagrams_.size() < static_cast<wtf_size_t>(high_water_mark_)) { |
| // In this case we pretend that the datagram is processed immediately, to |
| // get more requests from the stream. |
| return ScriptPromise::CastUndefined(quic_transport_->script_state_); |
| } |
| return resolver->Promise(); |
| } |
| |
| void OnDatagramProcessed(bool sent) { |
| DCHECK(!pending_datagrams_.empty()); |
| |
| ScriptPromiseResolver* resolver = pending_datagrams_.front(); |
| pending_datagrams_.pop_front(); |
| |
| resolver->Resolve(); |
| } |
| |
| Member<QuicTransport> quic_transport_; |
| const int high_water_mark_; |
| HeapDeque<Member<ScriptPromiseResolver>> pending_datagrams_; |
| }; |
| |
| // Captures a pointer to the ReadableStreamDefaultControllerWithScriptScope in |
| // the Start() method, and then does nothing else. Queuing of received datagrams |
| // is done inside the implementation of QuicTransport. |
| class QuicTransport::DatagramUnderlyingSource final |
| : public UnderlyingSourceBase { |
| public: |
| DatagramUnderlyingSource(ScriptState* script_state, |
| QuicTransport* quic_transport) |
| : UnderlyingSourceBase(script_state), quic_transport_(quic_transport) {} |
| |
| ScriptPromise Start(ScriptState* script_state) override { |
| quic_transport_->received_datagrams_controller_ = Controller(); |
| return ScriptPromise::CastUndefined(script_state); |
| } |
| |
| ScriptPromise pull(ScriptState* script_state) override { |
| return ScriptPromise::CastUndefined(script_state); |
| } |
| |
| ScriptPromise Cancel(ScriptState* script_state, ScriptValue reason) override { |
| // Stop Enqueue() from being called again. |
| |
| quic_transport_->received_datagrams_controller_->NoteHasBeenCanceled(); |
| quic_transport_->received_datagrams_controller_ = nullptr; |
| quic_transport_ = nullptr; |
| return ScriptPromise::CastUndefined(script_state); |
| } |
| |
| void Trace(Visitor* visitor) const override { |
| visitor->Trace(quic_transport_); |
| UnderlyingSourceBase::Trace(visitor); |
| } |
| |
| private: |
| Member<QuicTransport> quic_transport_; |
| }; |
| |
| class QuicTransport::StreamVendingUnderlyingSource final |
| : public UnderlyingSourceBase { |
| public: |
| class StreamVendor : public GarbageCollected<StreamVendor> { |
| public: |
| using EnqueueCallback = base::OnceCallback<void(ScriptWrappable*)>; |
| virtual void RequestStream(EnqueueCallback) = 0; |
| virtual void Trace(Visitor*) const {} |
| }; |
| |
| template <class VendorType> |
| static StreamVendingUnderlyingSource* CreateWithVendor( |
| ScriptState* script_state, |
| QuicTransport* quic_transport) { |
| auto* vendor = |
| MakeGarbageCollected<VendorType>(script_state, quic_transport); |
| return MakeGarbageCollected<StreamVendingUnderlyingSource>(script_state, |
| vendor); |
| } |
| |
| StreamVendingUnderlyingSource(ScriptState* script_state, StreamVendor* vendor) |
| : UnderlyingSourceBase(script_state), |
| script_state_(script_state), |
| vendor_(vendor) {} |
| |
| ScriptPromise pull(ScriptState* script_state) override { |
| if (!is_opened_) { |
| is_pull_waiting_ = true; |
| return ScriptPromise::CastUndefined(script_state); |
| } |
| |
| vendor_->RequestStream(WTF::Bind(&StreamVendingUnderlyingSource::Enqueue, |
| WrapWeakPersistent(this))); |
| |
| return ScriptPromise::CastUndefined(script_state); |
| } |
| |
| // Used by QuicTransport to error the stream. |
| void Error(v8::Local<v8::Value> reason) { Controller()->Error(reason); } |
| |
| // Used by QuicTransport to close the stream. |
| void Close() { Controller()->Close(); } |
| |
| // Used by QuicTransport to notify that the QuicTransport interface is |
| // available. |
| void NotifyOpened() { |
| is_opened_ = true; |
| |
| if (is_pull_waiting_) { |
| ScriptState::Scope scope(script_state_); |
| pull(script_state_); |
| is_pull_waiting_ = false; |
| } |
| } |
| |
| void Trace(Visitor* visitor) const override { |
| visitor->Trace(script_state_); |
| visitor->Trace(vendor_); |
| UnderlyingSourceBase::Trace(visitor); |
| } |
| |
| private: |
| void Enqueue(ScriptWrappable* stream) { Controller()->Enqueue(stream); } |
| |
| const Member<ScriptState> script_state_; |
| const Member<StreamVendor> vendor_; |
| bool is_opened_ = false; |
| bool is_pull_waiting_ = false; |
| }; |
| |
| class QuicTransport::ReceiveStreamVendor final |
| : public QuicTransport::StreamVendingUnderlyingSource::StreamVendor { |
| public: |
| ReceiveStreamVendor(ScriptState* script_state, QuicTransport* quic_transport) |
| : script_state_(script_state), quic_transport_(quic_transport) {} |
| |
| void RequestStream(EnqueueCallback enqueue) override { |
| quic_transport_->quic_transport_->AcceptUnidirectionalStream( |
| WTF::Bind(&ReceiveStreamVendor::OnAcceptUnidirectionalStreamResponse, |
| WrapWeakPersistent(this), std::move(enqueue))); |
| } |
| |
| void Trace(Visitor* visitor) const override { |
| visitor->Trace(script_state_); |
| visitor->Trace(quic_transport_); |
| StreamVendor::Trace(visitor); |
| } |
| |
| private: |
| void OnAcceptUnidirectionalStreamResponse( |
| EnqueueCallback enqueue, |
| uint32_t stream_id, |
| mojo::ScopedDataPipeConsumerHandle readable) { |
| ScriptState::Scope scope(script_state_); |
| auto* receive_stream = MakeGarbageCollected<ReceiveStream>( |
| script_state_, quic_transport_, stream_id, std::move(readable)); |
| receive_stream->Init(); |
| // 0xfffffffe and 0xffffffff are reserved values in stream_map_. |
| CHECK_LT(stream_id, 0xfffffffe); |
| quic_transport_->stream_map_.insert(stream_id, receive_stream); |
| |
| std::move(enqueue).Run(receive_stream); |
| } |
| |
| const Member<ScriptState> script_state_; |
| const Member<QuicTransport> quic_transport_; |
| }; |
| |
| class QuicTransport::BidirectionalStreamVendor final |
| : public QuicTransport::StreamVendingUnderlyingSource::StreamVendor { |
| public: |
| BidirectionalStreamVendor(ScriptState* script_state, |
| QuicTransport* quic_transport) |
| : script_state_(script_state), quic_transport_(quic_transport) {} |
| |
| void RequestStream(EnqueueCallback enqueue) override { |
| quic_transport_->quic_transport_->AcceptBidirectionalStream(WTF::Bind( |
| &BidirectionalStreamVendor::OnAcceptBidirectionalStreamResponse, |
| WrapWeakPersistent(this), std::move(enqueue))); |
| } |
| |
| void Trace(Visitor* visitor) const override { |
| visitor->Trace(script_state_); |
| visitor->Trace(quic_transport_); |
| StreamVendor::Trace(visitor); |
| } |
| |
| private: |
| void OnAcceptBidirectionalStreamResponse( |
| EnqueueCallback enqueue, |
| uint32_t stream_id, |
| mojo::ScopedDataPipeConsumerHandle incoming_consumer, |
| mojo::ScopedDataPipeProducerHandle outgoing_producer) { |
| ScriptState::Scope scope(script_state_); |
| auto* bidirectional_stream = MakeGarbageCollected<BidirectionalStream>( |
| script_state_, quic_transport_, stream_id, std::move(outgoing_producer), |
| std::move(incoming_consumer)); |
| bidirectional_stream->Init(); |
| // 0xfffffffe and 0xffffffff are reserved values in stream_map_. |
| CHECK_LT(stream_id, 0xfffffffe); |
| quic_transport_->stream_map_.insert(stream_id, bidirectional_stream); |
| |
| std::move(enqueue).Run(bidirectional_stream); |
| } |
| |
| const Member<ScriptState> script_state_; |
| const Member<QuicTransport> quic_transport_; |
| }; |
| |
| QuicTransport* QuicTransport::Create(ScriptState* script_state, |
| const String& url, |
| QuicTransportOptions* options, |
| ExceptionState& exception_state) { |
| DVLOG(1) << "QuicTransport::Create() url=" << url; |
| DCHECK(options); |
| ExecutionContext::From(script_state)->CountUse(WebFeature::kQuicTransport); |
| auto* transport = |
| MakeGarbageCollected<QuicTransport>(PassKey(), script_state, url); |
| transport->Init(url, *options, exception_state); |
| return transport; |
| } |
| |
// PassKey-gated constructor. Delegates to the three-argument constructor,
// using the ExecutionContext obtained from |script_state|.
QuicTransport::QuicTransport(PassKey,
                             ScriptState* script_state,
                             const String& url)
    : QuicTransport(script_state, url, ExecutionContext::From(script_state)) {}
| |
// Initializes the members only; no connection is started here (see Init()).
// |url_| is built from |url| with NullURL() as the base, the mojo remote and
// both receivers are associated with |context|, and a fresh unique identifier
// is taken for |inspector_transport_id_|.
QuicTransport::QuicTransport(ScriptState* script_state,
                             const String& url,
                             ExecutionContext* context)
    : ExecutionContextLifecycleObserver(context),
      script_state_(script_state),
      url_(NullURL(), url),
      quic_transport_(context),
      handshake_client_receiver_(this, context),
      client_receiver_(this, context),
      inspector_transport_id_(CreateUniqueIdentifier()) {}
| |
| ScriptPromise QuicTransport::createSendStream(ScriptState* script_state, |
| ExceptionState& exception_state) { |
| DVLOG(1) << "QuicTransport::createSendStream() this=" << this; |
| |
| GetExecutionContext()->CountUse(WebFeature::kQuicTransportStreamApis); |
| if (!quic_transport_.is_bound()) { |
| // TODO(ricea): Should we wait if we're still connecting? |
| exception_state.ThrowDOMException(DOMExceptionCode::kNetworkError, |
| "No connection."); |
| return ScriptPromise(); |
| } |
| |
| mojo::ScopedDataPipeProducerHandle data_pipe_producer; |
| mojo::ScopedDataPipeConsumerHandle data_pipe_consumer; |
| |
| if (!CreateStreamDataPipe(&data_pipe_producer, &data_pipe_consumer, |
| exception_state)) { |
| return ScriptPromise(); |
| } |
| |
| auto* resolver = MakeGarbageCollected<ScriptPromiseResolver>(script_state); |
| create_stream_resolvers_.insert(resolver); |
| quic_transport_->CreateStream( |
| std::move(data_pipe_consumer), mojo::ScopedDataPipeProducerHandle(), |
| WTF::Bind(&QuicTransport::OnCreateSendStreamResponse, |
| WrapWeakPersistent(this), WrapWeakPersistent(resolver), |
| std::move(data_pipe_producer))); |
| |
| return resolver->Promise(); |
| } |
| |
// Returns the ReadableStream of incoming unidirectional streams and records
// use of the stream APIs.
ReadableStream* QuicTransport::receiveStreams() {
  GetExecutionContext()->CountUse(WebFeature::kQuicTransportStreamApis);
  return received_streams_;
}
| |
| ScriptPromise QuicTransport::createBidirectionalStream( |
| ScriptState* script_state, |
| ExceptionState& exception_state) { |
| DVLOG(1) << "QuicTransport::createBidirectionalStream() this=" << this; |
| |
| GetExecutionContext()->CountUse(WebFeature::kQuicTransportStreamApis); |
| if (!quic_transport_.is_bound()) { |
| // TODO(ricea): We should wait if we are still connecting. |
| exception_state.ThrowDOMException(DOMExceptionCode::kNetworkError, |
| "No connection."); |
| return ScriptPromise(); |
| } |
| |
| MojoCreateDataPipeOptions options; |
| options.struct_size = sizeof(MojoCreateDataPipeOptions); |
| options.flags = MOJO_CREATE_DATA_PIPE_FLAG_NONE; |
| options.element_num_bytes = 1; |
| // TODO(ricea): Find an appropriate value for capacity_num_bytes. |
| options.capacity_num_bytes = 0; |
| |
| mojo::ScopedDataPipeProducerHandle outgoing_producer; |
| mojo::ScopedDataPipeConsumerHandle outgoing_consumer; |
| if (!CreateStreamDataPipe(&outgoing_producer, &outgoing_consumer, |
| exception_state)) { |
| return ScriptPromise(); |
| } |
| |
| mojo::ScopedDataPipeProducerHandle incoming_producer; |
| mojo::ScopedDataPipeConsumerHandle incoming_consumer; |
| if (!CreateStreamDataPipe(&incoming_producer, &incoming_consumer, |
| exception_state)) { |
| return ScriptPromise(); |
| } |
| |
| auto* resolver = MakeGarbageCollected<ScriptPromiseResolver>(script_state); |
| create_stream_resolvers_.insert(resolver); |
| quic_transport_->CreateStream( |
| std::move(outgoing_consumer), std::move(incoming_producer), |
| WTF::Bind(&QuicTransport::OnCreateBidirectionalStreamResponse, |
| WrapWeakPersistent(this), WrapWeakPersistent(resolver), |
| std::move(outgoing_producer), std::move(incoming_consumer))); |
| |
| return resolver->Promise(); |
| } |
| |
// Returns the ReadableStream of incoming bidirectional streams and records use
// of the stream APIs.
ReadableStream* QuicTransport::receiveBidirectionalStreams() {
  GetExecutionContext()->CountUse(WebFeature::kQuicTransportStreamApis);
  return received_bidirectional_streams_;
}
| |
// Returns the WritableStream used to send datagrams and records use of the
// datagram APIs.
WritableStream* QuicTransport::sendDatagrams() {
  GetExecutionContext()->CountUse(WebFeature::kQuicTransportDatagramApis);
  return outgoing_datagrams_;
}
| |
// Returns the ReadableStream of received datagrams and records use of the
// datagram APIs.
ReadableStream* QuicTransport::receiveDatagrams() {
  GetExecutionContext()->CountUse(WebFeature::kQuicTransportDatagramApis);
  return received_datagrams_;
}
| |
| void QuicTransport::close(const WebTransportCloseInfo* close_info) { |
| DVLOG(1) << "QuicTransport::close() this=" << this; |
| // TODO(ricea): Send |close_info| to the network service. |
| |
| if (cleanly_closed_) { |
| // close() has already been called. Ignore it. |
| return; |
| } |
| cleanly_closed_ = true; |
| |
| if (received_datagrams_controller_) { |
| received_datagrams_controller_->Close(); |
| received_datagrams_controller_ = nullptr; |
| } |
| |
| received_streams_underlying_source_->Close(); |
| received_bidirectional_streams_underlying_source_->Close(); |
| |
| // If we don't manage to close the writable stream here, then it will |
| // error when a write() is attempted. |
| if (!WritableStream::IsLocked(outgoing_datagrams_) && |
| !WritableStream::CloseQueuedOrInFlight(outgoing_datagrams_)) { |
| auto promise = WritableStream::Close(script_state_, outgoing_datagrams_); |
| promise->MarkAsHandled(); |
| } |
| closed_resolver_->Resolve(close_info); |
| |
| v8::Local<v8::Value> reason = V8ThrowException::CreateTypeError( |
| script_state_->GetIsolate(), "Connection closed."); |
| ready_resolver_->Reject(reason); |
| RejectPendingStreamResolvers(); |
| ResetAll(); |
| } |
| |
| void QuicTransport::OnConnectionEstablished( |
| mojo::PendingRemote<network::mojom::blink::QuicTransport> quic_transport, |
| mojo::PendingReceiver<network::mojom::blink::QuicTransportClient> |
| client_receiver) { |
| DVLOG(1) << "QuicTransport::OnConnectionEstablished() this=" << this; |
| handshake_client_receiver_.reset(); |
| |
| probe::WebTransportConnectionEstablished(GetExecutionContext(), |
| inspector_transport_id_); |
| |
| auto task_runner = |
| GetExecutionContext()->GetTaskRunner(TaskType::kNetworking); |
| |
| client_receiver_.Bind(std::move(client_receiver), task_runner); |
| client_receiver_.set_disconnect_handler( |
| WTF::Bind(&QuicTransport::OnConnectionError, WrapWeakPersistent(this))); |
| |
| DCHECK(!quic_transport_.is_bound()); |
| quic_transport_.Bind(std::move(quic_transport), task_runner); |
| |
| received_streams_underlying_source_->NotifyOpened(); |
| received_bidirectional_streams_underlying_source_->NotifyOpened(); |
| |
| ready_resolver_->Resolve(); |
| } |
| |
// Defaulted destructor; explicit teardown is done in Dispose().
QuicTransport::~QuicTransport() = default;
| |
| void QuicTransport::OnHandshakeFailed( |
| network::mojom::blink::QuicTransportErrorPtr error) { |
| // |error| should be null from security/privacy reasons. |
| DCHECK(!error); |
| DVLOG(1) << "QuicTransport::OnHandshakeFailed() this=" << this; |
| ScriptState::Scope scope(script_state_); |
| { |
| v8::Local<v8::Value> reason = V8ThrowException::CreateTypeError( |
| script_state_->GetIsolate(), "Connection lost."); |
| ready_resolver_->Reject(reason); |
| closed_resolver_->Reject(reason); |
| } |
| ResetAll(); |
| } |
| |
| void QuicTransport::OnDatagramReceived(base::span<const uint8_t> data) { |
| ReadableStreamDefaultControllerWithScriptScope* controller = |
| received_datagrams_controller_; |
| |
| // Discard datagrams if the readable has been cancelled. |
| if (!controller) |
| return; |
| |
| // The spec says we should discard older datagrams first, but that's not what |
| // ReadableStream does, so instead we might need to maintain a separate queue |
| // with the desired semantics. But for now we'll just use a small queue in |
| // ReadableStream. |
| // TODO(ricea): Figure out how to get nice semantics here. |
| |
| if (controller->DesiredSize() > 0) { |
| auto* array = DOMUint8Array::Create( |
| data.data(), base::checked_cast<wtf_size_t>(data.size())); |
| controller->Enqueue(array); |
| } |
| } |
| |
| void QuicTransport::OnIncomingStreamClosed(uint32_t stream_id, |
| bool fin_received) { |
| DVLOG(1) << "QuicTransport::OnIncomingStreamClosed(" << stream_id << ", " |
| << fin_received << ") this=" << this; |
| WebTransportStream* stream = stream_map_.at(stream_id); |
| // |stream| can be unset because of races between different ways of closing |
| // |bidirectional streams. |
| if (stream) { |
| stream->OnIncomingStreamClosed(fin_received); |
| } |
| } |
| |
// Execution-context teardown hook: notifies every registered stream, then
// releases the transport's own resources via Dispose().
void QuicTransport::ContextDestroyed() {
  DVLOG(1) << "QuicTransport::ContextDestroyed() this=" << this;
  // Child streams must be reset first to ensure that garbage collection
  // ordering is safe. ContextDestroyed() is required not to execute JavaScript,
  // so this loop will not be re-entered.
  for (WebTransportStream* stream : stream_map_.Values()) {
    stream->ContextDestroyed();
  }
  Dispose();
}
| |
// Reports pending activity for as long as either the handshake receiver or the
// client receiver is still bound, i.e. until Dispose() has reset both.
bool QuicTransport::HasPendingActivity() const {
  DVLOG(1) << "QuicTransport::HasPendingActivity() this=" << this;
  return handshake_client_receiver_.is_bound() || client_receiver_.is_bound();
}
| |
// Forwards a FIN for |stream_id| to the mojo interface.
// NOTE(review): there is no is_bound() check here; presumably callers only
// invoke this while connected - confirm.
void QuicTransport::SendFin(uint32_t stream_id) {
  quic_transport_->SendFin(stream_id);
}
| |
// Asks the mojo interface to abort |stream_id|; the code sent is always 0.
// NOTE(review): there is no is_bound() check here; presumably callers only
// invoke this while connected - confirm.
void QuicTransport::AbortStream(uint32_t stream_id) {
  quic_transport_->AbortStream(stream_id, /*code=*/0);
}
| |
// Removes |stream_id| from |stream_map_|, so the transport no longer forwards
// notifications to that stream.
void QuicTransport::ForgetStream(uint32_t stream_id) {
  stream_map_.erase(stream_id);
}
| |
// Forwards |duration| to the mojo interface as the expiration duration for
// outgoing datagrams.
// NOTE(review): there is no is_bound() check here; presumably callers only
// invoke this while connected - confirm.
void QuicTransport::SetDatagramWritableQueueExpirationDuration(
    base::TimeDelta duration) {
  quic_transport_->SetOutgoingDatagramExpirationDuration(duration);
}
| |
// Garbage-collection tracing: visits the members listed below, then both base
// classes.
void QuicTransport::Trace(Visitor* visitor) const {
  visitor->Trace(received_datagrams_);
  visitor->Trace(received_datagrams_controller_);
  visitor->Trace(outgoing_datagrams_);
  visitor->Trace(script_state_);
  visitor->Trace(create_stream_resolvers_);
  visitor->Trace(quic_transport_);
  visitor->Trace(handshake_client_receiver_);
  visitor->Trace(client_receiver_);
  visitor->Trace(ready_resolver_);
  visitor->Trace(ready_);
  visitor->Trace(closed_resolver_);
  visitor->Trace(closed_);
  visitor->Trace(stream_map_);
  visitor->Trace(received_streams_);
  visitor->Trace(received_streams_underlying_source_);
  visitor->Trace(received_bidirectional_streams_);
  visitor->Trace(received_bidirectional_streams_underlying_source_);
  ExecutionContextLifecycleObserver::Trace(visitor);
  ScriptWrappable::Trace(visitor);
}
| |
| void QuicTransport::Init(const String& url, |
| const QuicTransportOptions& options, |
| ExceptionState& exception_state) { |
| DVLOG(1) << "QuicTransport::Init() url=" << url << " this=" << this; |
| if (!url_.IsValid()) { |
| exception_state.ThrowDOMException(DOMExceptionCode::kSyntaxError, |
| "The URL '" + url + "' is invalid."); |
| return; |
| } |
| |
| if (!url_.ProtocolIs("quic-transport")) { |
| exception_state.ThrowDOMException( |
| DOMExceptionCode::kSyntaxError, |
| "The URL's scheme must be 'quic-transport'. '" + url_.Protocol() + |
| "' is not allowed."); |
| return; |
| } |
| |
| if (url_.HasFragmentIdentifier()) { |
| exception_state.ThrowDOMException( |
| DOMExceptionCode::kSyntaxError, |
| "The URL contains a fragment identifier ('#" + |
| url_.FragmentIdentifier() + |
| "'). Fragment identifiers are not allowed in QuicTransport URLs."); |
| return; |
| } |
| |
| ready_resolver_ = MakeGarbageCollected<ScriptPromiseResolver>(script_state_); |
| ready_ = ready_resolver_->Promise(); |
| |
| closed_resolver_ = MakeGarbageCollected<ScriptPromiseResolver>(script_state_); |
| closed_ = closed_resolver_->Promise(); |
| |
| auto* execution_context = GetExecutionContext(); |
| |
| if (!execution_context->GetContentSecurityPolicyForCurrentWorld() |
| ->AllowConnectToSource(url_, url_, RedirectStatus::kNoRedirect)) { |
| // TODO(ricea): This error should probably be asynchronous like it is for |
| // WebSockets and fetch. |
| exception_state.ThrowSecurityError( |
| "Failed to connect to '" + url_.ElidedString() + "'", |
| "Refused to connect to '" + url_.ElidedString() + |
| "' because it violates the document's Content Security Policy"); |
| return; |
| } |
| |
| Vector<network::mojom::blink::QuicTransportCertificateFingerprintPtr> |
| fingerprints; |
| if (options.hasServerCertificateFingerprints()) { |
| for (const auto& fingerprint : options.serverCertificateFingerprints()) { |
| fingerprints.push_back( |
| network::mojom::blink::QuicTransportCertificateFingerprint::New( |
| fingerprint->algorithm(), fingerprint->value())); |
| } |
| } |
| |
| // TODO(ricea): Register SchedulingPolicy so that we don't get throttled and |
| // to disable bfcache. Must be done before shipping. |
| |
| // TODO(ricea): Check the SubresourceFilter and fail asynchronously if |
| // disallowed. Must be done before shipping. |
| |
| mojo::Remote<mojom::blink::QuicTransportConnector> connector; |
| execution_context->GetBrowserInterfaceBroker().GetInterface( |
| connector.BindNewPipeAndPassReceiver( |
| execution_context->GetTaskRunner(TaskType::kNetworking))); |
| |
| connector->Connect( |
| url_, std::move(fingerprints), |
| handshake_client_receiver_.BindNewPipeAndPassRemote( |
| execution_context->GetTaskRunner(TaskType::kNetworking))); |
| |
| handshake_client_receiver_.set_disconnect_handler( |
| WTF::Bind(&QuicTransport::OnConnectionError, WrapWeakPersistent(this))); |
| |
| probe::WebTransportCreated(execution_context, inspector_transport_id_, url_); |
| |
| // The choice of 1 for the ReadableStream means that it will queue one |
| // datagram even when read() is not being called. Unfortunately, that datagram |
| // may become arbitrarily stale. |
| // TODO(ricea): Consider having a datagram queue inside this class instead. |
| received_datagrams_ = ReadableStream::CreateWithCountQueueingStrategy( |
| script_state_, |
| MakeGarbageCollected<DatagramUnderlyingSource>(script_state_, this), 1); |
| int outgoing_datagrams_high_water_mark = 1; |
| if (options.hasDatagramWritableHighWaterMark()) { |
| outgoing_datagrams_high_water_mark = |
| options.datagramWritableHighWaterMark(); |
| } |
| |
| // We create a WritableStream with high water mark 1 and try to mimic the |
| // given high water mark in the Sink, from two reasons: |
| // 1. This is better because we can hide the RTT between the renderer and the |
| // network service. |
| // 2. Keeping datagrams in the renderer would be confusing for the timer for |
| // the datagram |
| // queue in the network service, because the timestamp is taken when the |
| // datagram is added to the queue. |
| outgoing_datagrams_ = WritableStream::CreateWithCountQueueingStrategy( |
| script_state_, |
| MakeGarbageCollected<DatagramUnderlyingSink>( |
| this, outgoing_datagrams_high_water_mark), |
| 1); |
| |
| received_streams_underlying_source_ = |
| StreamVendingUnderlyingSource::CreateWithVendor<ReceiveStreamVendor>( |
| script_state_, this); |
| received_streams_ = ReadableStream::CreateWithCountQueueingStrategy( |
| script_state_, received_streams_underlying_source_, 1); |
| |
| received_bidirectional_streams_underlying_source_ = |
| StreamVendingUnderlyingSource::CreateWithVendor< |
| BidirectionalStreamVendor>(script_state_, this); |
| |
| received_bidirectional_streams_ = |
| ReadableStream::CreateWithCountQueueingStrategy( |
| script_state_, received_bidirectional_streams_underlying_source_, 1); |
| } |
| |
| void QuicTransport::ResetAll() { |
| DVLOG(1) << "QuicTransport::ResetAll() this=" << this; |
| |
| // This loop is safe even if re-entered. It will always terminate because |
| // every iteration erases one entry from the map. |
| while (!stream_map_.IsEmpty()) { |
| auto it = stream_map_.begin(); |
| auto close_proxy = it->value; |
| stream_map_.erase(it); |
| close_proxy->Reset(); |
| } |
| Dispose(); |
| } |
| |
// Final teardown: reports the closure to the inspector probe, empties the
// stream map and resets the mojo remote and both receivers.
void QuicTransport::Dispose() {
  DVLOG(1) << "QuicTransport::Dispose() this=" << this;
  probe::WebTransportClosed(GetExecutionContext(), inspector_transport_id_);
  stream_map_.clear();
  quic_transport_.reset();
  handshake_client_receiver_.reset();
  client_receiver_.reset();
}
| |
// Disconnect handler for the handshake and client receivers. Unless close()
// already ran, errors all streams and rejects |ready| and |closed| with a
// "Connection lost." TypeError. In every case it then rejects pending
// stream-creation promises and resets everything.
void QuicTransport::OnConnectionError() {
  DVLOG(1) << "QuicTransport::OnConnectionError() this=" << this;

  ScriptState::Scope scope(script_state_);
  if (!cleanly_closed_) {
    v8::Local<v8::Value> reason = V8ThrowException::CreateTypeError(
        script_state_->GetIsolate(), "Connection lost.");
    if (received_datagrams_controller_) {
      received_datagrams_controller_->Error(reason);
      // Stop OnDatagramReceived() from enqueuing into the errored stream.
      received_datagrams_controller_ = nullptr;
    }
    received_streams_underlying_source_->Error(reason);
    received_bidirectional_streams_underlying_source_->Error(reason);
    WritableStreamDefaultController::ErrorIfNeeded(
        script_state_, outgoing_datagrams_->Controller(), reason);
    ready_resolver_->Reject(reason);
    closed_resolver_->Reject(reason);
  }

  RejectPendingStreamResolvers();
  ResetAll();
}
| |
// Rejects every outstanding createSendStream()/createBidirectionalStream()
// promise with a "Connection lost." TypeError and forgets the resolvers.
// Called from both close() and OnConnectionError().
void QuicTransport::RejectPendingStreamResolvers() {
  v8::Local<v8::Value> reason = V8ThrowException::CreateTypeError(
      script_state_->GetIsolate(), "Connection lost.");
  for (ScriptPromiseResolver* resolver : create_stream_resolvers_) {
    resolver->Reject(reason);
  }
  create_stream_resolvers_.clear();
}
| |
| void QuicTransport::OnCreateSendStreamResponse( |
| ScriptPromiseResolver* resolver, |
| mojo::ScopedDataPipeProducerHandle producer, |
| bool succeeded, |
| uint32_t stream_id) { |
| DVLOG(1) << "QuicTransport::OnCreateSendStreamResponse() this=" << this |
| << " succeeded=" << succeeded << " stream_id=" << stream_id; |
| |
| // Shouldn't resolve the promise if the execution context has gone away. |
| if (!GetExecutionContext()) |
| return; |
| |
| // Shouldn't resolve the promise if the mojo interface is disconnected. |
| if (!resolver || !create_stream_resolvers_.Take(resolver)) |
| return; |
| |
| ScriptState::Scope scope(script_state_); |
| if (!succeeded) { |
| resolver->Reject(V8ThrowDOMException::CreateOrEmpty( |
| script_state_->GetIsolate(), DOMExceptionCode::kNetworkError, |
| "Failed to create send stream.")); |
| return; |
| } |
| |
| auto* send_stream = MakeGarbageCollected<SendStream>( |
| script_state_, this, stream_id, std::move(producer)); |
| send_stream->Init(); |
| |
| // 0xfffffffe and 0xffffffff are reserved values in stream_map_. |
| CHECK_LT(stream_id, 0xfffffffe); |
| stream_map_.insert(stream_id, send_stream); |
| |
| resolver->Resolve(send_stream); |
| } |
| |
| void QuicTransport::OnCreateBidirectionalStreamResponse( |
| ScriptPromiseResolver* resolver, |
| mojo::ScopedDataPipeProducerHandle outgoing_producer, |
| mojo::ScopedDataPipeConsumerHandle incoming_consumer, |
| bool succeeded, |
| uint32_t stream_id) { |
| DVLOG(1) << "QuicTransport::OnCreateBidirectionalStreamResponse() this=" |
| << this << " succeeded=" << succeeded << " stream_id=" << stream_id; |
| |
| // Shouldn't resolve the promise if the execution context has gone away. |
| if (!GetExecutionContext()) |
| return; |
| |
| // Shouldn't resolve the promise if the mojo interface is disconnected. |
| if (!resolver || !create_stream_resolvers_.Take(resolver)) |
| return; |
| |
| ScriptState::Scope scope(script_state_); |
| if (!succeeded) { |
| resolver->Reject(V8ThrowDOMException::CreateOrEmpty( |
| script_state_->GetIsolate(), DOMExceptionCode::kNetworkError, |
| "Failed to create bidirectional stream.")); |
| return; |
| } |
| |
| auto* bidirectional_stream = MakeGarbageCollected<BidirectionalStream>( |
| script_state_, this, stream_id, std::move(outgoing_producer), |
| std::move(incoming_consumer)); |
| bidirectional_stream->Init(); |
| |
| // 0xfffffffe and 0xffffffff are reserved values in stream_map_. |
| CHECK_LT(stream_id, 0xfffffffe); |
| stream_map_.insert(stream_id, bidirectional_stream); |
| |
| resolver->Resolve(bidirectional_stream); |
| } |
| |
| } // namespace blink |