| // Copyright 2014 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "third_party/blink/renderer/core/fetch/body_stream_buffer.h" |
| |
| #include <memory> |
| #include "base/auto_reset.h" |
| #include "third_party/blink/renderer/core/execution_context/execution_context.h" |
| #include "third_party/blink/renderer/core/fetch/body.h" |
| #include "third_party/blink/renderer/core/fetch/bytes_consumer_tee.h" |
| #include "third_party/blink/renderer/core/fetch/bytes_uploader.h" |
| #include "third_party/blink/renderer/core/fetch/readable_stream_bytes_consumer.h" |
| #include "third_party/blink/renderer/core/streams/readable_stream.h" |
| #include "third_party/blink/renderer/core/streams/readable_stream_default_controller_with_script_scope.h" |
| #include "third_party/blink/renderer/core/typed_arrays/dom_array_buffer.h" |
| #include "third_party/blink/renderer/core/typed_arrays/dom_typed_array.h" |
| #include "third_party/blink/renderer/platform/bindings/exception_code.h" |
| #include "third_party/blink/renderer/platform/bindings/exception_state.h" |
| #include "third_party/blink/renderer/platform/bindings/script_state.h" |
| #include "third_party/blink/renderer/platform/bindings/v8_throw_exception.h" |
| #include "third_party/blink/renderer/platform/blob/blob_data.h" |
| #include "third_party/blink/renderer/platform/heap/heap.h" |
| #include "third_party/blink/renderer/platform/loader/fetch/script_cached_metadata_handler.h" |
| #include "third_party/blink/renderer/platform/network/encoded_form_data.h" |
| #include "third_party/blink/renderer/platform/wtf/assertions.h" |
| #include "third_party/blink/renderer/platform/wtf/functional.h" |
| #include "third_party/blink/renderer/platform/wtf/std_lib_extras.h" |
| |
| namespace blink { |
| |
| class BodyStreamBuffer::LoaderClient final |
| : public GarbageCollected<LoaderClient>, |
| public ExecutionContextLifecycleObserver, |
| public FetchDataLoader::Client { |
| public: |
| LoaderClient(ExecutionContext* execution_context, |
| BodyStreamBuffer* buffer, |
| FetchDataLoader::Client* client) |
| : ExecutionContextLifecycleObserver(execution_context), |
| buffer_(buffer), |
| client_(client) {} |
| |
| void DidFetchDataLoadedBlobHandle( |
| scoped_refptr<BlobDataHandle> blob_data_handle) override { |
| buffer_->EndLoading(); |
| client_->DidFetchDataLoadedBlobHandle(std::move(blob_data_handle)); |
| } |
| |
| void DidFetchDataLoadedArrayBuffer(DOMArrayBuffer* array_buffer) override { |
| buffer_->EndLoading(); |
| client_->DidFetchDataLoadedArrayBuffer(array_buffer); |
| } |
| |
| void DidFetchDataLoadedFormData(FormData* form_data) override { |
| buffer_->EndLoading(); |
| client_->DidFetchDataLoadedFormData(form_data); |
| } |
| |
| void DidFetchDataLoadedString(const String& string) override { |
| buffer_->EndLoading(); |
| client_->DidFetchDataLoadedString(string); |
| } |
| |
| void DidFetchDataStartedDataPipe( |
| mojo::ScopedDataPipeConsumerHandle data_pipe) override { |
| client_->DidFetchDataStartedDataPipe(std::move(data_pipe)); |
| } |
| |
| void DidFetchDataLoadedDataPipe() override { |
| buffer_->EndLoading(); |
| client_->DidFetchDataLoadedDataPipe(); |
| } |
| |
| void DidFetchDataLoadedCustomFormat() override { |
| buffer_->EndLoading(); |
| client_->DidFetchDataLoadedCustomFormat(); |
| } |
| |
| void DidFetchDataLoadFailed() override { |
| buffer_->EndLoading(); |
| client_->DidFetchDataLoadFailed(); |
| } |
| |
| void Abort() override { NOTREACHED(); } |
| |
| void Trace(Visitor* visitor) const override { |
| visitor->Trace(buffer_); |
| visitor->Trace(client_); |
| ExecutionContextLifecycleObserver::Trace(visitor); |
| FetchDataLoader::Client::Trace(visitor); |
| } |
| |
| private: |
| void ContextDestroyed() override { buffer_->StopLoading(); } |
| |
| Member<BodyStreamBuffer> buffer_; |
| Member<FetchDataLoader::Client> client_; |
| DISALLOW_COPY_AND_ASSIGN(LoaderClient); |
| }; |
| |
| // Use a Create() method to split construction from initialisation. |
| // Initialisation may result in nested calls to ContextDestroyed() and so is not |
| // safe to do during construction. |
| |
| // static |
| BodyStreamBuffer* BodyStreamBuffer::Create( |
| ScriptState* script_state, |
| BytesConsumer* consumer, |
| AbortSignal* signal, |
| ScriptCachedMetadataHandler* cached_metadata_handler, |
| scoped_refptr<BlobDataHandle> side_data_blob) { |
| auto* buffer = MakeGarbageCollected<BodyStreamBuffer>( |
| PassKey(), script_state, consumer, signal, cached_metadata_handler, |
| std::move(side_data_blob)); |
| buffer->Init(); |
| return buffer; |
| } |
| |
| BodyStreamBuffer::BodyStreamBuffer( |
| PassKey, |
| ScriptState* script_state, |
| BytesConsumer* consumer, |
| AbortSignal* signal, |
| ScriptCachedMetadataHandler* cached_metadata_handler, |
| scoped_refptr<BlobDataHandle> side_data_blob) |
| : UnderlyingSourceBase(script_state), |
| script_state_(script_state), |
| consumer_(consumer), |
| signal_(signal), |
| cached_metadata_handler_(cached_metadata_handler), |
| side_data_blob_(std::move(side_data_blob)), |
| made_from_readable_stream_(false) {} |
| |
| void BodyStreamBuffer::Init() { |
| DCHECK(consumer_); |
| |
| stream_ = |
| ReadableStream::CreateWithCountQueueingStrategy(script_state_, this, 0); |
| stream_broken_ = !stream_; |
| |
| // ContextDestroyed() can be called inside the ReadableStream constructor when |
| // a worker thread is being terminated. See https://crbug.com/1007162 for |
| // details. If consumer_ is null, assume that this happened and this object |
| // will never actually be used, and so it is fine to skip the rest of |
| // initialisation. |
| if (!consumer_) |
| return; |
| |
| consumer_->SetClient(this); |
| if (signal_) { |
| if (signal_->aborted()) { |
| Abort(); |
| } else { |
| signal_->AddAlgorithm( |
| WTF::Bind(&BodyStreamBuffer::Abort, WrapWeakPersistent(this))); |
| } |
| } |
| OnStateChange(); |
| } |
| |
| BodyStreamBuffer::BodyStreamBuffer( |
| ScriptState* script_state, |
| ReadableStream* stream, |
| ScriptCachedMetadataHandler* cached_metadata_handler, |
| scoped_refptr<BlobDataHandle> side_data_blob) |
| : UnderlyingSourceBase(script_state), |
| script_state_(script_state), |
| stream_(stream), |
| signal_(nullptr), |
| cached_metadata_handler_(cached_metadata_handler), |
| side_data_blob_(std::move(side_data_blob)), |
| made_from_readable_stream_(true) { |
| DCHECK(stream_); |
| } |
| |
| scoped_refptr<BlobDataHandle> BodyStreamBuffer::DrainAsBlobDataHandle( |
| BytesConsumer::BlobSizePolicy policy) { |
| DCHECK(!IsStreamLocked()); |
| DCHECK(!IsStreamDisturbed()); |
| if (IsStreamClosed() || IsStreamErrored() || stream_broken_) |
| return nullptr; |
| |
| if (made_from_readable_stream_) |
| return nullptr; |
| |
| scoped_refptr<BlobDataHandle> blob_data_handle = |
| consumer_->DrainAsBlobDataHandle(policy); |
| if (blob_data_handle) { |
| CloseAndLockAndDisturb(); |
| return blob_data_handle; |
| } |
| return nullptr; |
| } |
| |
| scoped_refptr<EncodedFormData> BodyStreamBuffer::DrainAsFormData() { |
| DCHECK(!IsStreamLocked()); |
| DCHECK(!IsStreamDisturbed()); |
| if (IsStreamClosed() || IsStreamErrored() || stream_broken_) |
| return nullptr; |
| |
| if (made_from_readable_stream_) |
| return nullptr; |
| |
| scoped_refptr<EncodedFormData> form_data = consumer_->DrainAsFormData(); |
| if (form_data) { |
| CloseAndLockAndDisturb(); |
| return form_data; |
| } |
| return nullptr; |
| } |
| |
| void BodyStreamBuffer::DrainAsChunkedDataPipeGetter( |
| ScriptState* script_state, |
| mojo::PendingReceiver<network::mojom::blink::ChunkedDataPipeGetter> |
| pending_receiver) { |
| DCHECK(!IsStreamLocked()); |
| auto* consumer = |
| MakeGarbageCollected<ReadableStreamBytesConsumer>(script_state, stream_); |
| stream_uploader_ = MakeGarbageCollected<BytesUploader>( |
| consumer, std::move(pending_receiver), |
| ExecutionContext::From(script_state) |
| ->GetTaskRunner(TaskType::kNetworking)); |
| } |
| |
| void BodyStreamBuffer::StartLoading(FetchDataLoader* loader, |
| FetchDataLoader::Client* client, |
| ExceptionState& exception_state) { |
| DCHECK(!loader_); |
| DCHECK(script_state_->ContextIsValid()); |
| loader_ = loader; |
| if (signal_) { |
| if (signal_->aborted()) { |
| client->Abort(); |
| return; |
| } |
| signal_->AddAlgorithm( |
| WTF::Bind(&FetchDataLoader::Client::Abort, WrapWeakPersistent(client))); |
| } |
| auto* handle = ReleaseHandle(exception_state); |
| if (exception_state.HadException()) |
| return; |
| loader->Start(handle, |
| MakeGarbageCollected<LoaderClient>( |
| ExecutionContext::From(script_state_), this, client)); |
| } |
| |
| void BodyStreamBuffer::Tee(BodyStreamBuffer** branch1, |
| BodyStreamBuffer** branch2, |
| ExceptionState& exception_state) { |
| DCHECK(!IsStreamLocked()); |
| DCHECK(!IsStreamDisturbed()); |
| *branch1 = nullptr; |
| *branch2 = nullptr; |
| auto* cached_metadata_handler = cached_metadata_handler_.Get(); |
| scoped_refptr<BlobDataHandle> side_data_blob = TakeSideDataBlob(); |
| |
| if (made_from_readable_stream_) { |
| if (stream_broken_) { |
| // We don't really know what state the stream is in, so throw an exception |
| // rather than making things worse. |
| exception_state.ThrowDOMException( |
| DOMExceptionCode::kInvalidStateError, |
| "Unsafe to tee stream in unknown state"); |
| return; |
| } |
| ReadableStream* stream1 = nullptr; |
| ReadableStream* stream2 = nullptr; |
| |
| stream_->Tee(script_state_, &stream1, &stream2, exception_state); |
| if (exception_state.HadException()) { |
| stream_broken_ = true; |
| return; |
| } |
| |
| *branch1 = MakeGarbageCollected<BodyStreamBuffer>( |
| script_state_, stream1, cached_metadata_handler, side_data_blob); |
| *branch2 = MakeGarbageCollected<BodyStreamBuffer>( |
| script_state_, stream2, cached_metadata_handler, side_data_blob); |
| return; |
| } |
| BytesConsumer* dest1 = nullptr; |
| BytesConsumer* dest2 = nullptr; |
| auto* handle = ReleaseHandle(exception_state); |
| if (exception_state.HadException()) { |
| stream_broken_ = true; |
| return; |
| } |
| BytesConsumerTee(ExecutionContext::From(script_state_), handle, &dest1, |
| &dest2); |
| *branch1 = BodyStreamBuffer::Create(script_state_, dest1, signal_, |
| cached_metadata_handler, side_data_blob); |
| *branch2 = BodyStreamBuffer::Create(script_state_, dest2, signal_, |
| cached_metadata_handler, side_data_blob); |
| } |
| |
| ScriptPromise BodyStreamBuffer::pull(ScriptState* script_state) { |
| DCHECK_EQ(script_state, script_state_); |
| if (!consumer_) { |
| // This is a speculative workaround for a crash. See |
| // https://crbug.com/773525. |
| // TODO(yhirano): Remove this branch or have a better comment. |
| return ScriptPromise::CastUndefined(script_state); |
| } |
| |
| if (stream_needs_more_) |
| return ScriptPromise::CastUndefined(script_state); |
| stream_needs_more_ = true; |
| if (!in_process_data_) |
| ProcessData(); |
| return ScriptPromise::CastUndefined(script_state); |
| } |
| |
| ScriptPromise BodyStreamBuffer::Cancel(ScriptState* script_state, |
| ScriptValue reason) { |
| DCHECK_EQ(script_state, script_state_); |
| if (Controller()) |
| Controller()->Close(); |
| CancelConsumer(); |
| return ScriptPromise::CastUndefined(script_state); |
| } |
| |
| void BodyStreamBuffer::OnStateChange() { |
| if (!consumer_ || !GetExecutionContext() || |
| GetExecutionContext()->IsContextDestroyed()) |
| return; |
| |
| switch (consumer_->GetPublicState()) { |
| case BytesConsumer::PublicState::kReadableOrWaiting: |
| break; |
| case BytesConsumer::PublicState::kClosed: |
| Close(); |
| return; |
| case BytesConsumer::PublicState::kErrored: |
| GetError(); |
| return; |
| } |
| ProcessData(); |
| } |
| |
| bool BodyStreamBuffer::HasPendingActivity() const { |
| return loader_; |
| } |
| |
| void BodyStreamBuffer::ContextDestroyed() { |
| CancelConsumer(); |
| UnderlyingSourceBase::ContextDestroyed(); |
| } |
| |
| bool BodyStreamBuffer::IsStreamReadable() const { |
| return stream_->IsReadable(); |
| } |
| |
| bool BodyStreamBuffer::IsStreamClosed() const { |
| return stream_->IsClosed(); |
| } |
| |
| bool BodyStreamBuffer::IsStreamErrored() const { |
| return stream_->IsErrored(); |
| } |
| |
| bool BodyStreamBuffer::IsStreamLocked() const { |
| return stream_->IsLocked(); |
| } |
| |
| bool BodyStreamBuffer::IsStreamDisturbed() const { |
| return stream_->IsDisturbed(); |
| } |
| |
| void BodyStreamBuffer::CloseAndLockAndDisturb() { |
| DCHECK(!stream_broken_); |
| |
| cached_metadata_handler_ = nullptr; |
| |
| if (IsStreamReadable()) { |
| // Note that the stream cannot be "draining", because it doesn't have |
| // the internal buffer. |
| Close(); |
| } |
| |
| stream_->LockAndDisturb(script_state_); |
| } |
| |
| bool BodyStreamBuffer::IsAborted() { |
| if (!signal_) |
| return false; |
| return signal_->aborted(); |
| } |
| |
| scoped_refptr<BlobDataHandle> BodyStreamBuffer::TakeSideDataBlob() { |
| return std::move(side_data_blob_); |
| } |
| |
| void BodyStreamBuffer::Trace(Visitor* visitor) const { |
| visitor->Trace(script_state_); |
| visitor->Trace(stream_); |
| visitor->Trace(stream_uploader_); |
| visitor->Trace(consumer_); |
| visitor->Trace(loader_); |
| visitor->Trace(signal_); |
| visitor->Trace(cached_metadata_handler_); |
| UnderlyingSourceBase::Trace(visitor); |
| } |
| |
| void BodyStreamBuffer::Abort() { |
| if (!Controller()) { |
| DCHECK(!GetExecutionContext()); |
| DCHECK(!consumer_); |
| return; |
| } |
| Controller()->Error( |
| MakeGarbageCollected<DOMException>(DOMExceptionCode::kAbortError)); |
| CancelConsumer(); |
| } |
| |
| void BodyStreamBuffer::Close() { |
| // Close() can be called during construction, in which case Controller() |
| // will not be set yet. |
| if (Controller()) |
| Controller()->Close(); |
| CancelConsumer(); |
| } |
| |
| void BodyStreamBuffer::GetError() { |
| { |
| ScriptState::Scope scope(script_state_); |
| Controller()->Error(V8ThrowException::CreateTypeError( |
| script_state_->GetIsolate(), "network error")); |
| } |
| CancelConsumer(); |
| } |
| |
| void BodyStreamBuffer::CancelConsumer() { |
| side_data_blob_.reset(); |
| if (consumer_) { |
| consumer_->Cancel(); |
| consumer_ = nullptr; |
| } |
| } |
| |
| void BodyStreamBuffer::ProcessData() { |
| DCHECK(consumer_); |
| DCHECK(!in_process_data_); |
| |
| base::AutoReset<bool> auto_reset(&in_process_data_, true); |
| while (stream_needs_more_) { |
| const char* buffer = nullptr; |
| size_t available = 0; |
| auto result = consumer_->BeginRead(&buffer, &available); |
| if (result == BytesConsumer::Result::kShouldWait) |
| return; |
| DOMUint8Array* array = nullptr; |
| if (result == BytesConsumer::Result::kOk) { |
| array = |
| DOMUint8Array::Create(reinterpret_cast<const unsigned char*>(buffer), |
| SafeCast<uint32_t>(available)); |
| result = consumer_->EndRead(available); |
| } |
| switch (result) { |
| case BytesConsumer::Result::kOk: |
| case BytesConsumer::Result::kDone: |
| if (array) { |
| // Clear m_streamNeedsMore in order to detect a pull call. |
| stream_needs_more_ = false; |
| Controller()->Enqueue(array); |
| } |
| if (result == BytesConsumer::Result::kDone) { |
| Close(); |
| return; |
| } |
| // If m_streamNeedsMore is true, it means that pull is called and |
| // the stream needs more data even if the desired size is not |
| // positive. |
| if (!stream_needs_more_) |
| stream_needs_more_ = Controller()->DesiredSize() > 0; |
| break; |
| case BytesConsumer::Result::kShouldWait: |
| NOTREACHED(); |
| return; |
| case BytesConsumer::Result::kError: |
| GetError(); |
| return; |
| } |
| } |
| } |
| |
| void BodyStreamBuffer::EndLoading() { |
| DCHECK(loader_); |
| loader_ = nullptr; |
| } |
| |
| void BodyStreamBuffer::StopLoading() { |
| if (!loader_) |
| return; |
| loader_->Cancel(); |
| loader_ = nullptr; |
| } |
| |
| BytesConsumer* BodyStreamBuffer::ReleaseHandle( |
| ExceptionState& exception_state) { |
| DCHECK(!IsStreamLocked()); |
| DCHECK(!IsStreamDisturbed()); |
| |
| if (stream_broken_) { |
| exception_state.ThrowDOMException( |
| DOMExceptionCode::kInvalidStateError, |
| "Body stream has suffered a fatal error and cannot be inspected"); |
| return nullptr; |
| } |
| |
| if (!GetExecutionContext()) { |
| // Avoid crashing if ContextDestroyed() has been called. |
| exception_state.ThrowDOMException( |
| DOMExceptionCode::kInvalidStateError, |
| "Cannot release body in a window or worker than has been detached"); |
| return nullptr; |
| } |
| |
| // Do this after state checks to avoid side-effects when the method does |
| // nothing. |
| side_data_blob_.reset(); |
| |
| if (made_from_readable_stream_) { |
| DCHECK(script_state_->ContextIsValid()); |
| ScriptState::Scope scope(script_state_); |
| return MakeGarbageCollected<ReadableStreamBytesConsumer>(script_state_, |
| stream_); |
| } |
| // We need to call these before calling CloseAndLockAndDisturb. |
| const bool is_closed = IsStreamClosed(); |
| const bool is_errored = IsStreamErrored(); |
| |
| BytesConsumer* consumer = consumer_.Release(); |
| |
| CloseAndLockAndDisturb(); |
| |
| if (is_closed) { |
| // Note that the stream cannot be "draining", because it doesn't have |
| // the internal buffer. |
| return BytesConsumer::CreateClosed(); |
| } |
| if (is_errored) |
| return BytesConsumer::CreateErrored(BytesConsumer::Error("error")); |
| |
| DCHECK(consumer); |
| consumer->ClearClient(); |
| return consumer; |
| } |
| |
| } // namespace blink |