fixes: (read comment)
- `remove` lock better synchronization - `remove` better drop impl - `remove` fixed potential race condition / deadlock - `append` forgor to modify first node's `prev` - feat: add `pop` fn note: for now test with `cargo test -- --nocapture`
This commit is contained in:
244
src/lib.rs
244
src/lib.rs
@@ -1,14 +1,14 @@
|
||||
#![feature(arbitrary_self_types)]
|
||||
#![feature(arbitrary_self_types, offset_of_enum, generic_const_items)]
|
||||
#![warn(clippy::pedantic)]
|
||||
#![allow(incomplete_features)]
|
||||
|
||||
use std::{
|
||||
alloc::{Layout, dealloc},
|
||||
hint::unreachable_unchecked,
|
||||
mem::MaybeUninit,
|
||||
mem::{MaybeUninit, offset_of},
|
||||
ops::Deref,
|
||||
};
|
||||
|
||||
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use parking_lot::{RwLock, RwLockWriteGuard};
|
||||
|
||||
mod docs {
|
||||
//! Rules for soundness of modifications.
|
||||
@@ -50,6 +50,47 @@ impl<T: 'static> NodeDiscr<T> {
|
||||
};
|
||||
head
|
||||
}
|
||||
|
||||
/// # Safety
|
||||
/// UB if [`self`] is not [`Self::Node`].
|
||||
pub unsafe fn as_node_unchecked(&self) -> &Node<T> {
|
||||
let Self::Node(node) = self else {
|
||||
unsafe { unreachable_unchecked() }
|
||||
};
|
||||
node
|
||||
}
|
||||
|
||||
fn try_write(&'static self) -> Option<NodeDiscrWriteLocks<'static, T>> {
|
||||
match self {
|
||||
NodeDiscr::Head(h) => {
|
||||
let lock = h.try_write()?;
|
||||
Some(NodeDiscrWriteLocks::Head(lock))
|
||||
}
|
||||
NodeDiscr::Node(n) => {
|
||||
let lock = n.0.try_write()?;
|
||||
Some(NodeDiscrWriteLocks::Node(lock))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// # Safety
|
||||
///
|
||||
/// Will leak if not handled properly.
|
||||
#[must_use]
|
||||
#[allow(clippy::mut_from_ref)]
|
||||
fn alloc_new_with_ptrs(
|
||||
data: T,
|
||||
next: Option<&'static Node<T>>,
|
||||
prev: &'static NodeDiscr<T>,
|
||||
) -> (&'static Self, &'static Node<T>) {
|
||||
let discr = Box::leak(Box::new(NodeDiscr::Node(Node(RwLock::new(NodeInner {
|
||||
next: MaybeUninit::new(next),
|
||||
prev: MaybeUninit::new(prev),
|
||||
isolated: false,
|
||||
data,
|
||||
})))));
|
||||
(discr, unsafe { discr.as_node_unchecked() })
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)] // We dont even read variants, just hold whatever lock
|
||||
@@ -58,6 +99,15 @@ enum NodeDiscrWriteLocks<'a, T: 'static> {
|
||||
Node(RwLockWriteGuard<'a, NodeInner<T>>),
|
||||
}
|
||||
|
||||
impl<T: 'static> NodeDiscrWriteLocks<'_, T> {
|
||||
fn set_next(&mut self, next: Option<&'static Node<T>>) {
|
||||
match self {
|
||||
Self::Head(h) => h.start = next,
|
||||
Self::Node(n) => n.next = MaybeUninit::new(next),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(transparent)]
|
||||
pub struct Node<T: 'static>(RwLock<NodeInner<T>>);
|
||||
|
||||
@@ -66,6 +116,9 @@ pub struct Node<T: 'static>(RwLock<NodeInner<T>>);
|
||||
struct NodeInner<T: 'static> {
|
||||
next: MaybeUninit<Option<&'static Node<T>>>,
|
||||
prev: MaybeUninit<&'static NodeDiscr<T>>,
|
||||
/// intended for removal, when the `RwLock` is being "drained" from waiters, there might be
|
||||
/// another remover waiting, if it finds this it simply aborts
|
||||
isolated: bool,
|
||||
data: T,
|
||||
}
|
||||
|
||||
@@ -82,43 +135,63 @@ impl<T> NodeInner<T> {
|
||||
unsafe { self.prev.assume_init() }
|
||||
}
|
||||
|
||||
/// Could also leak memory
|
||||
///
|
||||
/// # Safety
|
||||
/// The `prev` self ptr is valid as long as the write lock is held, as soon as it's dropped it
|
||||
/// becomes invalid.
|
||||
fn update_prev(&self) -> NodeDiscrWriteLocks<'static, T> {
|
||||
match self.prev() {
|
||||
NodeDiscr::Head(h) => {
|
||||
let mut lock = h.write();
|
||||
lock.start = unsafe { self.next.assume_init() };
|
||||
NodeDiscrWriteLocks::Head(lock)
|
||||
}
|
||||
NodeDiscr::Node(n) => {
|
||||
let mut lock = n.0.write();
|
||||
lock.next = self.next;
|
||||
NodeDiscrWriteLocks::Node(lock)
|
||||
}
|
||||
}
|
||||
}
|
||||
// /// Could also leak memory
|
||||
// ///
|
||||
// /// # Safety
|
||||
// /// The `prev` self ptr is valid as long as the write lock is held, as soon as it's dropped it
|
||||
// /// becomes invalid.
|
||||
// fn try_update_prev(&self) -> Option<NodeDiscrWriteLocks<'static, T>> {
|
||||
// match self.prev() {
|
||||
// NodeDiscr::Head(h) => {
|
||||
// let mut lock = h.try_write()?;
|
||||
// lock.start = unsafe { self.next.assume_init() };
|
||||
// Some(NodeDiscrWriteLocks::Head(lock))
|
||||
// }
|
||||
// NodeDiscr::Node(n) => {
|
||||
// let mut lock = n.0.try_write()?;
|
||||
// lock.next = self.next;
|
||||
// Some(NodeDiscrWriteLocks::Node(lock))
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
fn next(&self) -> Option<&'static Node<T>> {
|
||||
unsafe { self.next.assume_init() }
|
||||
}
|
||||
|
||||
/// Could also leak memory
|
||||
///
|
||||
/// # Safety
|
||||
/// The `next`self ptr is valid as long as the write lock is held, as soon as it's dropped it
|
||||
/// becomes invalid.
|
||||
fn update_next(&self) -> Option<RwLockWriteGuard<'static, NodeInner<T>>> {
|
||||
if let Some(next) = self.next() {
|
||||
let mut lock = next.0.write();
|
||||
lock.prev = self.prev;
|
||||
Some(lock)
|
||||
// /// Could also leak memory.
|
||||
// ///
|
||||
// /// First option is if theres any next, second one if it locked or not.
|
||||
// ///
|
||||
// /// # Safety
|
||||
// /// The `next` self ptr is valid as long as the write lock is held, as soon as it's dropped it
|
||||
// /// becomes invalid.
|
||||
// fn try_update_next(&self) -> Option<Option<RwLockWriteGuard<'static, NodeInner<T>>>> {
|
||||
// self.next().map(|next| {
|
||||
// if let Some(mut lock) = next.0.try_write() {
|
||||
// lock.prev = self.prev;
|
||||
// Some(lock)
|
||||
// } else {
|
||||
// None
|
||||
// }
|
||||
// })
|
||||
// }
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
fn try_write_sides(
|
||||
&self,
|
||||
) -> Option<(
|
||||
NodeDiscrWriteLocks<'static, T>,
|
||||
Option<RwLockWriteGuard<'static, NodeInner<T>>>,
|
||||
)> {
|
||||
let prev_lock = self.prev().try_write()?;
|
||||
let next_lock = if let Some(next_lock) = self.next() {
|
||||
Some(next_lock.0.try_write()?)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
Some((prev_lock, next_lock))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -134,23 +207,7 @@ impl<T> Node<T> {
|
||||
Box::leak(Box::new(Node(RwLock::new(NodeInner {
|
||||
next: MaybeUninit::uninit(),
|
||||
prev: MaybeUninit::uninit(),
|
||||
data,
|
||||
}))))
|
||||
}
|
||||
|
||||
/// # Safety
|
||||
///
|
||||
/// Will leak if not handled properly.
|
||||
#[must_use]
|
||||
#[allow(clippy::mut_from_ref)]
|
||||
fn alloc_new_with_ptrs(
|
||||
data: T,
|
||||
next: Option<&'static Node<T>>,
|
||||
prev: &'static NodeDiscr<T>,
|
||||
) -> &'static mut Self {
|
||||
Box::leak(Box::new(Node(RwLock::new(NodeInner {
|
||||
next: MaybeUninit::new(next),
|
||||
prev: MaybeUninit::new(prev),
|
||||
isolated: true,
|
||||
data,
|
||||
}))))
|
||||
}
|
||||
@@ -160,22 +217,39 @@ impl<T> Node<T> {
|
||||
/// awaiting. Adyacent write locks are also sent back to prevent their modification since the
|
||||
/// isolation and make the pointers of self still valid.
|
||||
///
|
||||
/// If it returns None, the node was already isolated.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// Its unsafe to access `next` and `prev` ptr's after the adge locks are dropped.
|
||||
/// Its unsafe to access `next` and `prev` ptr's after the edge locks are dropped.
|
||||
#[allow(clippy::type_complexity)]
|
||||
fn isolate(
|
||||
&'_ self,
|
||||
) -> (
|
||||
RwLockReadGuard<'_, NodeInner<T>>,
|
||||
) -> Option<(
|
||||
RwLockWriteGuard<'_, NodeInner<T>>,
|
||||
(
|
||||
NodeDiscrWriteLocks<'static, T>,
|
||||
Option<RwLockWriteGuard<'static, NodeInner<T>>>,
|
||||
),
|
||||
) {
|
||||
let node = self.0.read();
|
||||
let edge_locks = (node.update_prev(), node.update_next());
|
||||
(node, edge_locks)
|
||||
)> {
|
||||
loop {
|
||||
let mut node = self.0.write();
|
||||
if node.isolated {
|
||||
break None;
|
||||
}
|
||||
let Some(mut sides) = node.try_write_sides() else {
|
||||
drop(node);
|
||||
std::thread::yield_now();
|
||||
continue;
|
||||
};
|
||||
|
||||
node.isolated = true;
|
||||
sides.0.set_next(node.next());
|
||||
if let Some(next) = &mut sides.1 {
|
||||
next.prev = node.prev;
|
||||
}
|
||||
break Some((node, sides));
|
||||
}
|
||||
}
|
||||
|
||||
/// # Safety
|
||||
@@ -188,35 +262,32 @@ impl<T> Node<T> {
|
||||
/// lead to weird UB.
|
||||
pub unsafe fn remove(&self) {
|
||||
unsafe {
|
||||
let (lock, edge_locks) = self.isolate();
|
||||
let Some((node, edge_locks)) = self.isolate() else {
|
||||
return;
|
||||
};
|
||||
|
||||
// Drop the allocated data, edge ptrs remain valid meanwhile
|
||||
drop(lock); // let other readers/writers finish with this item
|
||||
let data = loop {
|
||||
drop(node); // let other readers/writers finish with this item
|
||||
loop {
|
||||
if self.0.is_locked() {
|
||||
std::thread::yield_now();
|
||||
} else {
|
||||
break self.0.data_ptr();
|
||||
break;
|
||||
}
|
||||
};
|
||||
std::ptr::drop_in_place(data);
|
||||
drop(edge_locks); // edge ptrs become invalid form now on
|
||||
}
|
||||
|
||||
// Now that we are the only ref to ourselves its ok to take outselves as mutable
|
||||
let myself = std::ptr::from_ref(self).cast_mut();
|
||||
#[allow(clippy::items_after_statements)]
|
||||
const OFFSET<T>: usize = offset_of!(NodeDiscr<T>, Node.0);
|
||||
let myself_discr = myself.wrapping_byte_sub(OFFSET::<T>).cast::<NodeDiscr<T>>();
|
||||
|
||||
// And free this
|
||||
dealloct(myself);
|
||||
drop(Box::from_raw(myself_discr));
|
||||
drop(edge_locks); // edge ptrs become invalid form now on
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
unsafe fn dealloct<T>(data: *mut T) {
|
||||
unsafe {
|
||||
dealloc(data.cast(), Layout::new::<T>());
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LinkedList<T: 'static> {
|
||||
start: Option<&'static Node<T>>,
|
||||
}
|
||||
@@ -250,8 +321,12 @@ impl<T> LinkedList<T> {
|
||||
debug_assert!(std::ptr::eq(RwLockWriteGuard::rwlock(&self), ll_head));
|
||||
}
|
||||
|
||||
let first_node = self.start;
|
||||
let new_node = Node::alloc_new_with_ptrs(data, first_node, head_ref);
|
||||
let next = self.start;
|
||||
let (new_node_discr, new_node) = NodeDiscr::alloc_new_with_ptrs(data, next, head_ref);
|
||||
if let Some(next) = next {
|
||||
let mut next = next.0.write();
|
||||
next.prev = MaybeUninit::new(new_node_discr);
|
||||
}
|
||||
self.start = Some(new_node);
|
||||
}
|
||||
}
|
||||
@@ -279,6 +354,25 @@ impl<T: 'static> LinkedListWrapper<T> {
|
||||
unsafe { self.inner.as_head_unchecked() }
|
||||
}
|
||||
|
||||
/// # Safety
|
||||
///
|
||||
/// Nothing external must point to the item about to be popped.
|
||||
pub unsafe fn pop(&'static self) {
|
||||
loop {
|
||||
let head_read = self.as_head().read();
|
||||
let Some(node) = head_read.start else {
|
||||
std::thread::yield_now();
|
||||
continue;
|
||||
};
|
||||
|
||||
unsafe {
|
||||
drop(head_read);
|
||||
node.remove();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn prepend(&'static self, data: T) {
|
||||
let lock = self.as_head().write();
|
||||
unsafe {
|
||||
|
72
src/tests.rs
72
src/tests.rs
@@ -1,15 +1,79 @@
|
||||
use std::{
|
||||
fmt::Debug,
|
||||
sync::{
|
||||
Barrier,
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
},
|
||||
thread,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
|
||||
static DROP_C: AtomicUsize = AtomicUsize::new(0);
|
||||
|
||||
#[derive(Clone)]
|
||||
#[repr(transparent)]
|
||||
struct StringWithDrop(String);
|
||||
|
||||
impl From<String> for StringWithDrop {
|
||||
fn from(value: String) -> Self {
|
||||
Self(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for StringWithDrop {
|
||||
fn drop(&mut self) {
|
||||
DROP_C.fetch_add(1, Ordering::Relaxed);
|
||||
println!("drop {self:?}");
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for StringWithDrop {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
<String as Debug>::fmt(&self.0, f)
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn it_works() {
|
||||
let ll = Box::leak(Box::new(LinkedListWrapper::new()));
|
||||
let ll = Box::leak(Box::new(LinkedListWrapper::<StringWithDrop>::new()));
|
||||
let barrier = Barrier::new(3);
|
||||
|
||||
println!("{:#?}", ll.clone_into_vec());
|
||||
|
||||
ll.prepend("test");
|
||||
ll.prepend("another test");
|
||||
thread::scope(|s| {
|
||||
s.spawn(|| {
|
||||
barrier.wait();
|
||||
|
||||
println!("{:#?}", ll.clone_into_vec());
|
||||
for n in 0..100 {
|
||||
ll.prepend(format!("A {n}").into());
|
||||
}
|
||||
});
|
||||
s.spawn(|| {
|
||||
barrier.wait();
|
||||
|
||||
for n in 0..100 {
|
||||
ll.prepend(format!("B {n}").into());
|
||||
}
|
||||
});
|
||||
s.spawn(|| {
|
||||
barrier.wait();
|
||||
|
||||
for _ in 0..180 {
|
||||
unsafe {
|
||||
ll.pop();
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
let a = ll.clone_into_vec();
|
||||
println!(
|
||||
"{:?} len {} dropped {}",
|
||||
a,
|
||||
a.len(),
|
||||
DROP_C.load(Ordering::Relaxed)
|
||||
);
|
||||
|
||||
assert_eq!(4, 4);
|
||||
}
|
||||
|
Reference in New Issue
Block a user