// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "third_party/blink/renderer/core/fetch/body_stream_buffer.h"

#include <memory>
#include "base/auto_reset.h"
#include "third_party/blink/renderer/core/execution_context/execution_context.h"
#include "third_party/blink/renderer/core/fetch/body.h"
#include "third_party/blink/renderer/core/fetch/bytes_consumer_tee.h"
#include "third_party/blink/renderer/core/fetch/bytes_uploader.h"
#include "third_party/blink/renderer/core/fetch/readable_stream_bytes_consumer.h"
#include "third_party/blink/renderer/core/streams/readable_stream.h"
#include "third_party/blink/renderer/core/streams/readable_stream_default_controller_with_script_scope.h"
#include "third_party/blink/renderer/core/typed_arrays/dom_array_buffer.h"
#include "third_party/blink/renderer/core/typed_arrays/dom_typed_array.h"
#include "third_party/blink/renderer/platform/bindings/exception_code.h"
#include "third_party/blink/renderer/platform/bindings/exception_state.h"
#include "third_party/blink/renderer/platform/bindings/script_state.h"
#include "third_party/blink/renderer/platform/bindings/v8_throw_exception.h"
#include "third_party/blink/renderer/platform/blob/blob_data.h"
#include "third_party/blink/renderer/platform/heap/heap.h"
#include "third_party/blink/renderer/platform/loader/fetch/script_cached_metadata_handler.h"
#include "third_party/blink/renderer/platform/network/encoded_form_data.h"
#include "third_party/blink/renderer/platform/wtf/assertions.h"
#include "third_party/blink/renderer/platform/wtf/functional.h"
#include "third_party/blink/renderer/platform/wtf/std_lib_extras.h"

namespace blink {

class BodyStreamBuffer::LoaderClient final
    : public GarbageCollected<LoaderClient>,
      public ExecutionContextLifecycleObserver,
      public FetchDataLoader::Client {
 public:
  LoaderClient(ExecutionContext* execution_context,
               BodyStreamBuffer* buffer,
               FetchDataLoader::Client* client)
      : ExecutionContextLifecycleObserver(execution_context),
        buffer_(buffer),
        client_(client) {}

  void DidFetchDataLoadedBlobHandle(
      scoped_refptr<BlobDataHandle> blob_data_handle) override {
    buffer_->EndLoading();
    client_->DidFetchDataLoadedBlobHandle(std::move(blob_data_handle));
  }

  void DidFetchDataLoadedArrayBuffer(DOMArrayBuffer* array_buffer) override {
    buffer_->EndLoading();
    client_->DidFetchDataLoadedArrayBuffer(array_buffer);
  }

  void DidFetchDataLoadedFormData(FormData* form_data) override {
    buffer_->EndLoading();
    client_->DidFetchDataLoadedFormData(form_data);
  }

  void DidFetchDataLoadedString(const String& string) override {
    buffer_->EndLoading();
    client_->DidFetchDataLoadedString(string);
  }

  void DidFetchDataStartedDataPipe(
      mojo::ScopedDataPipeConsumerHandle data_pipe) override {
    client_->DidFetchDataStartedDataPipe(std::move(data_pipe));
  }

  void DidFetchDataLoadedDataPipe() override {
    buffer_->EndLoading();
    client_->DidFetchDataLoadedDataPipe();
  }

  void DidFetchDataLoadedCustomFormat() override {
    buffer_->EndLoading();
    client_->DidFetchDataLoadedCustomFormat();
  }

  void DidFetchDataLoadFailed() override {
    buffer_->EndLoading();
    client_->DidFetchDataLoadFailed();
  }

  void Abort() override { NOTREACHED(); }

  void Trace(Visitor* visitor) const override {
    visitor->Trace(buffer_);
    visitor->Trace(client_);
    ExecutionContextLifecycleObserver::Trace(visitor);
    FetchDataLoader::Client::Trace(visitor);
  }

 private:
  void ContextDestroyed() override { buffer_->StopLoading(); }

  Member<BodyStreamBuffer> buffer_;
  Member<FetchDataLoader::Client> client_;
  DISALLOW_COPY_AND_ASSIGN(LoaderClient);
};

// Use a Create() method to split construction from initialisation.
// Initialisation may result in nested calls to ContextDestroyed() and so is not
// safe to do during construction.

// static
BodyStreamBuffer* BodyStreamBuffer::Create(
    ScriptState* script_state,
    BytesConsumer* consumer,
    AbortSignal* signal,
    ScriptCachedMetadataHandler* cached_metadata_handler,
    scoped_refptr<BlobDataHandle> side_data_blob) {
  auto* buffer = MakeGarbageCollected<BodyStreamBuffer>(
      PassKey(), script_state, consumer, signal, cached_metadata_handler,
      std::move(side_data_blob));
  buffer->Init();
  return buffer;
}

BodyStreamBuffer::BodyStreamBuffer(
    PassKey,
    ScriptState* script_state,
    BytesConsumer* consumer,
    AbortSignal* signal,
    ScriptCachedMetadataHandler* cached_metadata_handler,
    scoped_refptr<BlobDataHandle> side_data_blob)
    : UnderlyingSourceBase(script_state),
      script_state_(script_state),
      consumer_(consumer),
      signal_(signal),
      cached_metadata_handler_(cached_metadata_handler),
      side_data_blob_(std::move(side_data_blob)),
      made_from_readable_stream_(false) {}

void BodyStreamBuffer::Init() {
  DCHECK(consumer_);

  stream_ =
      ReadableStream::CreateWithCountQueueingStrategy(script_state_, this, 0);
  stream_broken_ = !stream_;

  // ContextDestroyed() can be called inside the ReadableStream constructor when
  // a worker thread is being terminated. See https://crbug.com/1007162 for
  // details. If consumer_ is null, assume that this happened and this object
  // will never actually be used, and so it is fine to skip the rest of
  // initialisation.
  if (!consumer_)
    return;

  consumer_->SetClient(this);
  if (signal_) {
    if (signal_->aborted()) {
      Abort();
    } else {
      signal_->AddAlgorithm(
          WTF::Bind(&BodyStreamBuffer::Abort, WrapWeakPersistent(this)));
    }
  }
  OnStateChange();
}

BodyStreamBuffer::BodyStreamBuffer(
    ScriptState* script_state,
    ReadableStream* stream,
    ScriptCachedMetadataHandler* cached_metadata_handler,
    scoped_refptr<BlobDataHandle> side_data_blob)
    : UnderlyingSourceBase(script_state),
      script_state_(script_state),
      stream_(stream),
      signal_(nullptr),
      cached_metadata_handler_(cached_metadata_handler),
      side_data_blob_(std::move(side_data_blob)),
      made_from_readable_stream_(true) {
  DCHECK(stream_);
}

scoped_refptr<BlobDataHandle> BodyStreamBuffer::DrainAsBlobDataHandle(
    BytesConsumer::BlobSizePolicy policy) {
  DCHECK(!IsStreamLocked());
  DCHECK(!IsStreamDisturbed());
  if (IsStreamClosed() || IsStreamErrored() || stream_broken_)
    return nullptr;

  if (made_from_readable_stream_)
    return nullptr;

  scoped_refptr<BlobDataHandle> blob_data_handle =
      consumer_->DrainAsBlobDataHandle(policy);
  if (blob_data_handle) {
    CloseAndLockAndDisturb();
    return blob_data_handle;
  }
  return nullptr;
}

scoped_refptr<EncodedFormData> BodyStreamBuffer::DrainAsFormData() {
  DCHECK(!IsStreamLocked());
  DCHECK(!IsStreamDisturbed());
  if (IsStreamClosed() || IsStreamErrored() || stream_broken_)
    return nullptr;

  if (made_from_readable_stream_)
    return nullptr;

  scoped_refptr<EncodedFormData> form_data = consumer_->DrainAsFormData();
  if (form_data) {
    CloseAndLockAndDisturb();
    return form_data;
  }
  return nullptr;
}

void BodyStreamBuffer::DrainAsChunkedDataPipeGetter(
    ScriptState* script_state,
    mojo::PendingReceiver<network::mojom::blink::ChunkedDataPipeGetter>
        pending_receiver) {
  DCHECK(!IsStreamLocked());
  auto* consumer =
      MakeGarbageCollected<ReadableStreamBytesConsumer>(script_state, stream_);
  stream_uploader_ = MakeGarbageCollected<BytesUploader>(
      consumer, std::move(pending_receiver),
      ExecutionContext::From(script_state)
          ->GetTaskRunner(TaskType::kNetworking));
}

void BodyStreamBuffer::StartLoading(FetchDataLoader* loader,
                                    FetchDataLoader::Client* client,
                                    ExceptionState& exception_state) {
  DCHECK(!loader_);
  DCHECK(script_state_->ContextIsValid());
  loader_ = loader;
  if (signal_) {
    if (signal_->aborted()) {
      client->Abort();
      return;
    }
    signal_->AddAlgorithm(
        WTF::Bind(&FetchDataLoader::Client::Abort, WrapWeakPersistent(client)));
  }
  auto* handle = ReleaseHandle(exception_state);
  if (exception_state.HadException())
    return;
  loader->Start(handle,
                MakeGarbageCollected<LoaderClient>(
                    ExecutionContext::From(script_state_), this, client));
}

void BodyStreamBuffer::Tee(BodyStreamBuffer** branch1,
                           BodyStreamBuffer** branch2,
                           ExceptionState& exception_state) {
  DCHECK(!IsStreamLocked());
  DCHECK(!IsStreamDisturbed());
  *branch1 = nullptr;
  *branch2 = nullptr;
  auto* cached_metadata_handler = cached_metadata_handler_.Get();
  scoped_refptr<BlobDataHandle> side_data_blob = TakeSideDataBlob();

  if (made_from_readable_stream_) {
    if (stream_broken_) {
      // We don't really know what state the stream is in, so throw an exception
      // rather than making things worse.
      exception_state.ThrowDOMException(
          DOMExceptionCode::kInvalidStateError,
          "Unsafe to tee stream in unknown state");
      return;
    }
    ReadableStream* stream1 = nullptr;
    ReadableStream* stream2 = nullptr;

    stream_->Tee(script_state_, &stream1, &stream2, exception_state);
    if (exception_state.HadException()) {
      stream_broken_ = true;
      return;
    }

    *branch1 = MakeGarbageCollected<BodyStreamBuffer>(
        script_state_, stream1, cached_metadata_handler, side_data_blob);
    *branch2 = MakeGarbageCollected<BodyStreamBuffer>(
        script_state_, stream2, cached_metadata_handler, side_data_blob);
    return;
  }
  BytesConsumer* dest1 = nullptr;
  BytesConsumer* dest2 = nullptr;
  auto* handle = ReleaseHandle(exception_state);
  if (exception_state.HadException()) {
    stream_broken_ = true;
    return;
  }
  BytesConsumerTee(ExecutionContext::From(script_state_), handle, &dest1,
                   &dest2);
  *branch1 = BodyStreamBuffer::Create(script_state_, dest1, signal_,
                                      cached_metadata_handler, side_data_blob);
  *branch2 = BodyStreamBuffer::Create(script_state_, dest2, signal_,
                                      cached_metadata_handler, side_data_blob);
}

ScriptPromise BodyStreamBuffer::pull(ScriptState* script_state) {
  DCHECK_EQ(script_state, script_state_);
  if (!consumer_) {
    // This is a speculative workaround for a crash. See
    // https://crbug.com/773525.
    // TODO(yhirano): Remove this branch or have a better comment.
    return ScriptPromise::CastUndefined(script_state);
  }

  if (stream_needs_more_)
    return ScriptPromise::CastUndefined(script_state);
  stream_needs_more_ = true;
  if (!in_process_data_)
    ProcessData();
  return ScriptPromise::CastUndefined(script_state);
}

ScriptPromise BodyStreamBuffer::Cancel(ScriptState* script_state,
                                       ScriptValue reason) {
  DCHECK_EQ(script_state, script_state_);
  if (Controller())
    Controller()->Close();
  CancelConsumer();
  return ScriptPromise::CastUndefined(script_state);
}

void BodyStreamBuffer::OnStateChange() {
  if (!consumer_ || !GetExecutionContext() ||
      GetExecutionContext()->IsContextDestroyed())
    return;

  switch (consumer_->GetPublicState()) {
    case BytesConsumer::PublicState::kReadableOrWaiting:
      break;
    case BytesConsumer::PublicState::kClosed:
      Close();
      return;
    case BytesConsumer::PublicState::kErrored:
      GetError();
      return;
  }
  ProcessData();
}

bool BodyStreamBuffer::HasPendingActivity() const {
  return loader_;
}

void BodyStreamBuffer::ContextDestroyed() {
  CancelConsumer();
  UnderlyingSourceBase::ContextDestroyed();
}

bool BodyStreamBuffer::IsStreamReadable() const {
  return stream_->IsReadable();
}

bool BodyStreamBuffer::IsStreamClosed() const {
  return stream_->IsClosed();
}

bool BodyStreamBuffer::IsStreamErrored() const {
  return stream_->IsErrored();
}

bool BodyStreamBuffer::IsStreamLocked() const {
  return stream_->IsLocked();
}

bool BodyStreamBuffer::IsStreamDisturbed() const {
  return stream_->IsDisturbed();
}

void BodyStreamBuffer::CloseAndLockAndDisturb() {
  DCHECK(!stream_broken_);

  cached_metadata_handler_ = nullptr;

  if (IsStreamReadable()) {
    // Note that the stream cannot be "draining", because it doesn't have
    // the internal buffer.
    Close();
  }

  stream_->LockAndDisturb(script_state_);
}

bool BodyStreamBuffer::IsAborted() {
  if (!signal_)
    return false;
  return signal_->aborted();
}

scoped_refptr<BlobDataHandle> BodyStreamBuffer::TakeSideDataBlob() {
  return std::move(side_data_blob_);
}

void BodyStreamBuffer::Trace(Visitor* visitor) const {
  visitor->Trace(script_state_);
  visitor->Trace(stream_);
  visitor->Trace(stream_uploader_);
  visitor->Trace(consumer_);
  visitor->Trace(loader_);
  visitor->Trace(signal_);
  visitor->Trace(cached_metadata_handler_);
  UnderlyingSourceBase::Trace(visitor);
}

void BodyStreamBuffer::Abort() {
  if (!Controller()) {
    DCHECK(!GetExecutionContext());
    DCHECK(!consumer_);
    return;
  }
  Controller()->Error(
      MakeGarbageCollected<DOMException>(DOMExceptionCode::kAbortError));
  CancelConsumer();
}

void BodyStreamBuffer::Close() {
  // Close() can be called during construction, in which case Controller()
  // will not be set yet.
  if (Controller())
    Controller()->Close();
  CancelConsumer();
}

void BodyStreamBuffer::GetError() {
  {
    ScriptState::Scope scope(script_state_);
    Controller()->Error(V8ThrowException::CreateTypeError(
        script_state_->GetIsolate(), "network error"));
  }
  CancelConsumer();
}

void BodyStreamBuffer::CancelConsumer() {
  side_data_blob_.reset();
  if (consumer_) {
    consumer_->Cancel();
    consumer_ = nullptr;
  }
}

void BodyStreamBuffer::ProcessData() {
  DCHECK(consumer_);
  DCHECK(!in_process_data_);

  base::AutoReset<bool> auto_reset(&in_process_data_, true);
  while (stream_needs_more_) {
    const char* buffer = nullptr;
    size_t available = 0;
    auto result = consumer_->BeginRead(&buffer, &available);
    if (result == BytesConsumer::Result::kShouldWait)
      return;
    DOMUint8Array* array = nullptr;
    if (result == BytesConsumer::Result::kOk) {
      array =
          DOMUint8Array::Create(reinterpret_cast<const unsigned char*>(buffer),
                                SafeCast<uint32_t>(available));
      result = consumer_->EndRead(available);
    }
    switch (result) {
      case BytesConsumer::Result::kOk:
      case BytesConsumer::Result::kDone:
        if (array) {
          // Clear m_streamNeedsMore in order to detect a pull call.
          stream_needs_more_ = false;
          Controller()->Enqueue(array);
        }
        if (result == BytesConsumer::Result::kDone) {
          Close();
          return;
        }
        // If m_streamNeedsMore is true, it means that pull is called and
        // the stream needs more data even if the desired size is not
        // positive.
        if (!stream_needs_more_)
          stream_needs_more_ = Controller()->DesiredSize() > 0;
        break;
      case BytesConsumer::Result::kShouldWait:
        NOTREACHED();
        return;
      case BytesConsumer::Result::kError:
        GetError();
        return;
    }
  }
}

void BodyStreamBuffer::EndLoading() {
  DCHECK(loader_);
  loader_ = nullptr;
}

void BodyStreamBuffer::StopLoading() {
  if (!loader_)
    return;
  loader_->Cancel();
  loader_ = nullptr;
}

BytesConsumer* BodyStreamBuffer::ReleaseHandle(
    ExceptionState& exception_state) {
  DCHECK(!IsStreamLocked());
  DCHECK(!IsStreamDisturbed());

  if (stream_broken_) {
    exception_state.ThrowDOMException(
        DOMExceptionCode::kInvalidStateError,
        "Body stream has suffered a fatal error and cannot be inspected");
    return nullptr;
  }

  if (!GetExecutionContext()) {
    // Avoid crashing if ContextDestroyed() has been called.
    exception_state.ThrowDOMException(
        DOMExceptionCode::kInvalidStateError,
        "Cannot release body in a window or worker than has been detached");
    return nullptr;
  }

  // Do this after state checks to avoid side-effects when the method does
  // nothing.
  side_data_blob_.reset();

  if (made_from_readable_stream_) {
    DCHECK(script_state_->ContextIsValid());
    ScriptState::Scope scope(script_state_);
    return MakeGarbageCollected<ReadableStreamBytesConsumer>(script_state_,
                                                             stream_);
  }
  // We need to call these before calling CloseAndLockAndDisturb.
  const bool is_closed = IsStreamClosed();
  const bool is_errored = IsStreamErrored();

  BytesConsumer* consumer = consumer_.Release();

  CloseAndLockAndDisturb();

  if (is_closed) {
    // Note that the stream cannot be "draining", because it doesn't have
    // the internal buffer.
    return BytesConsumer::CreateClosed();
  }
  if (is_errored)
    return BytesConsumer::CreateErrored(BytesConsumer::Error("error"));

  DCHECK(consumer);
  consumer->ClearClient();
  return consumer;
}

}  // namespace blink
