| // Copyright 2015 The Chromium OS Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include <brillo/streams/input_stream_set.h> |
| |
| #include <base/bind.h> |
| #include <brillo/message_loops/message_loop.h> |
| #include <brillo/streams/stream_errors.h> |
| #include <brillo/streams/stream_utils.h> |
| |
| namespace brillo { |
| |
| InputStreamSet::InputStreamSet( |
| std::vector<Stream*> source_streams, |
| std::vector<StreamPtr> owned_source_streams, |
| uint64_t initial_stream_size) |
| : source_streams_{std::move(source_streams)}, |
| owned_source_streams_{std::move(owned_source_streams)}, |
| initial_stream_size_{initial_stream_size} {} |
| |
| StreamPtr InputStreamSet::Create(std::vector<Stream*> source_streams, |
| std::vector<StreamPtr> owned_source_streams, |
| ErrorPtr* error) { |
| StreamPtr stream; |
| |
| if (source_streams.empty()) { |
| Error::AddTo(error, FROM_HERE, errors::stream::kDomain, |
| errors::stream::kInvalidParameter, |
| "Source stream list is empty"); |
| return stream; |
| } |
| |
| // Make sure we have only readable streams. |
| for (Stream* src_stream : source_streams) { |
| if (!src_stream->CanRead()) { |
| Error::AddTo(error, FROM_HERE, errors::stream::kDomain, |
| errors::stream::kInvalidParameter, |
| "The stream list must contain only readable streams"); |
| return stream; |
| } |
| } |
| |
| // We are using remaining size here because the multiplexed stream is not |
| // seekable and the bytes already read are essentially "lost" as far as this |
| // stream is concerned. |
| uint64_t initial_stream_size = 0; |
| for (const Stream* stream : source_streams) |
| initial_stream_size += stream->GetRemainingSize(); |
| |
| stream.reset(new InputStreamSet{std::move(source_streams), |
| std::move(owned_source_streams), |
| initial_stream_size}); |
| return stream; |
| } |
| |
| StreamPtr InputStreamSet::Create(std::vector<Stream*> source_streams, |
| ErrorPtr* error) { |
| return Create(std::move(source_streams), {}, error); |
| } |
| |
| StreamPtr InputStreamSet::Create(std::vector<StreamPtr> owned_source_streams, |
| ErrorPtr* error) { |
| std::vector<Stream*> source_streams; |
| source_streams.reserve(owned_source_streams.size()); |
| for (const StreamPtr& stream : owned_source_streams) |
| source_streams.push_back(stream.get()); |
| return Create(std::move(source_streams), std::move(owned_source_streams), |
| error); |
| } |
| |
| bool InputStreamSet::IsOpen() const { |
| return !closed_; |
| } |
| |
| bool InputStreamSet::CanGetSize() const { |
| bool can_get_size = IsOpen(); |
| for (const Stream* stream : source_streams_) { |
| if (!stream->CanGetSize()) { |
| can_get_size = false; |
| break; |
| } |
| } |
| return can_get_size; |
| } |
| |
| uint64_t InputStreamSet::GetSize() const { |
| return initial_stream_size_; |
| } |
| |
| bool InputStreamSet::SetSizeBlocking(uint64_t /* size */, ErrorPtr* error) { |
| return stream_utils::ErrorOperationNotSupported(FROM_HERE, error); |
| } |
| |
| uint64_t InputStreamSet::GetRemainingSize() const { |
| uint64_t size = 0; |
| for (const Stream* stream : source_streams_) |
| size += stream->GetRemainingSize(); |
| return size; |
| } |
| |
| bool InputStreamSet::Seek(int64_t /* offset */, |
| Whence /* whence */, |
| uint64_t* /* new_position */, |
| ErrorPtr* error) { |
| return stream_utils::ErrorOperationNotSupported(FROM_HERE, error); |
| } |
| |
| bool InputStreamSet::ReadNonBlocking(void* buffer, |
| size_t size_to_read, |
| size_t* size_read, |
| bool* end_of_stream, |
| ErrorPtr* error) { |
| if (!IsOpen()) |
| return stream_utils::ErrorStreamClosed(FROM_HERE, error); |
| |
| while (!source_streams_.empty()) { |
| Stream* stream = source_streams_.front(); |
| bool eos = false; |
| if (!stream->ReadNonBlocking(buffer, size_to_read, size_read, &eos, error)) |
| return false; |
| |
| if (*size_read > 0 || !eos) { |
| if (end_of_stream) |
| *end_of_stream = false; |
| return true; |
| } |
| |
| source_streams_.erase(source_streams_.begin()); |
| } |
| *size_read = 0; |
| if (end_of_stream) |
| *end_of_stream = true; |
| return true; |
| } |
| |
| bool InputStreamSet::WriteNonBlocking(const void* /* buffer */, |
| size_t /* size_to_write */, |
| size_t* /* size_written */, |
| ErrorPtr* error) { |
| return stream_utils::ErrorOperationNotSupported(FROM_HERE, error); |
| } |
| |
| bool InputStreamSet::CloseBlocking(ErrorPtr* error) { |
| bool success = true; |
| // We want to close only the owned streams. |
| for (StreamPtr& stream_ptr : owned_source_streams_) { |
| if (!stream_ptr->CloseBlocking(error)) |
| success = false; // Keep going for other streams... |
| } |
| owned_source_streams_.clear(); |
| source_streams_.clear(); |
| initial_stream_size_ = 0; |
| closed_ = true; |
| return success; |
| } |
| |
| bool InputStreamSet::WaitForData( |
| AccessMode mode, |
| const base::Callback<void(AccessMode)>& callback, |
| ErrorPtr* error) { |
| if (!IsOpen()) |
| return stream_utils::ErrorStreamClosed(FROM_HERE, error); |
| |
| if (stream_utils::IsWriteAccessMode(mode)) |
| return stream_utils::ErrorOperationNotSupported(FROM_HERE, error); |
| |
| if (!source_streams_.empty()) { |
| Stream* stream = source_streams_.front(); |
| return stream->WaitForData(mode, callback, error); |
| } |
| |
| MessageLoop::current()->PostTask(FROM_HERE, base::Bind(callback, mode)); |
| return true; |
| } |
| |
| bool InputStreamSet::WaitForDataBlocking(AccessMode in_mode, |
| base::TimeDelta timeout, |
| AccessMode* out_mode, |
| ErrorPtr* error) { |
| if (!IsOpen()) |
| return stream_utils::ErrorStreamClosed(FROM_HERE, error); |
| |
| if (stream_utils::IsWriteAccessMode(in_mode)) |
| return stream_utils::ErrorOperationNotSupported(FROM_HERE, error); |
| |
| if (!source_streams_.empty()) { |
| Stream* stream = source_streams_.front(); |
| return stream->WaitForDataBlocking(in_mode, timeout, out_mode, error); |
| } |
| |
| if (out_mode) |
| *out_mode = in_mode; |
| return true; |
| } |
| |
| void InputStreamSet::CancelPendingAsyncOperations() { |
| if (IsOpen() && !source_streams_.empty()) { |
| Stream* stream = source_streams_.front(); |
| stream->CancelPendingAsyncOperations(); |
| } |
| Stream::CancelPendingAsyncOperations(); |
| } |
| |
| } // namespace brillo |