blob: bbfc6ce00ffc2144da81a115f905b4f7c580a087 [file] [log] [blame]
//! Thread-local channel context.
use super::select::Selected;
use super::waker::current_thread_id;
use crate::cell::Cell;
use crate::ptr;
use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use crate::sync::Arc;
use crate::thread::{self, Thread};
use crate::time::Instant;
/// Thread-local context.
///
/// This is a handle to shared state: the data lives in an `Inner` behind an `Arc`, so cloning
/// a `Context` produces another handle to the same `Inner` rather than a copy of it.
#[derive(Debug, Clone)]
pub struct Context {
/// Shared state; see `Inner` for the individual fields.
inner: Arc<Inner>,
}
/// Inner representation of `Context`.
///
/// `select` and `packet` are atomics and are updated through shared references; `thread` and
/// `thread_id` are filled in by `Context::new`.
#[derive(Debug)]
struct Inner {
/// Selected operation.
///
/// Holds a `Selected` value converted to `usize`. It starts out as `Selected::Waiting` and is
/// put back to that value by `Context::reset`.
select: AtomicUsize,
/// A slot into which another thread may store a pointer to its `Packet`.
///
/// Null when empty; written by `Context::store_packet`.
packet: AtomicPtr<()>,
/// Thread handle.
///
/// Obtained from `thread::current()` when the context is created; `Context::unpark` calls
/// `unpark` on it.
thread: Thread,
/// Thread id.
///
/// Obtained from `current_thread_id()` when the context is created.
thread_id: usize,
}
impl Context {
    /// Runs `f` with a `Context` for the current thread and returns whatever `f` returns.
    ///
    /// One context is cached per thread and reused across calls: it is taken out of the cache,
    /// reset, handed to `f`, and put back afterwards. A fresh context is built instead when the
    /// cache is empty (as it is while an outer `with` call on the same thread is still running
    /// its closure) or when the thread-local can no longer be accessed.
    #[inline]
    pub fn with<F, R>(f: F) -> R
    where
        F: FnOnce(&Context) -> R,
    {
        thread_local! {
            /// Cached thread-local context.
            static CONTEXT: Cell<Option<Context>> = Cell::new(Some(Context::new()));
        }

        // `f` is `FnOnce`, yet it is named on several mutually exclusive paths below. Parking
        // it in an `Option` lets one `FnMut` wrapper serve all of them; it runs at most once.
        let mut pending = Some(f);
        let mut run = |cx: &Context| -> R {
            let callback = pending.take().unwrap();
            callback(cx)
        };

        let outcome = CONTEXT.try_with(|slot| {
            if let Some(cached) = slot.take() {
                // Clear whatever the previous operation left behind before reusing it.
                cached.reset();
                let output = run(&cached);
                slot.set(Some(cached));
                output
            } else {
                run(&Context::new())
            }
        });

        match outcome {
            Ok(output) => output,
            // The thread-local was not accessible, so the closure above never ran.
            Err(_) => run(&Context::new()),
        }
    }

    /// Builds a brand-new `Context` bound to the calling thread.
    ///
    /// The context starts in the `Selected::Waiting` state with an empty (null) packet slot.
    #[cold]
    fn new() -> Context {
        let inner = Inner {
            select: AtomicUsize::new(Selected::Waiting.into()),
            packet: AtomicPtr::new(ptr::null_mut()),
            thread: thread::current(),
            thread_id: current_thread_id(),
        };

        Context { inner: Arc::new(inner) }
    }

    /// Puts `select` back to `Selected::Waiting` and clears the `packet` slot.
    #[inline]
    fn reset(&self) {
        let Inner { select, packet, .. } = &*self.inner;

        select.store(Selected::Waiting.into(), Ordering::Release);
        packet.store(ptr::null_mut(), Ordering::Release);
    }

    /// Tries to move this context from `Selected::Waiting` to `select`.
    ///
    /// Returns `Ok(())` if the context was still waiting and now holds `select`. Otherwise the
    /// state is left untouched and the operation that was already selected is returned in `Err`.
    #[inline]
    pub fn try_select(&self, select: Selected) -> Result<(), Selected> {
        let waiting: usize = Selected::Waiting.into();
        let wanted: usize = select.into();

        match self.inner.select.compare_exchange(
            waiting,
            wanted,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => Ok(()),
            Err(current) => Err(Selected::from(current)),
        }
    }

    /// Publishes `packet` in this context's packet slot.
    ///
    /// This method must be called after `try_select` succeeds and there is a packet to provide.
    /// A null `packet` is ignored and leaves the slot as it was.
    #[inline]
    pub fn store_packet(&self, packet: *mut ()) {
        if packet.is_null() {
            return;
        }

        self.inner.packet.store(packet, Ordering::Release);
    }

    /// Blocks until some operation has been selected, then returns it.
    ///
    /// With `deadline == None` the thread parks until an operation is selected. With a
    /// deadline, the thread parks for at most the remaining time; once the deadline has passed
    /// this method tries to select `Selected::Aborted` itself, and if something else was
    /// selected first, that operation is returned instead.
    #[inline]
    pub fn wait_until(&self, deadline: Option<Instant>) -> Selected {
        loop {
            // Every wake-up comes back here, so the state is rechecked after each park.
            let current = Selected::from(self.inner.select.load(Ordering::Acquire));
            if current != Selected::Waiting {
                return current;
            }

            match deadline {
                // No deadline: sleep until the next wake-up.
                None => thread::park(),
                Some(end) => {
                    let now = Instant::now();

                    if now >= end {
                        // Out of time. Abort unless another operation got selected first.
                        return self
                            .try_select(Selected::Aborted)
                            .err()
                            .unwrap_or(Selected::Aborted);
                    }

                    // Sleep for no longer than the time that is left.
                    thread::park_timeout(end - now);
                }
            }
        }
    }

    /// Wakes up the thread that this context was created on.
    #[inline]
    pub fn unpark(&self) {
        let owner = &self.inner.thread;
        owner.unpark();
    }

    /// Returns the id recorded for the thread that this context was created on.
    #[inline]
    pub fn thread_id(&self) -> usize {
        let Inner { thread_id, .. } = &*self.inner;
        *thread_id
    }
}