blob: 47374a83ec8ddb873c329a4237373c9bf5098b24 [file] [log] [blame]
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <stddef.h>
#include <stdint.h>
#include "base/containers/span.h"
#include "base/types/strong_alias.h"
#include "mojo/public/cpp/system/data_pipe.h"
#include "mojo/public/cpp/system/simple_watcher.h"
#include "third_party/blink/renderer/bindings/core/v8/script_promise.h"
#include "third_party/blink/renderer/bindings/core/v8/script_promise_resolver.h"
#include "third_party/blink/renderer/core/execution_context/execution_context_lifecycle_observer.h"
#include "third_party/blink/renderer/modules/modules_export.h"
#include "third_party/blink/renderer/platform/heap/thread_state.h"
// Forward declaration only: this header refers to v8::Isolate by pointer and
// does not need the full V8 definition.
namespace v8 {
class Isolate;
}  // namespace v8
namespace blink {
// Forward declarations for types this header only refers to through pointers
// or Member<> handles.
class ScriptState;
class StreamAbortInfo;
class WritableStream;
class WritableStreamDefaultController;
// Implementation of the OutgoingStream mixin from the standard. SendStream and
// BidirectionalStream delegate to this.
class MODULES_EXPORT OutgoingStream final
: public GarbageCollected<OutgoingStream> {
// Registers Dispose() as this class's pre-finalizer; Dispose() is the method
// that prepares the object for destruction.
USING_PRE_FINALIZER(OutgoingStream, Dispose);
// An interface for SendStream and BidirectionalStream to implement when using
// this class. At most one of these methods will be called.
class Client : public GarbageCollectedMixin {
 public:
  // Members are public so that OutgoingStream can invoke them through its
  // |client_| handle; an enclosing class gets no special access to a nested
  // class's private members.
  virtual ~Client() = default;

  // Request that a Fin message for this stream be sent to the server, and
  // that the QuicTransport object drop its reference to the stream.
  virtual void SendFin() = 0;

  // Indicates that this stream is aborted. QuicTransport should drop its
  // reference to the stream, and in a bidirectional stream the incoming side
  // should be reset.
  virtual void OnOutgoingStreamAbort() = 0;
};
// State of the stream as reported by GetState(). State::kOpen is the initial
// value of |state_|.
// NOTE(review): the enumerator list and closing brace are not visible in this
// view of the file — confirm against the full header.
enum class State {
// Constructs a stream from a script state, a Client, and the producer end of
// a mojo data pipe (taken by value).
// NOTE(review): presumably these initialize |script_state_|, |client_| and
// |data_pipe_|; the definition is out of line — confirm.
OutgoingStream(ScriptState*, Client*, mojo::ScopedDataPipeProducerHandle);
// Init() must be called before the stream is used.
void Init();
// Implementation of OutgoingStream IDL, used by client classes to implement
// it.

// Returns |writable_|, logging each access at DVLOG verbosity level 1.
WritableStream* Writable() const {
  DVLOG(1) << "OutgoingStream::writable() called";
  return writable_;
}
// Returns |writing_aborted_|, the promise exposed by the |writingAborted|
// attribute.
ScriptPromise WritingAborted() const { return writing_aborted_; }
// Returns |script_state_|.
ScriptState* GetScriptState() { return script_state_; }
// Part of the OutgoingStream IDL implementation; defined out of line.
// NOTE(review): presumably aborts the writing side of the stream; how the
// StreamAbortInfo argument is used is not visible in this header — confirm.
void AbortWriting(StreamAbortInfo*);
// Called from QuicTransport via a WebTransportStream. Expects a JavaScript
// scope to be entered.
void Reset();
// Returns the current value of |state_|.
State GetState() const { return state_; }
// Called from QuicTransport rather than using
// ExecutionContextLifecycleObserver to ensure correct destruction order.
// Does not execute JavaScript.
void ContextDestroyed();
// Garbage-collection tracing hook, defined out of line.
// NOTE(review): presumably visits the Member<> fields of this class — confirm.
void Trace(Visitor*) const;
// Nested class defined out of line. SinkWrite() is documented as implementing
// UnderlyingSink::write().
class UnderlyingSink;
// Strongly-typed bool that tells CreateAbortException() and
// ErrorStreamAbortAndReset() whether an abort was initiated locally (true) or
// remotely (false).
using IsLocalAbort = base::StrongAlias<class IsLocalAbortTag, bool>;
// Called when |data_pipe_| becomes writable or errored.
// NOTE(review): presumably the callback installed on |write_watcher_| —
// confirm in the implementation.
void OnHandleReady(MojoResult, const mojo::HandleSignalsState&);
// Called when |data_pipe_| is closed.
// NOTE(review): presumably the callback installed on |close_watcher_| —
// confirm in the implementation.
void OnPeerClosed(MojoResult, const mojo::HandleSignalsState&);
// Rejects any unfinished write() calls and resets |data_pipe_|.
void HandlePipeClosed();
// Implements UnderlyingSink::write().
ScriptPromise SinkWrite(ScriptState*, ScriptValue chunk, ExceptionState&);
// Writes |data| to |data_pipe_|, possibly saving unwritten data to
// |cached_data_|.
ScriptPromise WriteOrCacheData(ScriptState*, base::span<const uint8_t> data);
// Attempts to write some more of |cached_data_| to |data_pipe_|.
void WriteCachedData();
// Writes zero or more bytes of |data| synchronously to |data_pipe_|,
// returning the number of bytes that were written. A return value smaller
// than data.size() means only a prefix of |data| was written.
size_t WriteDataSynchronously(base::span<const uint8_t> data);
// Creates a DOMException indicating that the stream has been aborted.
// If IsLocalAbort is true it will indicate a locally-initiated abort,
// otherwise it will indicate a remote-initiated abort.
ScriptValue CreateAbortException(IsLocalAbort);
// Errors |writable_|, resolves |writing_aborted_| and resets |data_pipe_|.
// The error message used to error |writable_| depends on whether IsLocalAbort
// is true or not.
void ErrorStreamAbortAndReset(IsLocalAbort);
// Resolves the |writing_aborted_| promise and resets the |data_pipe_|.
void AbortAndReset();
// Resets |data_pipe_| and clears the watchers. Also discards |cached_data_|.
// If the pipe is open it will be closed as a side-effect.
void ResetPipe();
// Prepares the object for destruction. Registered as the pre-finalizer via
// USING_PRE_FINALIZER at the top of the class.
void Dispose();
// Holds data that has been passed to write() but still needs to be written
// asynchronously; used as the element type of |cached_data_|.
class CachedDataBuffer {
 public:
  // |isolate| is retained so that AdjustAmountOfExternalAllocatedMemory can be
  // called for the memory stored in |buffer_|.
  // NOTE(review): the constructor is defined out of line; presumably it copies
  // |length| bytes starting at |data| into |buffer_| — confirm.
  CachedDataBuffer(v8::Isolate* isolate, const uint8_t* data, size_t length);

  // Number of bytes held in the buffer.
  size_t length() const { return length_; }

  // Pointer to the first byte held in the buffer.
  uint8_t* data() { return buffer_; }

 private:
  // We need the isolate to call |AdjustAmountOfExternalAllocatedMemory| for
  // the memory stored in |buffer_|.
  v8::Isolate* isolate_;
  size_t length_ = 0u;
  // NOTE(review): raw pointer; no destructor is visible in this view of the
  // file, so confirm where this allocation is released.
  uint8_t* buffer_ = nullptr;
};
// Returned by GetScriptState().
const Member<ScriptState> script_state_;
// The Client this stream reports to.
// NOTE(review): presumably the Client passed to the constructor — confirm.
Member<Client> client_;
// Producer end of the mojo data pipe that stream data is written to.
mojo::ScopedDataPipeProducerHandle data_pipe_;
// Only armed when we need to write something.
mojo::SimpleWatcher write_watcher_;
// Always armed to detect close.
mojo::SimpleWatcher close_watcher_;
// Data which has been passed to write() but still needs to be written
// asynchronously.
// Uses a custom CachedDataBuffer rather than a Vector because
// WTF::Vector is currently limited to 2GB.
// TODO(ricea): Change this to a Vector when it becomes 64-bit safe.
std::unique_ptr<CachedDataBuffer> cached_data_;
// The offset into |cached_data_| of the first byte that still needs to be
// written.
size_t offset_ = 0;
// The stream returned by Writable().
Member<WritableStream> writable_;
// NOTE(review): presumably the default controller of |writable_|, used to
// error the stream; its use is not visible in this header — confirm.
Member<WritableStreamDefaultController> controller_;
// Promise returned by the |writingAborted| attribute.
ScriptPromise writing_aborted_;
// NOTE(review): presumably the resolver backing |writing_aborted_| — confirm.
Member<ScriptPromiseResolver> writing_aborted_resolver_;
// If an asynchronous write() on the underlying sink object is pending, this
// will be non-null.
Member<ScriptPromiseResolver> write_promise_resolver_;
// Current state, returned by GetState(); starts out as State::kOpen.
State state_ = State::kOpen;
} // namespace blink