| //! Waking mechanism for threads blocked on channel operations. |
| |
| use super::context::Context; |
| use super::select::{Operation, Selected}; |
| |
| use crate::ptr; |
| use crate::sync::atomic::{AtomicBool, Ordering}; |
| use crate::sync::Mutex; |
| |
| /// Represents a thread blocked on a specific channel operation. |
| pub(crate) struct Entry { |
| /// The operation. |
| pub(crate) oper: Operation, |
| |
| /// Optional packet. |
| pub(crate) packet: *mut (), |
| |
| /// Context associated with the thread owning this operation. |
| pub(crate) cx: Context, |
| } |
| |
| /// A queue of threads blocked on channel operations. |
| /// |
| /// This data structure is used by threads to register blocking operations and get woken up once |
| /// an operation becomes ready. |
| pub(crate) struct Waker { |
| /// A list of select operations. |
| selectors: Vec<Entry>, |
| |
| /// A list of operations waiting to be ready. |
| observers: Vec<Entry>, |
| } |
| |
| impl Waker { |
| /// Creates a new `Waker`. |
| #[inline] |
| pub(crate) fn new() -> Self { |
| Waker { selectors: Vec::new(), observers: Vec::new() } |
| } |
| |
| /// Registers a select operation. |
| #[inline] |
| pub(crate) fn register(&mut self, oper: Operation, cx: &Context) { |
| self.register_with_packet(oper, ptr::null_mut(), cx); |
| } |
| |
| /// Registers a select operation and a packet. |
| #[inline] |
| pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) { |
| self.selectors.push(Entry { oper, packet, cx: cx.clone() }); |
| } |
| |
| /// Unregisters a select operation. |
| #[inline] |
| pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> { |
| if let Some((i, _)) = |
| self.selectors.iter().enumerate().find(|&(_, entry)| entry.oper == oper) |
| { |
| let entry = self.selectors.remove(i); |
| Some(entry) |
| } else { |
| None |
| } |
| } |
| |
| /// Attempts to find another thread's entry, select the operation, and wake it up. |
| #[inline] |
| pub(crate) fn try_select(&mut self) -> Option<Entry> { |
| self.selectors |
| .iter() |
| .position(|selector| { |
| // Does the entry belong to a different thread? |
| selector.cx.thread_id() != current_thread_id() |
| && selector // Try selecting this operation. |
| .cx |
| .try_select(Selected::Operation(selector.oper)) |
| .is_ok() |
| && { |
| // Provide the packet. |
| selector.cx.store_packet(selector.packet); |
| // Wake the thread up. |
| selector.cx.unpark(); |
| true |
| } |
| }) |
| // Remove the entry from the queue to keep it clean and improve |
| // performance. |
| .map(|pos| self.selectors.remove(pos)) |
| } |
| |
| /// Notifies all operations waiting to be ready. |
| #[inline] |
| pub(crate) fn notify(&mut self) { |
| for entry in self.observers.drain(..) { |
| if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() { |
| entry.cx.unpark(); |
| } |
| } |
| } |
| |
| /// Notifies all registered operations that the channel is disconnected. |
| #[inline] |
| pub(crate) fn disconnect(&mut self) { |
| for entry in self.selectors.iter() { |
| if entry.cx.try_select(Selected::Disconnected).is_ok() { |
| // Wake the thread up. |
| // |
| // Here we don't remove the entry from the queue. Registered threads must |
| // unregister from the waker by themselves. They might also want to recover the |
| // packet value and destroy it, if necessary. |
| entry.cx.unpark(); |
| } |
| } |
| |
| self.notify(); |
| } |
| } |
| |
| impl Drop for Waker { |
| #[inline] |
| fn drop(&mut self) { |
| debug_assert_eq!(self.selectors.len(), 0); |
| debug_assert_eq!(self.observers.len(), 0); |
| } |
| } |
| |
| /// A waker that can be shared among threads without locking. |
| /// |
| /// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization. |
| pub(crate) struct SyncWaker { |
| /// The inner `Waker`. |
| inner: Mutex<Waker>, |
| |
| /// `true` if the waker is empty. |
| is_empty: AtomicBool, |
| } |
| |
| impl SyncWaker { |
| /// Creates a new `SyncWaker`. |
| #[inline] |
| pub(crate) fn new() -> Self { |
| SyncWaker { inner: Mutex::new(Waker::new()), is_empty: AtomicBool::new(true) } |
| } |
| |
| /// Registers the current thread with an operation. |
| #[inline] |
| pub(crate) fn register(&self, oper: Operation, cx: &Context) { |
| let mut inner = self.inner.lock().unwrap(); |
| inner.register(oper, cx); |
| self.is_empty |
| .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst); |
| } |
| |
| /// Unregisters an operation previously registered by the current thread. |
| #[inline] |
| pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> { |
| let mut inner = self.inner.lock().unwrap(); |
| let entry = inner.unregister(oper); |
| self.is_empty |
| .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst); |
| entry |
| } |
| |
| /// Attempts to find one thread (not the current one), select its operation, and wake it up. |
| #[inline] |
| pub(crate) fn notify(&self) { |
| if !self.is_empty.load(Ordering::SeqCst) { |
| let mut inner = self.inner.lock().unwrap(); |
| if !self.is_empty.load(Ordering::SeqCst) { |
| inner.try_select(); |
| inner.notify(); |
| self.is_empty.store( |
| inner.selectors.is_empty() && inner.observers.is_empty(), |
| Ordering::SeqCst, |
| ); |
| } |
| } |
| } |
| |
| /// Notifies all threads that the channel is disconnected. |
| #[inline] |
| pub(crate) fn disconnect(&self) { |
| let mut inner = self.inner.lock().unwrap(); |
| inner.disconnect(); |
| self.is_empty |
| .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst); |
| } |
| } |
| |
| impl Drop for SyncWaker { |
| #[inline] |
| fn drop(&mut self) { |
| debug_assert!(self.is_empty.load(Ordering::SeqCst)); |
| } |
| } |
| |
| /// Returns a unique id for the current thread. |
| #[inline] |
| pub fn current_thread_id() -> usize { |
| // `u8` is not drop so this variable will be available during thread destruction, |
| // whereas `thread::current()` would not be |
| thread_local! { static DUMMY: u8 = 0 } |
| DUMMY.with(|x| (x as *const u8).addr()) |
| } |