Extract scheduler loop so its easier to grok
- Id
- 2256fb70cd416ef38da79a717c453bc9303fd044
- Author
- Caio
- Commit time
- 2024-10-06T10:22:02+02:00
Modified examples/foca_insecure_udp_agent.rs
SendBroadcast,
}
+// The reason we use a scheduler instead of simply spawning
+// a task and sleeping within is to guarantee that events
+// are *delivered* in the correct order
+// See: https://github.com/caio/foca/issues/26
+// See: https://github.com/caio/foca/pull/36#issuecomment-2339965304
async fn launch_scheduler(
timer_tx: mpsc::Sender<Input<ID>>,
) -> mpsc::UnboundedSender<(Instant, Timer<ID>)> {
// being sure we're deadlock safe.
// Since the `timer_tx` handler is the only thing that submits events
// the buffer growth is effectivelly bound
- let (tx, mut rx) = mpsc::unbounded_channel::<(Instant, Timer<ID>)>();
+ let (tx, receiver) = mpsc::unbounded_channel::<(Instant, Timer<ID>)>();
- let mut queue = TimerQueue::new();
+ let mut scheduler = Scheduler {
+ receiver,
+ queue: TimerQueue::new(),
+ sender: timer_tx,
+ };
tokio::spawn(async move {
- 'handler: loop {
+ if let Err(err) = scheduler.run().await {
+ tracing::error!(
+ err = tracing::field::debug(err),
+ "Error encountered within scheduler. Shutting down"
+ );
+ }
+ tracing::info!("Scheduler is shut down")
+ });
+
+ tx
+}
+
+struct Scheduler {
+ // receives events to be dispatched at a given instant
+ receiver: mpsc::UnboundedReceiver<(Instant, Timer<ID>)>,
+ // queue of events to be dispatched
+ queue: TimerQueue,
+ // where events get sent when the time is right
+ sender: mpsc::Sender<Input<ID>>,
+}
+
+impl Scheduler {
+ async fn run(&mut self) -> Result<(), Box<dyn std::error::Error>> {
+ loop {
let now = Instant::now();
- macro_rules! submit_event {
- ($event:expr) => {
- if let Err(err) = timer_tx.send(Input::Event($event)).await {
- tracing::error!(
- err = tracing::field::debug(err),
- "Error submitting timer event. Shutting down timer task"
- );
- rx.close();
- break 'handler;
- }
- };
- ($when:expr, $event:expr) => {
- if $when < now {
- submit_event!($event);
- } else {
- queue.enqueue($when, $event);
- }
- };
- }
-
- // XXX Maybe watch for large `now - _ins` deltas to detect runtime lag
- while let Some((_ins, event)) = queue.pop_next(&now) {
- submit_event!(event);
+ while let Some((_ins, event)) = self.queue.pop_next(&now) {
+ self.sender.send(Input::Event(event)).await?;
}
// If the queue is not empty, we have a deadline: can only
// wait until we reach `wake_at`
- if let Some(wake_at) = queue.next_deadline() {
- // wait for input OR sleep
+ if let Some(wake_at) = self.queue.next_deadline() {
let sleep_fut = sleep_until(*wake_at);
- let recv_fut = rx.recv();
+ let recv_fut = self.receiver.recv();
tokio::select! {
_ = sleep_fut => {
// woke up after deadline, time to handle events
- continue 'handler;
+ continue;
},
maybe = recv_fut => {
- if maybe.is_none() {
+ if let Some((when, event)) = maybe {
+ self.queue.enqueue(when,event);
+ } else {
// channel closed
- break 'handler;
+ return Ok(());
}
- let (when, event) = maybe.expect("checked for None already");
- submit_event!(when, event);
}
};
} else {
// Otherwise we'll wait until someone submits a new deadline
- if let Some((when, event)) = rx.recv().await {
- submit_event!(when, event);
+ if let Some((when, event)) = self.receiver.recv().await {
+ self.queue.enqueue(when, event);
} else {
// channel closed
- break 'handler;
+ return Ok(());
}
}
}
- });
-
- tx
+ }
}
// Just a (Instant, Timer) min-heap