// blob: 7a602cecd3b896455a8882b66a9517b3885670ca [file] [log] [blame]
//! Multi-producer multi-consumer channels.
// This module is not currently exposed publicly, but is used
// as the implementation for the channels in `sync::mpsc`. The
// implementation comes from the crossbeam-channel crate:
//
// Copyright (c) 2019 The Crossbeam Project Developers
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
mod array;
mod context;
mod counter;
mod error;
mod list;
mod select;
mod utils;
mod waker;
mod zero;
use crate::fmt;
use crate::panic::{RefUnwindSafe, UnwindSafe};
use crate::time::{Duration, Instant};
pub use error::*;
/// Creates a channel of unbounded capacity.
///
/// The channel is backed by a growable buffer, so any number of messages can be held at a time.
///
/// Returns the sending half and the receiving half, in that order.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    // Unbounded channels always use the linked-list flavor.
    let (tx, rx) = counter::new(list::Channel::new());
    (Sender { flavor: SenderFlavor::List(tx) }, Receiver { flavor: ReceiverFlavor::List(rx) })
}
/// Creates a channel of bounded capacity.
///
/// The channel's buffer can hold at most `cap` messages at a time.
///
/// A capacity of zero is a special case: such a channel cannot hold any messages at all.
/// Instead, a send and a receive operation must appear at the same time in order to pair up
/// and pass the message over.
///
/// Returns the sending half and the receiving half, in that order.
pub fn sync_channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
    // Pick the flavor first, then wrap both halves in one place.
    let (sender_flavor, receiver_flavor) = match cap {
        0 => {
            let (tx, rx) = counter::new(zero::Channel::new());
            (SenderFlavor::Zero(tx), ReceiverFlavor::Zero(rx))
        }
        _ => {
            let (tx, rx) = counter::new(array::Channel::with_capacity(cap));
            (SenderFlavor::Array(tx), ReceiverFlavor::Array(rx))
        }
    };
    (Sender { flavor: sender_flavor }, Receiver { flavor: receiver_flavor })
}
/// The sending side of a channel.
///
/// A `Sender` is a thin wrapper around one of the channel flavors; every operation on it
/// dispatches on the wrapped [`SenderFlavor`].
pub struct Sender<T> {
    /// The concrete channel implementation this sender is attached to.
    flavor: SenderFlavor<T>,
}
/// Sender flavors.
///
/// Each variant holds the sending handle for one concrete channel implementation.
enum SenderFlavor<T> {
    /// Bounded channel based on a preallocated array.
    ///
    /// Produced by `sync_channel` when the requested capacity is non-zero.
    Array(counter::Sender<array::Channel<T>>),
    /// Unbounded channel implemented as a linked list.
    ///
    /// Produced by `channel`.
    List(counter::Sender<list::Channel<T>>),
    /// Zero-capacity channel.
    ///
    /// Produced by `sync_channel` when the requested capacity is zero.
    Zero(counter::Sender<zero::Channel<T>>),
}
// NOTE(review): these `unsafe impl`s assert that a `Sender<T>` may be moved to, and shared
// between, threads whenever `T: Send`. The justification lives in the flavor implementations
// (`array`, `list`, `zero`) and in `counter`, none of which are visible in this file — confirm
// against those modules.
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}
// Senders are declared unwind-safe for every `T`, with no bound on the message type.
impl<T> UnwindSafe for Sender<T> {}
impl<T> RefUnwindSafe for Sender<T> {}
impl<T> Sender<T> {
    /// Attempts to send a message into the channel without blocking.
    ///
    /// The message is either sent into the channel immediately, or an error is returned because
    /// the channel is full or disconnected. The returned error carries the original message.
    ///
    /// On a zero-capacity channel the message is sent only if a receive operation happens to be
    /// waiting on the other side of the channel at the same time.
    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        match &self.flavor {
            SenderFlavor::Array(inner) => inner.try_send(msg),
            SenderFlavor::List(inner) => inner.try_send(msg),
            SenderFlavor::Zero(inner) => inner.try_send(msg),
        }
    }

    /// Blocks the current thread until a message is sent or the channel is disconnected.
    ///
    /// While the channel is full and not disconnected, this call blocks until the send operation
    /// can proceed. If the channel becomes disconnected, this call wakes up and returns an
    /// error. The returned error carries the original message.
    ///
    /// On a zero-capacity channel this method waits for a receive operation to appear on the
    /// other side of the channel.
    pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
        // `None` means "no deadline": the flavor is asked to wait indefinitely.
        let outcome = match &self.flavor {
            SenderFlavor::Array(inner) => inner.send(msg, None),
            SenderFlavor::List(inner) => inner.send(msg, None),
            SenderFlavor::Zero(inner) => inner.send(msg, None),
        };
        match outcome {
            Ok(()) => Ok(()),
            Err(SendTimeoutError::Disconnected(msg)) => Err(SendError(msg)),
            // No deadline was supplied, so a timeout is treated as impossible here.
            Err(SendTimeoutError::Timeout(_)) => unreachable!(),
        }
    }
}
// The methods below are not used by `sync::mpsc`, but
// are useful and we'll likely want to expose them
// eventually.
//
// `unused` is allowed because nothing in the crate calls them yet.
#[allow(unused)]
impl<T> Sender<T> {
    /// Waits for a message to be sent into the channel, but only for a limited time.
    ///
    /// If the channel is full and not disconnected, this call will block until the send operation
    /// can proceed or the operation times out. If the channel becomes disconnected, this call will
    /// wake up and return an error. The returned error contains the original message.
    ///
    /// If called on a zero-capacity channel, this method will wait for a receive operation to
    /// appear on the other side of the channel.
    ///
    /// If `timeout` is so large that the deadline cannot be represented as an [`Instant`], this
    /// falls back to a plain blocking [`send`](Self::send).
    pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
        match Instant::now().checked_add(timeout) {
            Some(deadline) => self.send_deadline(msg, deadline),
            // So far in the future that it's practically the same as waiting indefinitely.
            None => self.send(msg).map_err(SendTimeoutError::from),
        }
    }

    /// Waits for a message to be sent into the channel, but only until a given deadline.
    ///
    /// If the channel is full and not disconnected, this call will block until the send operation
    /// can proceed or the operation times out. If the channel becomes disconnected, this call will
    /// wake up and return an error. The returned error contains the original message.
    ///
    /// If called on a zero-capacity channel, this method will wait for a receive operation to
    /// appear on the other side of the channel.
    pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
        match &self.flavor {
            SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
            SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
            SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
        }
    }

    /// Returns `true` if the channel is empty.
    ///
    /// Note: Zero-capacity channels are always empty.
    pub fn is_empty(&self) -> bool {
        match &self.flavor {
            SenderFlavor::Array(chan) => chan.is_empty(),
            SenderFlavor::List(chan) => chan.is_empty(),
            SenderFlavor::Zero(chan) => chan.is_empty(),
        }
    }

    /// Returns `true` if the channel is full.
    ///
    /// Note: Zero-capacity channels are always full.
    pub fn is_full(&self) -> bool {
        match &self.flavor {
            SenderFlavor::Array(chan) => chan.is_full(),
            SenderFlavor::List(chan) => chan.is_full(),
            SenderFlavor::Zero(chan) => chan.is_full(),
        }
    }

    /// Returns the number of messages in the channel.
    pub fn len(&self) -> usize {
        match &self.flavor {
            SenderFlavor::Array(chan) => chan.len(),
            SenderFlavor::List(chan) => chan.len(),
            SenderFlavor::Zero(chan) => chan.len(),
        }
    }

    /// If the channel is bounded, returns its capacity.
    pub fn capacity(&self) -> Option<usize> {
        match &self.flavor {
            SenderFlavor::Array(chan) => chan.capacity(),
            SenderFlavor::List(chan) => chan.capacity(),
            SenderFlavor::Zero(chan) => chan.capacity(),
        }
    }

    /// Returns `true` if senders belong to the same channel.
    ///
    /// Senders of different flavors are never considered to share a channel.
    pub fn same_channel(&self, other: &Sender<T>) -> bool {
        // The scrutinee is a tuple of references, so the bindings are already references;
        // explicit `ref` binders are redundant here (and rejected by Rust 2024 match ergonomics).
        match (&self.flavor, &other.flavor) {
            (SenderFlavor::Array(a), SenderFlavor::Array(b)) => a == b,
            (SenderFlavor::List(a), SenderFlavor::List(b)) => a == b,
            (SenderFlavor::Zero(a), SenderFlavor::Zero(b)) => a == b,
            _ => false,
        }
    }
}
impl<T> Drop for Sender<T> {
    /// Releases this sender's handle on the channel.
    ///
    /// Each flavor is handed a closure that disconnects the channel.
    // NOTE(review): `release` is defined in `counter`, which is not visible here; presumably it
    // only invokes the closure once the last sender is gone — confirm.
    fn drop(&mut self) {
        // SAFETY (to be confirmed against `counter`): `drop` runs at most once per `Sender`, so
        // each handle is released at most once.
        match &self.flavor {
            SenderFlavor::Array(inner) => unsafe {
                inner.release(|channel| channel.disconnect())
            },
            SenderFlavor::List(inner) => unsafe {
                inner.release(|channel| channel.disconnect_senders())
            },
            SenderFlavor::Zero(inner) => unsafe {
                inner.release(|channel| channel.disconnect())
            },
        }
    }
}
impl<T> Clone for Sender<T> {
    /// Creates another sender of the same flavor by acquiring a new handle from the existing
    /// one.
    fn clone(&self) -> Self {
        match &self.flavor {
            SenderFlavor::Array(inner) => Self { flavor: SenderFlavor::Array(inner.acquire()) },
            SenderFlavor::List(inner) => Self { flavor: SenderFlavor::List(inner.acquire()) },
            SenderFlavor::Zero(inner) => Self { flavor: SenderFlavor::Zero(inner.acquire()) },
        }
    }
}
impl<T> fmt::Debug for Sender<T> {
    /// Writes an opaque placeholder; no channel state is shown and `T` need not be `Debug`.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `pad` honours the width/fill/precision flags of the format spec.
        formatter.pad("Sender { .. }")
    }
}
/// The receiving side of a channel.
///
/// A `Receiver` is a thin wrapper around one of the channel flavors; every operation on it
/// dispatches on the wrapped [`ReceiverFlavor`].
pub struct Receiver<T> {
    /// The concrete channel implementation this receiver is attached to.
    flavor: ReceiverFlavor<T>,
}
/// Receiver flavors.
///
/// Each variant holds the receiving handle for one concrete channel implementation.
enum ReceiverFlavor<T> {
    /// Bounded channel based on a preallocated array.
    ///
    /// Produced by `sync_channel` when the requested capacity is non-zero.
    Array(counter::Receiver<array::Channel<T>>),
    /// Unbounded channel implemented as a linked list.
    ///
    /// Produced by `channel`.
    List(counter::Receiver<list::Channel<T>>),
    /// Zero-capacity channel.
    ///
    /// Produced by `sync_channel` when the requested capacity is zero.
    Zero(counter::Receiver<zero::Channel<T>>),
}
// NOTE(review): these `unsafe impl`s assert that a `Receiver<T>` may be moved to, and shared
// between, threads whenever `T: Send`. The justification lives in the flavor implementations
// (`array`, `list`, `zero`) and in `counter`, none of which are visible in this file — confirm
// against those modules.
unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}
// Receivers are declared unwind-safe for every `T`, with no bound on the message type.
impl<T> UnwindSafe for Receiver<T> {}
impl<T> RefUnwindSafe for Receiver<T> {}
impl<T> Receiver<T> {
    /// Attempts to receive a message from the channel without blocking.
    ///
    /// A message is either received from the channel immediately, or an error is returned
    /// because the channel is empty.
    ///
    /// On a zero-capacity channel a message is received only if a send operation happens to be
    /// waiting on the other side of the channel at the same time.
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        match &self.flavor {
            ReceiverFlavor::Array(inner) => inner.try_recv(),
            ReceiverFlavor::List(inner) => inner.try_recv(),
            ReceiverFlavor::Zero(inner) => inner.try_recv(),
        }
    }

    /// Blocks the current thread until a message is received or the channel is empty and
    /// disconnected.
    ///
    /// While the channel is empty and not disconnected, this call blocks until the receive
    /// operation can proceed. If the channel is empty and becomes disconnected, this call wakes
    /// up and returns an error.
    ///
    /// On a zero-capacity channel this method waits for a send operation to appear on the other
    /// side of the channel.
    pub fn recv(&self) -> Result<T, RecvError> {
        // `None` means "no deadline": the flavor is asked to wait indefinitely.
        let outcome = match &self.flavor {
            ReceiverFlavor::Array(inner) => inner.recv(None),
            ReceiverFlavor::List(inner) => inner.recv(None),
            ReceiverFlavor::Zero(inner) => inner.recv(None),
        };
        match outcome {
            Ok(msg) => Ok(msg),
            // Whatever the flavor reported, callers of `recv` only ever see `RecvError`.
            Err(_) => Err(RecvError),
        }
    }

    /// Waits for a message to be received from the channel, but only for a limited time.
    ///
    /// While the channel is empty and not disconnected, this call blocks until the receive
    /// operation can proceed or the operation times out. If the channel is empty and becomes
    /// disconnected, this call wakes up and returns an error.
    ///
    /// On a zero-capacity channel this method waits for a send operation to appear on the other
    /// side of the channel.
    ///
    /// If `timeout` is so large that the deadline cannot be represented as an [`Instant`], this
    /// falls back to a plain blocking [`recv`](Self::recv).
    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
        if let Some(deadline) = Instant::now().checked_add(timeout) {
            self.recv_deadline(deadline)
        } else {
            // So far in the future that it's practically the same as waiting indefinitely.
            self.recv().map_err(RecvTimeoutError::from)
        }
    }

    /// Waits for a message to be received from the channel, but only until a given deadline.
    ///
    /// While the channel is empty and not disconnected, this call blocks until the receive
    /// operation can proceed or the deadline passes. If the channel is empty and becomes
    /// disconnected, this call wakes up and returns an error.
    ///
    /// On a zero-capacity channel this method waits for a send operation to appear on the other
    /// side of the channel.
    pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
        let until = Some(deadline);
        match &self.flavor {
            ReceiverFlavor::Array(inner) => inner.recv(until),
            ReceiverFlavor::List(inner) => inner.recv(until),
            ReceiverFlavor::Zero(inner) => inner.recv(until),
        }
    }
}
// None of the methods in this block are called by `sync::mpsc` yet. They are kept
// (and `unused` is allowed) because they are useful and will likely be exposed eventually.
#[allow(unused)]
impl<T> Receiver<T> {
    /// Reports whether the channel currently holds no messages.
    ///
    /// Note: Zero-capacity channels are always empty.
    pub fn is_empty(&self) -> bool {
        match &self.flavor {
            ReceiverFlavor::Array(inner) => inner.is_empty(),
            ReceiverFlavor::List(inner) => inner.is_empty(),
            ReceiverFlavor::Zero(inner) => inner.is_empty(),
        }
    }

    /// Reports whether the channel is currently full.
    ///
    /// Note: Zero-capacity channels are always full.
    pub fn is_full(&self) -> bool {
        match &self.flavor {
            ReceiverFlavor::Array(inner) => inner.is_full(),
            ReceiverFlavor::List(inner) => inner.is_full(),
            ReceiverFlavor::Zero(inner) => inner.is_full(),
        }
    }

    /// Returns how many messages are in the channel.
    pub fn len(&self) -> usize {
        match &self.flavor {
            ReceiverFlavor::Array(inner) => inner.len(),
            ReceiverFlavor::List(inner) => inner.len(),
            ReceiverFlavor::Zero(inner) => inner.len(),
        }
    }

    /// Returns the channel's capacity if the channel is bounded.
    pub fn capacity(&self) -> Option<usize> {
        match &self.flavor {
            ReceiverFlavor::Array(inner) => inner.capacity(),
            ReceiverFlavor::List(inner) => inner.capacity(),
            ReceiverFlavor::Zero(inner) => inner.capacity(),
        }
    }

    /// Returns `true` if both receivers belong to the same channel.
    ///
    /// Receivers of different flavors are never considered to share a channel.
    pub fn same_channel(&self, other: &Receiver<T>) -> bool {
        match (&self.flavor, &other.flavor) {
            (ReceiverFlavor::Array(lhs), ReceiverFlavor::Array(rhs)) => lhs == rhs,
            (ReceiverFlavor::List(lhs), ReceiverFlavor::List(rhs)) => lhs == rhs,
            (ReceiverFlavor::Zero(lhs), ReceiverFlavor::Zero(rhs)) => lhs == rhs,
            _ => false,
        }
    }
}
impl<T> Drop for Receiver<T> {
    /// Releases this receiver's handle on the channel.
    ///
    /// Each flavor is handed a closure that disconnects the channel.
    // NOTE(review): `release` is defined in `counter`, which is not visible here; presumably it
    // only invokes the closure once the last receiver is gone — confirm.
    fn drop(&mut self) {
        // SAFETY (to be confirmed against `counter`): `drop` runs at most once per `Receiver`,
        // so each handle is released at most once.
        match &self.flavor {
            ReceiverFlavor::Array(inner) => unsafe {
                inner.release(|channel| channel.disconnect())
            },
            ReceiverFlavor::List(inner) => unsafe {
                inner.release(|channel| channel.disconnect_receivers())
            },
            ReceiverFlavor::Zero(inner) => unsafe {
                inner.release(|channel| channel.disconnect())
            },
        }
    }
}
impl<T> Clone for Receiver<T> {
    /// Creates another receiver of the same flavor by acquiring a new handle from the existing
    /// one.
    fn clone(&self) -> Self {
        match &self.flavor {
            ReceiverFlavor::Array(inner) => Self { flavor: ReceiverFlavor::Array(inner.acquire()) },
            ReceiverFlavor::List(inner) => Self { flavor: ReceiverFlavor::List(inner.acquire()) },
            ReceiverFlavor::Zero(inner) => Self { flavor: ReceiverFlavor::Zero(inner.acquire()) },
        }
    }
}
impl<T> fmt::Debug for Receiver<T> {
    /// Writes an opaque placeholder; no channel state is shown and `T` need not be `Debug`.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `pad` honours the width/fill/precision flags of the format spec.
        formatter.pad("Receiver { .. }")
    }
}