| // Copyright 2020 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "third_party/blink/renderer/core/fetch/bytes_uploader.h" |
| |
| #include "base/numerics/checked_math.h" |
| #include "base/numerics/safe_conversions.h" |
| #include "base/single_thread_task_runner.h" |
| #include "net/base/net_errors.h" |
| #include "third_party/blink/public/platform/task_type.h" |
| #include "third_party/blink/renderer/core/execution_context/execution_context.h" |
| #include "third_party/blink/renderer/platform/heap/persistent.h" |
| #include "third_party/blink/renderer/platform/loader/fetch/bytes_consumer.h" |
| |
| namespace blink { |
| |
| BytesUploader::BytesUploader( |
| BytesConsumer* consumer, |
| mojo::PendingReceiver<network::mojom::blink::ChunkedDataPipeGetter> |
| pending_receiver, |
| scoped_refptr<base::SingleThreadTaskRunner> task_runner) |
| : consumer_(consumer), |
| receiver_(this, std::move(pending_receiver)), |
| upload_pipe_watcher_(FROM_HERE, |
| mojo::SimpleWatcher::ArmingPolicy::MANUAL, |
| std::move(task_runner)) { |
| DCHECK(consumer_); |
| DCHECK_EQ(consumer_->GetPublicState(), |
| BytesConsumer::PublicState::kReadableOrWaiting); |
| } |
| |
| BytesUploader::~BytesUploader() = default; |
| |
| void BytesUploader::Trace(blink::Visitor* visitor) const { |
| visitor->Trace(consumer_); |
| BytesConsumer::Client::Trace(visitor); |
| } |
| |
| void BytesUploader::GetSize(GetSizeCallback get_size_callback) { |
| DCHECK(!get_size_callback_); |
| get_size_callback_ = std::move(get_size_callback); |
| } |
| |
| void BytesUploader::StartReading( |
| mojo::ScopedDataPipeProducerHandle upload_pipe) { |
| DVLOG(3) << this << " StartReading()"; |
| DCHECK(get_size_callback_); |
| DCHECK(upload_pipe); |
| if (upload_pipe_) { |
| // Replay was asked by net/ service. |
| CloseOnError(); |
| return; |
| } |
| upload_pipe_ = std::move(upload_pipe); |
| upload_pipe_watcher_.Watch(upload_pipe_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, |
| WTF::BindRepeating(&BytesUploader::OnPipeWriteable, |
| WrapWeakPersistent(this))); |
| consumer_->SetClient(this); |
| if (consumer_->GetPublicState() == |
| BytesConsumer::PublicState::kReadableOrWaiting) { |
| WriteDataOnPipe(); |
| } |
| } |
| |
| void BytesUploader::OnStateChange() { |
| DVLOG(3) << this << " OnStateChange(). consumer_->GetPublicState()=" |
| << consumer_->GetPublicState(); |
| DCHECK(get_size_callback_); |
| switch (consumer_->GetPublicState()) { |
| case BytesConsumer::PublicState::kReadableOrWaiting: |
| WriteDataOnPipe(); |
| return; |
| case BytesConsumer::PublicState::kClosed: |
| Close(); |
| return; |
| case BytesConsumer::PublicState::kErrored: |
| CloseOnError(); |
| return; |
| } |
| NOTREACHED(); |
| } |
| |
| void BytesUploader::OnPipeWriteable(MojoResult unused) { |
| WriteDataOnPipe(); |
| } |
| |
| void BytesUploader::WriteDataOnPipe() { |
| DVLOG(3) << this << " WriteDataOnPipe(). consumer_->GetPublicState()=" |
| << consumer_->GetPublicState(); |
| DCHECK(upload_pipe_); |
| DCHECK(get_size_callback_); |
| if (!upload_pipe_.is_valid()) |
| return; |
| |
| while (true) { |
| const char* buffer; |
| size_t available; |
| auto consumer_result = consumer_->BeginRead(&buffer, &available); |
| DVLOG(3) << " consumer_->BeginRead()=" << consumer_result |
| << ", available=" << available; |
| switch (consumer_result) { |
| case BytesConsumer::Result::kError: |
| CloseOnError(); |
| return; |
| case BytesConsumer::Result::kShouldWait: |
| return; |
| case BytesConsumer::Result::kDone: |
| Close(); |
| return; |
| case BytesConsumer::Result::kOk: |
| break; |
| } |
| DCHECK_EQ(consumer_result, BytesConsumer::Result::kOk); |
| uint32_t written_bytes = base::saturated_cast<uint32_t>(available); |
| const MojoResult mojo_result = upload_pipe_->WriteData( |
| buffer, &written_bytes, MOJO_WRITE_DATA_FLAG_NONE); |
| DVLOG(3) << " upload_pipe_->WriteData()=" << mojo_result |
| << ", mojo_written=" << written_bytes |
| << ", consumer_->EndRead()=" << consumer_result; |
| if (mojo_result == MOJO_RESULT_SHOULD_WAIT) { |
| // Wait for the pipe to have more capacity available |
| consumer_result = consumer_->EndRead(0); |
| upload_pipe_watcher_.ArmOrNotify(); |
| return; |
| } |
| if (mojo_result != MOJO_RESULT_OK) { |
| CloseOnError(); |
| return; |
| } |
| |
| consumer_result = consumer_->EndRead(written_bytes); |
| if (!base::CheckAdd(total_size_, written_bytes) |
| .AssignIfValid(&total_size_)) { |
| CloseOnError(); |
| return; |
| } |
| |
| switch (consumer_result) { |
| case BytesConsumer::Result::kError: |
| CloseOnError(); |
| return; |
| case BytesConsumer::Result::kShouldWait: |
| NOTREACHED(); |
| return; |
| case BytesConsumer::Result::kDone: |
| Close(); |
| return; |
| case BytesConsumer::Result::kOk: |
| break; |
| } |
| } |
| } |
| |
| void BytesUploader::Close() { |
| DVLOG(3) << this << " Close(). total_size=" << total_size_; |
| DCHECK(get_size_callback_); |
| std::move(get_size_callback_).Run(net::OK, total_size_); |
| } |
| |
| void BytesUploader::CloseOnError() { |
| DVLOG(3) << this << " CloseOnError(). total_size=" << total_size_; |
| DCHECK(consumer_); |
| consumer_->Cancel(); |
| DCHECK(get_size_callback_); |
| std::move(get_size_callback_).Run(net::ERR_FAILED, total_size_); |
| } |
| |
| } // namespace blink |