Skip to content

Commit d1322bc

Browse files
authored
Merge pull request #32 from Abso1ut3Zer0/investigate-channel-notification-bug
replace deduplication set with a notification queue for reliable waking
2 parents 1b066c9 + 98165cd commit d1322bc

File tree

3 files changed

+192
-25
lines changed

3 files changed

+192
-25
lines changed

wavelet/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "wavelet"
3-
version = "0.4.6"
3+
version = "0.5.0"
44
edition = "2024"
55
description = "High-performance graph-based stream processing runtime"
66
license = "MIT OR Apache-2.0"
@@ -27,6 +27,7 @@ exclude = [
2727
ahash = "0.8.12"
2828
crossbeam-queue = "0.3.12"
2929
crossbeam-utils = "0.8.21"
30+
derive_builder = "0.20.2"
3031
enum-as-inner = "0.6.1"
3132
mio = { version = "1.0.4", features = ["os-poll", "net"] }
3233
petgraph = "0.8.2"

wavelet/src/channel.rs

Lines changed: 135 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ impl<T> Channel<T> {
168168
#[cfg(test)]
169169
mod tests {
170170
use crate::Control;
171-
use crate::channel::{Receiver, TryReceiveError};
171+
use crate::channel::{Receiver, TryReceiveError, TrySendError};
172172
use crate::prelude::{Executor, TestClock};
173173
use crate::runtime::NodeBuilder;
174174
use std::time::Duration;
@@ -511,4 +511,138 @@ mod tests {
511511
assert!(executor.has_mutated(&node));
512512
assert_eq!(*node.borrow(), "1");
513513
}
514+
515+
// spurious wakeups are normal, this is just to examine them
516+
#[ignore]
517+
#[test]
518+
fn test_single_producer_spurious_wakeup() {
519+
use std::sync::Arc;
520+
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
521+
use std::thread;
522+
use std::time::{Duration, Instant};
523+
524+
let mut executor = Executor::new();
525+
let mut clock = TestClock::new();
526+
527+
// The key metric: how many times the node wakes up to find nothing
528+
let empty_wakeups = Arc::new(AtomicUsize::new(0));
529+
let successful_receives = Arc::new(AtomicUsize::new(0));
530+
let messages_sent = Arc::new(AtomicUsize::new(0));
531+
let still_sending = Arc::new(AtomicBool::new(true));
532+
533+
let empty = empty_wakeups.clone();
534+
let success = successful_receives.clone();
535+
536+
let (_node, tx) = NodeBuilder::new(0usize)
537+
.build_with_channel(&mut executor, 64, move |state, _ctx, rx| {
538+
let mut got_any = false;
539+
540+
// Try to receive messages
541+
for _ in 0..16 {
542+
// Process up to 16 messages per cycle
543+
match rx.try_receive() {
544+
Ok(value) => {
545+
got_any = true;
546+
// Verify sequential ordering from single producer
547+
assert_eq!(value, *state, "Expected {}, got {}", *state, value);
548+
*state += 1;
549+
success.fetch_add(1, Ordering::Relaxed);
550+
}
551+
Err(TryReceiveError::Empty) => {
552+
break;
553+
}
554+
Err(TryReceiveError::ChannelClosed) => {
555+
break;
556+
}
557+
}
558+
}
559+
560+
// THE BUG: If we were woken up but got nothing, that's wrong!
561+
if !got_any {
562+
empty.fetch_add(1, Ordering::Relaxed);
563+
println!("BUG at message {}: Node woke but channel empty!", *state);
564+
}
565+
566+
Control::Unchanged
567+
})
568+
.unwrap();
569+
570+
let sent = messages_sent.clone();
571+
let running = still_sending.clone();
572+
573+
// Single producer thread sending sequential messages
574+
let sender = thread::spawn(move || {
575+
for i in 0..10000 {
576+
// Send sequential numbers
577+
loop {
578+
match tx.try_send(i) {
579+
Ok(_) => {
580+
sent.fetch_add(1, Ordering::Relaxed);
581+
break;
582+
}
583+
Err(TrySendError::ChannelFull(_)) => {
584+
// Channel full, yield and retry
585+
thread::yield_now();
586+
continue;
587+
}
588+
Err(TrySendError::ChannelClosed(_)) => {
589+
return;
590+
}
591+
}
592+
}
593+
594+
// Vary the timing slightly
595+
if i % 100 == 0 {
596+
thread::yield_now();
597+
}
598+
}
599+
running.store(false, Ordering::Relaxed);
600+
});
601+
602+
// Run executor on main thread
603+
let start = Instant::now();
604+
let mut cycles = 0;
605+
606+
while still_sending.load(Ordering::Relaxed)
607+
|| successful_receives.load(Ordering::Relaxed) < messages_sent.load(Ordering::Relaxed)
608+
{
609+
executor.cycle(&mut clock, Some(Duration::ZERO)).unwrap();
610+
cycles += 1;
611+
612+
// Occasionally yield to create scheduling variance
613+
if cycles % 50 == 0 {
614+
thread::yield_now();
615+
}
616+
617+
if start.elapsed() > Duration::from_secs(5) {
618+
println!("Timeout!");
619+
break;
620+
}
621+
}
622+
623+
sender.join().unwrap();
624+
625+
// Final drain
626+
for _ in 0..10 {
627+
executor.cycle(&mut clock, Some(Duration::ZERO)).unwrap();
628+
}
629+
630+
let empty = empty_wakeups.load(Ordering::Relaxed);
631+
let success = successful_receives.load(Ordering::Relaxed);
632+
let sent = messages_sent.load(Ordering::Relaxed);
633+
634+
println!(
635+
"Sent: {}, Received: {}, Empty wakeups: {}",
636+
sent, success, empty
637+
);
638+
println!("Ran {} cycles in {:?}", cycles, start.elapsed());
639+
640+
// Critical assertion: should NEVER wake up to empty channel
641+
assert_eq!(
642+
empty, 0,
643+
"Memory ordering bug detected: {} empty wakeups",
644+
empty
645+
);
646+
assert_eq!(sent, success, "Lost {} messages", sent - success);
647+
}
514648
}

wavelet/src/runtime/event_driver.rs

Lines changed: 55 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,42 +8,45 @@ pub use crate::runtime::event_driver::yield_driver::*;
88
use crate::runtime::graph::Graph;
99
use crate::runtime::scheduler::Scheduler;
1010
use crate::runtime::{Clock, CycleTime};
11-
use ahash::{HashSet, HashSetExt};
11+
use crossbeam_queue::ArrayQueue;
12+
use derive_builder::Builder;
1213
use petgraph::prelude::NodeIndex;
1314
use std::io;
1415
use std::sync::Arc;
1516
use std::time::Duration;
1617

17-
const MINIMUM_TIMER_PRECISION: std::time::Duration = std::time::Duration::from_millis(1);
18-
const IO_CAPACITY: usize = 1024;
19-
const EVENT_CAPACITY: usize = 1024;
18+
const MINIMUM_TIMER_PRECISION: Duration = Duration::from_millis(1);
2019

2120
/// A handle for waking a node from external threads or contexts.
2221
///
2322
/// `Notifier` provides a thread-safe way to schedule a node for execution
2423
/// from outside the main event loop. Useful for integrating with external
2524
/// libraries, user input, or cross-thread communication.
2625
pub struct Notifier {
27-
raw_events: Arc<spin::Mutex<HashSet<NodeIndex>>>,
26+
notifications: Arc<ArrayQueue<NodeIndex>>,
2827
waker: Arc<mio::Waker>,
2928
node_index: NodeIndex,
3029
}
3130

3231
impl Clone for Notifier {
3332
fn clone(&self) -> Self {
34-
Self::new(self.raw_events.clone(), self.waker.clone(), self.node_index)
33+
Self::new(
34+
self.notifications.clone(),
35+
self.waker.clone(),
36+
self.node_index,
37+
)
3538
}
3639
}
3740

3841
impl Notifier {
3942
/// Creates a new notifier handle (internal use only).
4043
const fn new(
41-
raw_events: Arc<spin::Mutex<HashSet<NodeIndex>>>,
44+
notifications: Arc<ArrayQueue<NodeIndex>>,
4245
waker: Arc<mio::Waker>,
4346
node_index: NodeIndex,
4447
) -> Self {
4548
Self {
46-
raw_events,
49+
notifications,
4750
waker,
4851
node_index,
4952
}
@@ -53,13 +56,27 @@ impl Notifier {
5356
///
5457
/// This method is thread-safe and can be called from any thread.
5558
/// The node will be scheduled on the next polling cycle.
59+
///
60+
/// Note: we assume that notification events will coalesce,
61+
/// so we only attempt to write to the notification queue.
5662
#[inline(always)]
5763
pub fn notify(&self) {
58-
self.raw_events.lock().insert(self.node_index);
64+
self.notifications.push(self.node_index).ok();
5965
self.waker.wake().ok();
6066
}
6167
}
6268

69+
/// Driver configuration options.
70+
#[derive(Builder)]
71+
pub struct EventDriverConfig {
72+
#[builder(default = 256)]
73+
pub notification_capacity: usize,
74+
#[builder(default = 1024)]
75+
pub io_capacity: usize,
76+
#[builder(default = 16)]
77+
pub poll_limit: usize,
78+
}
79+
6380
/// Unified event management system that coordinates all event sources.
6481
///
6582
/// The `EventDriver` orchestrates the three core event types in the runtime:
@@ -80,23 +97,32 @@ pub struct EventDriver {
8097
/// Processes immediate yield requests
8198
yield_driver: YieldDriver,
8299

83-
/// Tracks deduplicated raw events that have been received
84-
raw_events: Arc<spin::Mutex<HashSet<NodeIndex>>>,
100+
/// Notification queue tracking events on node indices
101+
notifications: Arc<ArrayQueue<NodeIndex>>,
102+
103+
/// Poll limit for pulling node indices off of
104+
/// the notifications queue
105+
poll_limit: usize,
85106
}
86107

87108
impl EventDriver {
88109
/// Creates a new event driver with default I/O capacity.
89110
pub(crate) fn new() -> Self {
90-
Self::with_capacity(IO_CAPACITY)
111+
Self::with_config(
112+
EventDriverConfigBuilder::default()
113+
.build()
114+
.expect("expected default builder"),
115+
)
91116
}
92117

93-
/// Creates a new event driver with the specified I/O event capacity.
94-
pub(crate) fn with_capacity(capacity: usize) -> Self {
118+
/// Creates a new event driver with the specified notification capacity.
119+
pub(crate) fn with_config(cfg: EventDriverConfig) -> Self {
95120
Self {
96-
io_driver: IoDriver::with_capacity(capacity),
121+
io_driver: IoDriver::with_capacity(cfg.io_capacity),
97122
timer_driver: TimerDriver::new(),
98123
yield_driver: YieldDriver::new(),
99-
raw_events: Arc::new(spin::Mutex::new(HashSet::with_capacity(EVENT_CAPACITY))),
124+
notifications: Arc::new(ArrayQueue::new(cfg.notification_capacity)),
125+
poll_limit: cfg.poll_limit,
100126
}
101127
}
102128

@@ -118,7 +144,11 @@ impl EventDriver {
118144
/// Creates a new `Notifier` to inform the event driver of a raw event.
119145
#[inline(always)]
120146
pub fn register_notifier(&self, node_index: NodeIndex) -> Notifier {
121-
Notifier::new(self.raw_events.clone(), self.io_driver.waker(), node_index)
147+
Notifier::new(
148+
self.notifications.clone(),
149+
self.io_driver.waker(),
150+
node_index,
151+
)
122152
}
123153

124154
/// Polls all event sources and schedules ready nodes.
@@ -140,13 +170,15 @@ impl EventDriver {
140170
timeout: Option<Duration>,
141171
epoch: usize,
142172
) -> io::Result<CycleTime> {
143-
{
144-
let mut raw_events = self.raw_events.lock();
145-
raw_events.drain().for_each(|node_idx| {
146-
if let Some(depth) = graph.can_schedule(node_idx, epoch) {
147-
scheduler.schedule(node_idx, depth).ok();
173+
for _ in 0..self.poll_limit {
174+
match self.notifications.pop() {
175+
None => break,
176+
Some(node_idx) => {
177+
if let Some(depth) = graph.can_schedule(node_idx, epoch) {
178+
scheduler.schedule(node_idx, depth).ok();
179+
}
148180
}
149-
});
181+
}
150182
}
151183

152184
let cycle_time = clock.cycle_time();

0 commit comments

Comments
 (0)