blob: 7a47cc6696a34908a0bea6f832dbf14e8b68087d [file] [log] [blame]
//! POSIX conditional variable implementation based on user-space wait queues.
use super::{abi, error::expect_success_aborting, spin::SpinMutex, task, time::with_tmos_strong};
use crate::{mem::replace, ptr::NonNull, sys::locks::Mutex, time::Duration};
// The implementation is inspired by the queue-based implementation shown in
// Andrew D. Birrell's paper "Implementing Condition Variables with Semaphores"
pub struct Condvar {
waiters: SpinMutex<waiter_queue::WaiterQueue>,
}
unsafe impl Send for Condvar {}
unsafe impl Sync for Condvar {}
impl Condvar {
#[inline]
pub const fn new() -> Condvar {
Condvar { waiters: SpinMutex::new(waiter_queue::WaiterQueue::new()) }
}
pub fn notify_one(&self) {
self.waiters.with_locked(|waiters| {
if let Some(task) = waiters.pop_front() {
// Unpark the task
match unsafe { abi::wup_tsk(task) } {
// The task already has a token.
abi::E_QOVR => {}
// Can't undo the effect; abort the program on failure
er => {
expect_success_aborting(er, &"wup_tsk");
}
}
}
});
}
pub fn notify_all(&self) {
self.waiters.with_locked(|waiters| {
while let Some(task) = waiters.pop_front() {
// Unpark the task
match unsafe { abi::wup_tsk(task) } {
// The task already has a token.
abi::E_QOVR => {}
// Can't undo the effect; abort the program on failure
er => {
expect_success_aborting(er, &"wup_tsk");
}
}
}
});
}
pub unsafe fn wait(&self, mutex: &Mutex) {
// Construct `Waiter`.
let mut waiter = waiter_queue::Waiter::new();
let waiter = NonNull::from(&mut waiter);
self.waiters.with_locked(|waiters| unsafe {
waiters.insert(waiter);
});
unsafe { mutex.unlock() };
// Wait until `waiter` is removed from the queue
loop {
// Park the current task
expect_success_aborting(unsafe { abi::slp_tsk() }, &"slp_tsk");
if !self.waiters.with_locked(|waiters| unsafe { waiters.is_queued(waiter) }) {
break;
}
}
mutex.lock();
}
pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool {
// Construct and pin `Waiter`
let mut waiter = waiter_queue::Waiter::new();
let waiter = NonNull::from(&mut waiter);
self.waiters.with_locked(|waiters| unsafe {
waiters.insert(waiter);
});
unsafe { mutex.unlock() };
// Park the current task and do not wake up until the timeout elapses
// or the task gets woken up by `notify_*`
match with_tmos_strong(dur, |tmo| {
let er = unsafe { abi::tslp_tsk(tmo) };
if er == 0 {
// We were unparked. Are we really dequeued?
if self.waiters.with_locked(|waiters| unsafe { waiters.is_queued(waiter) }) {
// No we are not. Continue waiting.
return abi::E_TMOUT;
}
}
er
}) {
abi::E_TMOUT => {}
er => {
expect_success_aborting(er, &"tslp_tsk");
}
}
// Remove `waiter` from `self.waiters`. If `waiter` is still in
// `waiters`, it means we woke up because of a timeout. Otherwise,
// we woke up because of `notify_*`.
let success = self.waiters.with_locked(|waiters| unsafe { !waiters.remove(waiter) });
mutex.lock();
success
}
}
mod waiter_queue {
use super::*;
pub struct WaiterQueue {
head: Option<ListHead>,
}
#[derive(Copy, Clone)]
struct ListHead {
first: NonNull<Waiter>,
last: NonNull<Waiter>,
}
unsafe impl Send for ListHead {}
unsafe impl Sync for ListHead {}
pub struct Waiter {
// These fields are only accessed through `&[mut] WaiterQueue`.
/// The waiting task's ID. Will be zeroed when the task is woken up
/// and removed from a queue.
task: abi::ID,
priority: abi::PRI,
prev: Option<NonNull<Waiter>>,
next: Option<NonNull<Waiter>>,
}
unsafe impl Send for Waiter {}
unsafe impl Sync for Waiter {}
impl Waiter {
#[inline]
pub fn new() -> Self {
let task = task::current_task_id();
let priority = task::task_priority(abi::TSK_SELF);
// Zeroness of `Waiter::task` indicates whether the `Waiter` is
// linked to a queue or not. This invariant is important for
// the correctness.
debug_assert_ne!(task, 0);
Self { task, priority, prev: None, next: None }
}
}
impl WaiterQueue {
#[inline]
pub const fn new() -> Self {
Self { head: None }
}
/// # Safety
///
/// - The caller must own `*waiter_ptr`. The caller will lose the
/// ownership until `*waiter_ptr` is removed from `self`.
///
/// - `*waiter_ptr` must be valid until it's removed from the queue.
///
/// - `*waiter_ptr` must not have been previously inserted to a `WaiterQueue`.
///
pub unsafe fn insert(&mut self, mut waiter_ptr: NonNull<Waiter>) {
unsafe {
let waiter = waiter_ptr.as_mut();
debug_assert!(waiter.prev.is_none());
debug_assert!(waiter.next.is_none());
if let Some(head) = &mut self.head {
// Find the insertion position and insert `waiter`
let insert_after = {
let mut cursor = head.last;
loop {
if waiter.priority >= cursor.as_ref().priority {
// `cursor` and all previous waiters have the same or higher
// priority than `current_task_priority`. Insert the new
// waiter right after `cursor`.
break Some(cursor);
}
cursor = if let Some(prev) = cursor.as_ref().prev {
prev
} else {
break None;
};
}
};
if let Some(mut insert_after) = insert_after {
// Insert `waiter` after `insert_after`
let insert_before = insert_after.as_ref().next;
waiter.prev = Some(insert_after);
insert_after.as_mut().next = Some(waiter_ptr);
waiter.next = insert_before;
if let Some(mut insert_before) = insert_before {
insert_before.as_mut().prev = Some(waiter_ptr);
} else {
head.last = waiter_ptr;
}
} else {
// Insert `waiter` to the front
waiter.next = Some(head.first);
head.first.as_mut().prev = Some(waiter_ptr);
head.first = waiter_ptr;
}
} else {
// `waiter` is the only element
self.head = Some(ListHead { first: waiter_ptr, last: waiter_ptr });
}
}
}
/// Given a `Waiter` that was previously inserted to `self`, remove
/// it from `self` if it's still there.
#[inline]
pub unsafe fn remove(&mut self, mut waiter_ptr: NonNull<Waiter>) -> bool {
unsafe {
let waiter = waiter_ptr.as_mut();
if waiter.task != 0 {
let head = self.head.as_mut().unwrap();
match (waiter.prev, waiter.next) {
(Some(mut prev), Some(mut next)) => {
prev.as_mut().next = Some(next);
next.as_mut().prev = Some(prev);
}
(None, Some(mut next)) => {
head.first = next;
next.as_mut().prev = None;
}
(Some(mut prev), None) => {
prev.as_mut().next = None;
head.last = prev;
}
(None, None) => {
self.head = None;
}
}
waiter.task = 0;
true
} else {
false
}
}
}
/// Given a `Waiter` that was previously inserted to `self`, return a
/// flag indicating whether it's still in `self`.
#[inline]
pub unsafe fn is_queued(&self, waiter: NonNull<Waiter>) -> bool {
unsafe { waiter.as_ref().task != 0 }
}
#[inline]
pub fn pop_front(&mut self) -> Option<abi::ID> {
unsafe {
let head = self.head.as_mut()?;
let waiter = head.first.as_mut();
// Get the ID
let id = replace(&mut waiter.task, 0);
// Unlink the waiter
if let Some(mut next) = waiter.next {
head.first = next;
next.as_mut().prev = None;
} else {
self.head = None;
}
Some(id)
}
}
}
}