Compare commits

...

2 Commits

Author SHA1 Message Date
4af59d5ae0 fixes: (read comment)
- `remove` lock better synchronization
- `remove` better drop impl
- `remove` fixed potential race condition / deadlock
- `append` forgor to modify first node's `prev`
- feat: add `pop` fn

note: for now test with `cargo test -- --nocapture`
2025-07-18 02:26:16 +02:00
326ad9822d rm: udeps 2025-07-18 01:00:15 +02:00
4 changed files with 237 additions and 127 deletions

47
Cargo.lock generated
View File

@@ -2,17 +2,6 @@
# It is not intended for manual editing. # It is not intended for manual editing.
version = 4 version = 4
[[package]]
name = "atomic_enum"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99e1aca718ea7b89985790c94aad72d77533063fe00bc497bb79a7c2dae6a661"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.5.0" version = "1.5.0"
@@ -35,7 +24,6 @@ checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268"
name = "concurrent-linked-list" name = "concurrent-linked-list"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"atomic_enum",
"parking_lot", "parking_lot",
] ]
@@ -78,24 +66,6 @@ dependencies = [
"windows-targets", "windows-targets",
] ]
[[package]]
name = "proc-macro2"
version = "1.0.95"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d"
dependencies = [
"proc-macro2",
]
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.5.13" version = "0.5.13"
@@ -117,23 +87,6 @@ version = "1.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03"
[[package]]
name = "syn"
version = "2.0.104"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17b6f705963418cdb9927482fa304bc562ece2fdd4f616084c50b7023b435a40"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "unicode-ident"
version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512"
[[package]] [[package]]
name = "windows-targets" name = "windows-targets"
version = "0.52.6" version = "0.52.6"

View File

@@ -4,5 +4,4 @@ version = "0.1.0"
edition = "2024" edition = "2024"
[dependencies] [dependencies]
atomic_enum = "0.3.0"
parking_lot = { version = "0", default-features = false } parking_lot = { version = "0", default-features = false }

View File

@@ -1,14 +1,14 @@
#![feature(arbitrary_self_types)] #![feature(arbitrary_self_types, offset_of_enum, generic_const_items)]
#![warn(clippy::pedantic)] #![warn(clippy::pedantic)]
#![allow(incomplete_features)]
use std::{ use std::{
alloc::{Layout, dealloc},
hint::unreachable_unchecked, hint::unreachable_unchecked,
mem::MaybeUninit, mem::{MaybeUninit, offset_of},
ops::Deref, ops::Deref,
}; };
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use parking_lot::{RwLock, RwLockWriteGuard};
mod docs { mod docs {
//! Rules for soundness of modifications. //! Rules for soundness of modifications.
@@ -50,6 +50,47 @@ impl<T: 'static> NodeDiscr<T> {
}; };
head head
} }
/// # Safety
/// UB if [`self`] is not [`Self::Node`].
pub unsafe fn as_node_unchecked(&self) -> &Node<T> {
let Self::Node(node) = self else {
unsafe { unreachable_unchecked() }
};
node
}
fn try_write(&'static self) -> Option<NodeDiscrWriteLocks<'static, T>> {
match self {
NodeDiscr::Head(h) => {
let lock = h.try_write()?;
Some(NodeDiscrWriteLocks::Head(lock))
}
NodeDiscr::Node(n) => {
let lock = n.0.try_write()?;
Some(NodeDiscrWriteLocks::Node(lock))
}
}
}
/// # Safety
///
/// Will leak if not handled properly.
#[must_use]
#[allow(clippy::mut_from_ref)]
fn alloc_new_with_ptrs(
data: T,
next: Option<&'static Node<T>>,
prev: &'static NodeDiscr<T>,
) -> (&'static Self, &'static Node<T>) {
let discr = Box::leak(Box::new(NodeDiscr::Node(Node(RwLock::new(NodeInner {
next: MaybeUninit::new(next),
prev: MaybeUninit::new(prev),
isolated: false,
data,
})))));
(discr, unsafe { discr.as_node_unchecked() })
}
} }
#[allow(dead_code)] // We dont even read variants, just hold whatever lock #[allow(dead_code)] // We dont even read variants, just hold whatever lock
@@ -58,6 +99,15 @@ enum NodeDiscrWriteLocks<'a, T: 'static> {
Node(RwLockWriteGuard<'a, NodeInner<T>>), Node(RwLockWriteGuard<'a, NodeInner<T>>),
} }
impl<T: 'static> NodeDiscrWriteLocks<'_, T> {
fn set_next(&mut self, next: Option<&'static Node<T>>) {
match self {
Self::Head(h) => h.start = next,
Self::Node(n) => n.next = MaybeUninit::new(next),
}
}
}
#[repr(transparent)] #[repr(transparent)]
pub struct Node<T: 'static>(RwLock<NodeInner<T>>); pub struct Node<T: 'static>(RwLock<NodeInner<T>>);
@@ -66,6 +116,9 @@ pub struct Node<T: 'static>(RwLock<NodeInner<T>>);
struct NodeInner<T: 'static> { struct NodeInner<T: 'static> {
next: MaybeUninit<Option<&'static Node<T>>>, next: MaybeUninit<Option<&'static Node<T>>>,
prev: MaybeUninit<&'static NodeDiscr<T>>, prev: MaybeUninit<&'static NodeDiscr<T>>,
/// intended for removal, when the `RwLock` is being "drained" from waiters, there might be
/// another remover waiting, if it finds this it simply aborts
isolated: bool,
data: T, data: T,
} }
@@ -82,43 +135,63 @@ impl<T> NodeInner<T> {
unsafe { self.prev.assume_init() } unsafe { self.prev.assume_init() }
} }
/// Could also leak memory // /// Could also leak memory
/// // ///
/// # Safety // /// # Safety
/// The `prev` self ptr is valid as long as the write lock is held, as soon as it's dropped it // /// The `prev` self ptr is valid as long as the write lock is held, as soon as it's dropped it
/// becomes invalid. // /// becomes invalid.
fn update_prev(&self) -> NodeDiscrWriteLocks<'static, T> { // fn try_update_prev(&self) -> Option<NodeDiscrWriteLocks<'static, T>> {
match self.prev() { // match self.prev() {
NodeDiscr::Head(h) => { // NodeDiscr::Head(h) => {
let mut lock = h.write(); // let mut lock = h.try_write()?;
lock.start = unsafe { self.next.assume_init() }; // lock.start = unsafe { self.next.assume_init() };
NodeDiscrWriteLocks::Head(lock) // Some(NodeDiscrWriteLocks::Head(lock))
} // }
NodeDiscr::Node(n) => { // NodeDiscr::Node(n) => {
let mut lock = n.0.write(); // let mut lock = n.0.try_write()?;
lock.next = self.next; // lock.next = self.next;
NodeDiscrWriteLocks::Node(lock) // Some(NodeDiscrWriteLocks::Node(lock))
} // }
} // }
} // }
fn next(&self) -> Option<&'static Node<T>> { fn next(&self) -> Option<&'static Node<T>> {
unsafe { self.next.assume_init() } unsafe { self.next.assume_init() }
} }
/// Could also leak memory // /// Could also leak memory.
/// // ///
/// # Safety // /// First option is if theres any next, second one if it locked or not.
/// The `next`self ptr is valid as long as the write lock is held, as soon as it's dropped it // ///
/// becomes invalid. // /// # Safety
fn update_next(&self) -> Option<RwLockWriteGuard<'static, NodeInner<T>>> { // /// The `next` self ptr is valid as long as the write lock is held, as soon as it's dropped it
if let Some(next) = self.next() { // /// becomes invalid.
let mut lock = next.0.write(); // fn try_update_next(&self) -> Option<Option<RwLockWriteGuard<'static, NodeInner<T>>>> {
lock.prev = self.prev; // self.next().map(|next| {
Some(lock) // if let Some(mut lock) = next.0.try_write() {
// lock.prev = self.prev;
// Some(lock)
// } else {
// None
// }
// })
// }
#[allow(clippy::type_complexity)]
fn try_write_sides(
&self,
) -> Option<(
NodeDiscrWriteLocks<'static, T>,
Option<RwLockWriteGuard<'static, NodeInner<T>>>,
)> {
let prev_lock = self.prev().try_write()?;
let next_lock = if let Some(next_lock) = self.next() {
Some(next_lock.0.try_write()?)
} else { } else {
None None
} };
Some((prev_lock, next_lock))
} }
} }
@@ -134,23 +207,7 @@ impl<T> Node<T> {
Box::leak(Box::new(Node(RwLock::new(NodeInner { Box::leak(Box::new(Node(RwLock::new(NodeInner {
next: MaybeUninit::uninit(), next: MaybeUninit::uninit(),
prev: MaybeUninit::uninit(), prev: MaybeUninit::uninit(),
data, isolated: true,
}))))
}
/// # Safety
///
/// Will leak if not handled properly.
#[must_use]
#[allow(clippy::mut_from_ref)]
fn alloc_new_with_ptrs(
data: T,
next: Option<&'static Node<T>>,
prev: &'static NodeDiscr<T>,
) -> &'static mut Self {
Box::leak(Box::new(Node(RwLock::new(NodeInner {
next: MaybeUninit::new(next),
prev: MaybeUninit::new(prev),
data, data,
})))) }))))
} }
@@ -160,22 +217,39 @@ impl<T> Node<T> {
/// awaiting. Adyacent write locks are also sent back to prevent their modification since the /// awaiting. Adyacent write locks are also sent back to prevent their modification since the
/// isolation and make the pointers of self still valid. /// isolation and make the pointers of self still valid.
/// ///
/// If it returns None, the node was already isolated.
///
/// # Safety /// # Safety
/// ///
/// Its unsafe to access `next` and `prev` ptr's after the adge locks are dropped. /// Its unsafe to access `next` and `prev` ptr's after the edge locks are dropped.
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
fn isolate( fn isolate(
&'_ self, &'_ self,
) -> ( ) -> Option<(
RwLockReadGuard<'_, NodeInner<T>>, RwLockWriteGuard<'_, NodeInner<T>>,
( (
NodeDiscrWriteLocks<'static, T>, NodeDiscrWriteLocks<'static, T>,
Option<RwLockWriteGuard<'static, NodeInner<T>>>, Option<RwLockWriteGuard<'static, NodeInner<T>>>,
), ),
) { )> {
let node = self.0.read(); loop {
let edge_locks = (node.update_prev(), node.update_next()); let mut node = self.0.write();
(node, edge_locks) if node.isolated {
break None;
}
let Some(mut sides) = node.try_write_sides() else {
drop(node);
std::thread::yield_now();
continue;
};
node.isolated = true;
sides.0.set_next(node.next());
if let Some(next) = &mut sides.1 {
next.prev = node.prev;
}
break Some((node, sides));
}
} }
/// # Safety /// # Safety
@@ -188,35 +262,32 @@ impl<T> Node<T> {
/// lead to weird UB. /// lead to weird UB.
pub unsafe fn remove(&self) { pub unsafe fn remove(&self) {
unsafe { unsafe {
let (lock, edge_locks) = self.isolate(); let Some((node, edge_locks)) = self.isolate() else {
return;
};
// Drop the allocated data, edge ptrs remain valid meanwhile // Drop the allocated data, edge ptrs remain valid meanwhile
drop(lock); // let other readers/writers finish with this item drop(node); // let other readers/writers finish with this item
let data = loop { loop {
if self.0.is_locked() { if self.0.is_locked() {
std::thread::yield_now(); std::thread::yield_now();
} else { } else {
break self.0.data_ptr(); break;
} }
}; }
std::ptr::drop_in_place(data);
drop(edge_locks); // edge ptrs become invalid form now on
// Now that we are the only ref to ourselves its ok to take outselves as mutable // Now that we are the only ref to ourselves its ok to take outselves as mutable
let myself = std::ptr::from_ref(self).cast_mut(); let myself = std::ptr::from_ref(self).cast_mut();
#[allow(clippy::items_after_statements)]
const OFFSET<T>: usize = offset_of!(NodeDiscr<T>, Node.0);
let myself_discr = myself.wrapping_byte_sub(OFFSET::<T>).cast::<NodeDiscr<T>>();
// And free this drop(Box::from_raw(myself_discr));
dealloct(myself); drop(edge_locks); // edge ptrs become invalid form now on
} }
} }
} }
unsafe fn dealloct<T>(data: *mut T) {
unsafe {
dealloc(data.cast(), Layout::new::<T>());
}
}
pub struct LinkedList<T: 'static> { pub struct LinkedList<T: 'static> {
start: Option<&'static Node<T>>, start: Option<&'static Node<T>>,
} }
@@ -250,8 +321,12 @@ impl<T> LinkedList<T> {
debug_assert!(std::ptr::eq(RwLockWriteGuard::rwlock(&self), ll_head)); debug_assert!(std::ptr::eq(RwLockWriteGuard::rwlock(&self), ll_head));
} }
let first_node = self.start; let next = self.start;
let new_node = Node::alloc_new_with_ptrs(data, first_node, head_ref); let (new_node_discr, new_node) = NodeDiscr::alloc_new_with_ptrs(data, next, head_ref);
if let Some(next) = next {
let mut next = next.0.write();
next.prev = MaybeUninit::new(new_node_discr);
}
self.start = Some(new_node); self.start = Some(new_node);
} }
} }
@@ -279,6 +354,25 @@ impl<T: 'static> LinkedListWrapper<T> {
unsafe { self.inner.as_head_unchecked() } unsafe { self.inner.as_head_unchecked() }
} }
/// # Safety
///
/// Nothing external must point to the item about to be popped.
pub unsafe fn pop(&'static self) {
loop {
let head_read = self.as_head().read();
let Some(node) = head_read.start else {
std::thread::yield_now();
continue;
};
unsafe {
drop(head_read);
node.remove();
break;
}
}
}
pub fn prepend(&'static self, data: T) { pub fn prepend(&'static self, data: T) {
let lock = self.as_head().write(); let lock = self.as_head().write();
unsafe { unsafe {

View File

@@ -1,15 +1,79 @@
use std::{
fmt::Debug,
sync::{
Barrier,
atomic::{AtomicUsize, Ordering},
},
thread,
};
use super::*; use super::*;
static DROP_C: AtomicUsize = AtomicUsize::new(0);
#[derive(Clone)]
#[repr(transparent)]
struct StringWithDrop(String);
impl From<String> for StringWithDrop {
fn from(value: String) -> Self {
Self(value)
}
}
impl Drop for StringWithDrop {
fn drop(&mut self) {
DROP_C.fetch_add(1, Ordering::Relaxed);
println!("drop {self:?}");
}
}
impl Debug for StringWithDrop {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
<String as Debug>::fmt(&self.0, f)
}
}
#[test] #[test]
fn it_works() { fn it_works() {
let ll = Box::leak(Box::new(LinkedListWrapper::new())); let ll = Box::leak(Box::new(LinkedListWrapper::<StringWithDrop>::new()));
let barrier = Barrier::new(3);
println!("{:#?}", ll.clone_into_vec()); println!("{:#?}", ll.clone_into_vec());
ll.prepend("test"); thread::scope(|s| {
ll.prepend("another test"); s.spawn(|| {
barrier.wait();
println!("{:#?}", ll.clone_into_vec()); for n in 0..100 {
ll.prepend(format!("A {n}").into());
}
});
s.spawn(|| {
barrier.wait();
for n in 0..100 {
ll.prepend(format!("B {n}").into());
}
});
s.spawn(|| {
barrier.wait();
for _ in 0..180 {
unsafe {
ll.pop();
}
}
});
});
let a = ll.clone_into_vec();
println!(
"{:?} len {} dropped {}",
a,
a.len(),
DROP_C.load(Ordering::Relaxed)
);
assert_eq!(4, 4); assert_eq!(4, 4);
} }