caio.co/de/foca

promote AccumulatingRuntime to supported

It's useful enough, lets me get rid of yet another dummy runtime impl
and allows examples to be smaller
Id
f892da1e609b8a1793f0bb1c27b681537f858deb
Author
Caio
Commit time
2024-03-16T20:42:20+01:00

Modified examples/broadcasting.rs

@@ -7,7 +7,7
};

use bincode::Options;
-use bytes::{Buf, BufMut, Bytes, BytesMut};
+use bytes::Buf;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

Modified examples/foca_insecure_udp_agent.rs

@@ -1,8 +1,8
/* Any copyright is dedicated to the Public Domain.
* https://creativecommons.org/publicdomain/zero/1.0/ */
use std::{
cmp::Reverse, collections::BinaryHeap, fs::File, io::Write, net::SocketAddr, path::Path,
- str::FromStr, sync::Arc, time::Duration,
+ str::FromStr, sync::Arc,
};

use tracing_subscriber::{
@@ -20,7 +20,7
time::{sleep_until, Instant},
};

-use foca::{Config, Foca, Identity, Notification, PostcardCodec, Runtime, Timer};
+use foca::{AccumulatingRuntime, Config, Foca, Identity, Notification, PostcardCodec, Timer};

#[derive(Debug)]
struct CliParams {
@@ -155,51 +155,6
}
}

-struct AccumulatingRuntime<T> {
- pub to_send: Vec<(T, Bytes)>,
- pub to_schedule: Vec<(Duration, Timer<T>)>,
- pub notifications: Vec<Notification<T>>,
- buf: BytesMut,
-}
-
-impl<T: Identity> Runtime<T> for AccumulatingRuntime<T> {
- // Notice that we'll interact to these via pop(), so we're taking
- // them in reverse order of when it happened.
- // That's perfectly fine, the order of items from a single interaction
- // is irrelevant. A "nicer" implementation could use VecDeque or
- // react directly here instead of accumulating.
-
- fn notify(&mut self, notification: Notification<T>) {
- self.notifications.push(notification);
- }
-
- fn send_to(&mut self, to: T, data: &[u8]) {
- let mut packet = self.buf.split();
- packet.put_slice(data);
- self.to_send.push((to, packet.freeze()));
- }
-
- fn submit_after(&mut self, event: Timer<T>, after: Duration) {
- // We could spawn+sleep here
- self.to_schedule.push((after, event));
- }
-}
-
-impl<T> AccumulatingRuntime<T> {
- pub fn new() -> Self {
- Self {
- to_send: Vec::new(),
- to_schedule: Vec::new(),
- notifications: Vec::new(),
- buf: BytesMut::new(),
- }
- }
-
- pub fn backlog(&self) -> usize {
- self.to_send.len() + self.to_schedule.len() + self.notifications.len()
- }
-}
-
fn do_the_file_replace_dance<'a>(
fname: &str,
addrs: impl Iterator<Item = &'a SocketAddr>,
@@ -314,14 +269,14
// and then drain the runtime.

// First we submit everything that needs to go to the network
- while let Some((dst, data)) = runtime.to_send.pop() {
+ while let Some((dst, data)) = runtime.to_send() {
// ToSocketAddrs would be the fancy thing to use here
let _ignored_send_result = tx_send_data.send((dst.addr, data)).await;
}

// Then schedule what needs to be scheduled
let now = Instant::now();
- while let Some((delay, event)) = runtime.to_schedule.pop() {
+ while let Some((delay, event)) = runtime.to_schedule() {
scheduler
.send((now + delay, event))
.expect("error handling");
@@ -344,7 +299,7
// so other proccesses periodically open()/read()/close()
// to figure out the cluster members.
let mut active_list_has_changed = false;
- while let Some(notification) = runtime.notifications.pop() {
+ while let Some(notification) = runtime.to_notify() {
match notification {
Notification::MemberUp(_) | Notification::MemberDown(_) => {
active_list_has_changed = true

Modified src/lib.rs

@@ -153,7 +153,7
identity::Identity,
member::{Incarnation, Member, State},
payload::{Header, Message, ProbeNumber},
- runtime::{Notification, Runtime, Timer, TimerToken},
+ runtime::{AccumulatingRuntime, Notification, Runtime, Timer, TimerToken},
};

#[cfg(feature = "postcard-codec")]
@@ -1663,7 +1663,7
use bytes::{Buf, BufMut, Bytes};
use rand::{rngs::SmallRng, SeedableRng};

- use crate::testing::{BadCodec, InMemoryRuntime, ID};
+ use crate::testing::{BadCodec, ID};

fn rng() -> SmallRng {
SmallRng::seed_from_u64(0xF0CA)
@@ -1726,7 +1726,7

assert_eq!(Err(Error::NotUndead), foca.reuse_down_identity());

- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
assert_eq!(
Err(Error::SameIdentity),
foca.change_identity(identity, &mut runtime)
@@ -1764,7 +1764,7
fn cant_probe_when_not_connected() {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());

- let runtime = InMemoryRuntime::new();
+ let runtime = AccumulatingRuntime::new();
let res = foca.handle_timer(Timer::ProbeRandomMember(foca.timer_token()), runtime);

assert_eq!(Err(Error::NotConnected), res);
@@ -1886,7 +1886,7
let mut foca_one = Foca::new(ID::new(1), config(), rng(), codec());
let mut foca_two = Foca::new(ID::new(2), config(), rng(), codec());

- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

// Here foca_one will send an announce packet to foca_two
foca_one
@@ -1956,7 +1956,7
let one = ID::new(1);
let two = ID::new(2);
let mut foca_one = Foca::new(one, config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

assert_eq!(Ok(()), foca_one.announce(two, &mut runtime));
let data = runtime
@@ -2038,7 +2038,7
(header, updates)
};

- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
assert_eq!(Ok(()), foca.handle_data(&encode(data), &mut runtime));

expect_notification!(runtime, Notification::<ID>::Active);
@@ -2155,7 +2155,7
#[test]
fn new_down_member_triggers_remove_down_scheduling() -> Result<()> {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

// ID::new(2) is new and down
foca.apply(Member::down(ID::new(2)), &mut runtime)?;
@@ -2196,7 +2196,7
#[test]
fn notification_triggers() -> Result<()> {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

// Brand new member. The first in our set, so we should
// also be notified about going active
@@ -2290,7 +2290,7
// This test verifies that not submitting the second
// timer event causes an error.
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

// Add an active member so that the probing can start
foca.apply(Member::alive(ID::new(2)), &mut runtime)?;
@@ -2335,7 +2335,7
// This test verifies that if someone ask us to talk to
// ourselves via this mechanism, an error occurrs.
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

let probe_number = foca.probe().probe_number();
let indirect_messages = vec![
@@ -2375,7 +2375,7
#[test]
fn cant_receive_data_from_same_identity() {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

assert_eq!(
Err(Error::DataFromOurselves),
@@ -2398,7 +2398,7
fn cant_receive_data_from_same_addr() {
let id = ID::new(1);
let mut foca = Foca::new(id, config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

// Just the address is the same now
assert_eq!(
@@ -2421,7 +2421,7
#[test]
fn cant_receive_announce_with_extra_data() {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

assert_eq!(
Err(Error::MalformedPacket),
@@ -2473,7 +2473,7
let target_id = ID::new_with_bump(1, 255);
let codec = codec();
let mut foca = Foca::new(target_id, config(), rng(), codec);
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

// Our goal is getting `src` to join `target_id`'s cluster.
let src_id = ID::new(2);
@@ -2521,7 +2521,7
#[test]
fn suspicion_refutal() -> Result<()> {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

let original_incarnation = foca.incarnation();

@@ -2553,7 +2553,7
#[test]
fn incarnation_does_not_increase_for_stale_suspicion() {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

let suspected_incarnation = 10;
let update = Member::new(ID::new(1), suspected_incarnation, State::Suspect);
@@ -2571,7 +2571,7
#[test]
fn gossips_when_being_suspected() {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
// just one peer in the cluster, for simplificy's sake
assert_eq!(Ok(()), foca.apply(Member::alive(ID::new(2)), &mut runtime));

@@ -2597,7 +2597,7
#[test]
fn change_identity_gossips_immediately() {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

// Introduce a new member so we have someone to gossip to
assert_eq!(Ok(()), foca.apply(Member::alive(ID::new(2)), &mut runtime));
@@ -2621,7 +2621,7
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
let orig_timer_token = foca.timer_token();

- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
assert_eq!(Ok(()), foca.change_identity(ID::new(2), &mut runtime));

assert_ne!(orig_timer_token, foca.timer_token());
@@ -2631,7 +2631,7
fn renew_during_probe_shouldnt_cause_errors() {
let id = ID::new(1).rejoinable();
let mut foca = Foca::new(id, config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

let updates = [
Member::alive(ID::new(2)),
@@ -2705,7 +2705,7
Timer<ID>,
) {
let mut foca = Foca::new(ID::new(1), config.clone(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

assert!(num_members > 0);
// Assume some members exist
@@ -2754,7 +2754,7

// A foca is probing
let (mut foca, _probed, _send_indirect_probe) = craft_probing_foca(2, config());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

// Clippy gets it wrong here: can't use just the plain iterator
// otherwise foca remains borrowed
@@ -2778,7 +2778,7
#[test]
fn probe_ping_ack_cycle() {
let (mut foca, probed, send_indirect_probe) = craft_probing_foca(5, config());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

// Now if probed replies before the timer fires, the probe
// should complete and the indirect probe cycle shouldn't
@@ -2805,7 +2805,7
#[test]
fn probe_cycle_requires_correct_probe_number() {
let (mut foca, probed, send_indirect_probe) = craft_probing_foca(5, config());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

let incorrect_probe_number = foca.probe().probe_number() + 1;
assert_ne!(incorrect_probe_number, foca.probe().probe_number());
@@ -2842,7 +2842,7
// we don't send more requests than the configured value.
let (mut foca, probed, send_indirect_probe) =
craft_probing_foca((num_indirect_probes + 2) as u8, config());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

// `probed` did NOT reply with an Ack before the timer
assert_eq!(Ok(()), foca.handle_timer(send_indirect_probe, &mut runtime));
@@ -2942,7 +2942,7
#[test]
fn probe_receiving_ping_replies_with_ack() {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

let probe_number = foca.probe().probe_number();
let data = (
@@ -2963,7 +2963,7
#[test]
fn probe_receiving_ping_req_sends_indirect_ping() {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

let probe_number = foca.probe().probe_number();
let data = (
@@ -2993,7 +2993,7
#[test]
fn probe_receiving_indirect_ping_sends_indirect_ack() {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

let probe_number = foca.probe().probe_number();
let data = (
@@ -3023,7 +3023,7
#[test]
fn probe_receiving_indirect_ack_sends_forwarded_ack() {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

let probe_number = foca.probe().probe_number();
let data = (
@@ -3065,7 +3065,7

for member in members.iter().rev() {
let mut foca = Foca::new(*member.id(), config(), rng(), codec());
- foca.apply_many(members.iter().cloned(), InMemoryRuntime::new())?;
+ foca.apply_many(members.iter().cloned(), AccumulatingRuntime::new())?;
herd.push(foca);
}

@@ -3081,7 +3081,7
let three = *foca_three.identity();

// foca_one starts suspecting two and three
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
foca_one.apply(Member::suspect(two), &mut runtime)?;
foca_one.apply(Member::suspect(three), &mut runtime)?;
assert_eq!(2, foca_one.num_members());
@@ -3158,7 +3158,7
#[test]
fn leave_cluster_gossips_about_our_death() -> Result<()> {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

foca.apply(Member::alive(ID::new(2)), &mut runtime)?;

@@ -3192,7 +3192,7
};

let mut foca = Foca::new(ID::new(1), config, rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

// And only have one
foca.apply(Member::alive(ID::new(2)), &mut runtime)?;
@@ -3214,7 +3214,7
#[test]
fn auto_rejoin_behaviour() {
let mut foca = Foca::new(ID::new(1).rejoinable(), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

let updates = [
// New known members
@@ -3257,7 +3257,7

assert_eq!(
Err(Error::DataTooBig),
- foca.handle_data(&large_data[..], InMemoryRuntime::new())
+ foca.handle_data(&large_data[..], AccumulatingRuntime::new())
);

assert_eq!(Err(Error::DataTooBig), foca.add_broadcast(&large_data[..]));
@@ -3286,7 +3286,7

assert_eq!(
Ok(()),
- foca.handle_data(valid_data.as_ref(), InMemoryRuntime::new()),
+ foca.handle_data(valid_data.as_ref(), AccumulatingRuntime::new()),
"valid_data should be valid :-)"
);

@@ -3299,7 +3299,7

assert_eq!(
Err(Error::MalformedPacket),
- foca.handle_data(bad_data.as_ref(), InMemoryRuntime::new()),
+ foca.handle_data(bad_data.as_ref(), AccumulatingRuntime::new()),
);
}

@@ -3460,7 +3460,7
);

// Teach the original foca about this new `other_foca`
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
assert_eq!(Ok(()), foca.apply(Member::alive(other_id), &mut runtime));

// Now foca will talk to other_foca. The encoded data
@@ -3572,7 +3572,7
// Here we get a foca in the middle of a probe cycle. The correct
// sequencing should submit `_send_indirect_probe`
let (mut foca, _probed, _send_indirect_probe) = craft_probing_foca(2, config());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
let old_probeno = foca.probe().probe_number();

// ... but we'll manually craft a ProbeRandomMember event instead
@@ -3618,7 +3618,7
};

let (mut foca, probed, send_indirect_probe) = craft_probing_foca(2, config);
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

// `probed` did NOT reply with an Ack before the timer
assert_eq!(Ok(()), foca.handle_timer(send_indirect_probe, &mut runtime));
@@ -3649,7 +3649,7
c.notify_down_members = true;
c
};
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

// We have a simple foca instance
let mut foca = Foca::new(ID::new(1), config, rng(), codec());
@@ -3699,7 +3699,7
config_setter(&mut config, params);

let mut foca = Foca::new(ID::new(1), config, rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

// When it becomes active (i.e.: has at least one active member)
assert_eq!(Ok(()), foca.apply(Member::alive(ID::new(2)), &mut runtime));
@@ -3808,7 +3808,7
// If we have an instance running with the renewed
// id as its identity
let mut foca = Foca::new(renewed, config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

// Learning anything about its previous identity
assert_eq!(
@@ -3862,7 +3862,7
// announces to another and then verify that we can handle
// the reply with no errors
let mut foca_one = Foca::new(ID::new(1), config.clone(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

// let's assume that foca_one knows about another member, ID=3'
// so that the feed reply contains at least one member
@@ -3899,7 +3899,7
fn feed_fits_as_many_as_it_can() {
// We prepare a foca cluster with a bunch of live members
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
let cluster = (2u8..=u8::MAX)
.map(|id| Member::alive(ID::new(id)))
.collect::<Vec<_>>();
@@ -3942,7 +3942,7
c
};
let mut foca = Foca::new(id_one, config, rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
// that thinks ID=2 is down;
assert_eq!(Ok(()), foca.apply(Member::down(ID::new(2)), &mut runtime));

@@ -3979,7 +3979,7
#[test]
fn handles_member_addr_conflict() {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

// Given a known member ID=2,0
let original = ID::new_with_bump(2, 0);
@@ -4021,7 +4021,7
#[test]
fn does_not_mark_renewed_identity_as_down() {
let (mut foca, probed, send_indirect_probe) = craft_probing_foca(1, config());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
assert_eq!(1, foca.num_members());

// `probed` did NOT reply with an Ack before the timer
@@ -4056,7 +4056,7
#[test]
fn notifies_on_conflict_resolution() {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();

// Given a known member
let member = ID::new(2).rejoinable();

Modified src/runtime.rs

@@ -1,7 +1,10
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
+use alloc::collections::VecDeque;
use core::{cmp::Ordering, time::Duration};
+
+use bytes::{Bytes, BytesMut};

use crate::{Identity, Incarnation};

@@ -193,6 +196,121
///
/// Similar in spirit to [`crate::ProbeNumber`].
pub type TimerToken = u8;
+
+/// FIXME docs
+pub struct AccumulatingRuntime<T> {
+ to_send: VecDeque<(T, Bytes)>,
+ to_schedule: VecDeque<(Duration, Timer<T>)>,
+ notifications: VecDeque<Notification<T>>,
+ buf: BytesMut,
+}
+
+impl<T: Identity> Runtime<T> for AccumulatingRuntime<T> {
+ // Notice that we'll interact to these via pop(), so we're taking
+ // them in reverse order of when it happened.
+ // That's perfectly fine, the order of items from a single interaction
+ // is irrelevant. A "nicer" implementation could use VecDeque or
+ // react directly here instead of accumulating.
+
+ fn notify(&mut self, notification: Notification<T>) {
+ self.notifications.push_back(notification);
+ }
+
+ fn send_to(&mut self, to: T, data: &[u8]) {
+ self.buf.extend_from_slice(data);
+ let packet = self.buf.split().freeze();
+ self.to_send.push_back((to, packet));
+ }
+
+ fn submit_after(&mut self, event: Timer<T>, after: Duration) {
+ // We could spawn+sleep here
+ self.to_schedule.push_back((after, event));
+ }
+}
+
+impl<T> AccumulatingRuntime<T> {
+ /// FIXME docs
+ #[allow(clippy::new_without_default)] // wtf @ this lint
+ pub fn new() -> Self {
+ Self {
+ to_send: Default::default(),
+ to_schedule: Default::default(),
+ notifications: Default::default(),
+ buf: Default::default(),
+ }
+ }
+
+ /// FIXME docs
+ pub fn to_send(&mut self) -> Option<(T, Bytes)> {
+ self.to_send.pop_front()
+ }
+
+ /// FIXME docs
+ pub fn to_schedule(&mut self) -> Option<(Duration, Timer<T>)> {
+ self.to_schedule.pop_front()
+ }
+
+ /// FIXME docs
+ pub fn to_notify(&mut self) -> Option<Notification<T>> {
+ self.notifications.pop_front()
+ }
+
+ /// FIXME docs
+ pub fn backlog(&self) -> usize {
+ self.to_send.len() + self.to_schedule.len() + self.notifications.len()
+ }
+}
+
+#[cfg(test)]
+impl<T: PartialEq> AccumulatingRuntime<T> {
+ pub(crate) fn clear(&mut self) {
+ self.notifications.clear();
+ self.to_send.clear();
+ self.to_schedule.clear();
+ }
+
+ pub(crate) fn is_empty(&self) -> bool {
+ self.notifications.is_empty() && self.to_send.is_empty() && self.to_schedule.is_empty()
+ }
+
+ pub(crate) fn take_all_data(&mut self) -> VecDeque<(T, Bytes)> {
+ core::mem::take(&mut self.to_send)
+ }
+
+ pub(crate) fn take_data(&mut self, dst: T) -> Option<Bytes> {
+ let position = self.to_send.iter().position(|(to, _data)| to == &dst)?;
+
+ self.to_send.remove(position).map(|(_, data)| data)
+ }
+
+ pub(crate) fn take_notification(&mut self, wanted: Notification<T>) -> Option<Notification<T>> {
+ let position = self
+ .notifications
+ .iter()
+ .position(|notification| notification == &wanted)?;
+
+ self.notifications.remove(position)
+ }
+
+ pub(crate) fn take_scheduling(&mut self, timer: Timer<T>) -> Option<Duration> {
+ let position = self
+ .to_schedule
+ .iter()
+ .position(|(_when, event)| event == &timer)?;
+
+ self.to_schedule.remove(position).map(|(when, _)| when)
+ }
+
+ pub(crate) fn find_scheduling<F>(&self, predicate: F) -> Option<&Timer<T>>
+ where
+ F: Fn(&Timer<T>) -> bool,
+ {
+ self.to_schedule
+ .iter()
+ .find(|(_, timer)| predicate(timer))
+ .map(|(_, timer)| timer)
+ }
+}

#[cfg(test)]
mod tests {

Modified src/testing.rs

@@ -1,12 +1,11
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use alloc::vec::Vec;
-use core::time::Duration;

-use bytes::{Buf, BufMut, Bytes};
+use bytes::{Buf, BufMut};

-use crate::{Codec, Header, Identity, Member, Message, Notification, Runtime, State, Timer};
+use crate::{Codec, Header, Identity, Member, Message, State};

#[derive(Debug, Clone, Copy, PartialOrd, Ord)]
pub(crate) struct ID {
@@ -354,91 +353,6

fn decode_member(&mut self, mut buf: impl Buf) -> Result<Member<ID>, Self::Error> {
BadCodec::decode_member(self, &mut buf)
- }
-}
-
-#[derive(Debug, Clone)]
-pub(crate) struct InMemoryRuntime {
- notifications: Vec<Notification<ID>>,
- to_send: Vec<(ID, Bytes)>,
- to_schedule: Vec<(Timer<ID>, Duration)>,
-}
-
-impl InMemoryRuntime {
- pub(crate) fn new() -> Self {
- Self {
- notifications: Vec::new(),
- to_send: Vec::new(),
- to_schedule: Vec::new(),
- }
- }
-
- pub(crate) fn clear(&mut self) {
- self.notifications.clear();
- self.to_send.clear();
- self.to_schedule.clear();
- }
-
- pub(crate) fn is_empty(&self) -> bool {
- self.notifications.is_empty() && self.to_send.is_empty() && self.to_schedule.is_empty()
- }
-
- pub(crate) fn take_all_data(&mut self) -> Vec<(ID, Bytes)> {
- core::mem::take(&mut self.to_send)
- }
-
- pub(crate) fn take_data(&mut self, dst: ID) -> Option<Bytes> {
- let position = self.to_send.iter().position(|(to, _data)| to == &dst)?;
-
- let taken = self.to_send.swap_remove(position);
- Some(taken.1)
- }
-
- pub(crate) fn take_notification(
- &mut self,
- wanted: Notification<ID>,
- ) -> Option<Notification<ID>> {
- let position = self
- .notifications
- .iter()
- .position(|notification| notification == &wanted)?;
-
- let taken = self.notifications.swap_remove(position);
- Some(taken)
- }
-
- pub(crate) fn take_scheduling(&mut self, timer: Timer<ID>) -> Option<Duration> {
- let position = self
- .to_schedule
- .iter()
- .position(|(event, _when)| event == &timer)?;
-
- let taken = self.to_schedule.swap_remove(position);
- Some(taken.1)
- }
-
- pub(crate) fn find_scheduling<F>(&self, predicate: F) -> Option<&Timer<ID>>
- where
- F: Fn(&Timer<ID>) -> bool,
- {
- self.to_schedule
- .iter()
- .find(|(timer, _)| predicate(timer))
- .map(|(timer, _)| timer)
- }
-}
-
-impl Runtime<ID> for InMemoryRuntime {
- fn notify(&mut self, notification: Notification<ID>) {
- self.notifications.push(notification);
- }
-
- fn send_to(&mut self, to: ID, data: &[u8]) {
- self.to_send.push((to, Bytes::copy_from_slice(data)));
- }
-
- fn submit_after(&mut self, event: Timer<ID>, after: Duration) {
- self.to_schedule.push((event, after));
}
}