Notification enum now holds references
I want to extend notifications without worrying too much about causing allocs for (potentially) unused data Not particularly nice, but I think this is still a better dev experience than slapping a bunch of "on_$something" methods in the Runtime trait with a default no-op impl
- Id
- d776c5b0f27010134104057cd963b28765e4d015
- Author
- Caio
- Commit time
- 2024-09-06T10:54:34+02:00
Modified CHANGELOG.md
# Changelog
## UNRELEASED
-- **BREAKING**: `foca::Error` now relies on `core::error::Error`
+- **BREAKING**: `Error` now relies on `core::error::Error`
instead of `anyhow::Error` to wrap Codec and Broadcast errors. The
dependency has been removed
- **BREAKING**: MSRV is now 1.81.0
+- **BREAKING**: `Notification` now contains references to identities
+ instead of owned values so that introducing new discriminants is
+ less worrisome for complex types
+- Notifications can be converted to the new `foca::OwnedNotification`,
+ for convenience
## v0.17.2 - 2024-06-04
Modified examples/foca_insecure_udp_agent.rs
time::{sleep_until, Instant},
};
-use foca::{BincodeCodec, Config, Foca, Notification, Timer};
+use foca::{BincodeCodec, Config, Foca, OwnedNotification, Timer};
#[derive(Debug)]
struct CliParams {
let mut active_list_has_changed = false;
while let Some(notification) = runtime.to_notify() {
match notification {
- Notification::MemberUp(_) | Notification::MemberDown(_) => {
+ OwnedNotification::MemberUp(_) | OwnedNotification::MemberDown(_) => {
active_list_has_changed = true;
last_change_at = secs_since_epoch();
}
- Notification::Idle => {
+ OwnedNotification::Idle => {
tracing::info!("cluster empty");
}
- Notification::Rename(old, new) => {
+ OwnedNotification::Rename(old, new) => {
tracing::info!("member {old:?} is now known as {new:?}");
}
Modified src/lib.rs
identity::Identity,
member::{Incarnation, Member, State},
payload::{Header, Message, ProbeNumber},
- runtime::{AccumulatingRuntime, Notification, Runtime, Timer, TimerToken},
+ runtime::{AccumulatingRuntime, Notification, OwnedNotification, Runtime, Timer, TimerToken},
};
#[cfg(feature = "postcard-codec")]
member_id = tracing::field::debug(&id),
"Renamed"
);
- runtime.notify(Notification::Rename(old, id.clone()));
+ runtime.notify(Notification::Rename(&old, &id));
}
if summary.changed_active_set {
if summary.is_active_now {
#[cfg(feature = "tracing")]
tracing::debug!(member_id = tracing::field::debug(&id), "Member up");
- runtime.notify(Notification::MemberUp(id));
+ runtime.notify(Notification::MemberUp(&id));
} else {
#[cfg(feature = "tracing")]
tracing::debug!(member_id = tracing::field::debug(&id), "Member down");
- runtime.notify(Notification::MemberDown(id));
+ runtime.notify(Notification::MemberDown(&id));
}
}
} else {
self.change_identity(new_identity.clone(), &mut runtime)?;
- runtime.notify(Notification::Rejoin(new_identity));
+ runtime.notify(Notification::Rejoin(&new_identity));
Ok(true)
}
struct NoopRuntime;
impl Runtime<ID> for NoopRuntime {
- fn notify(&mut self, _notification: Notification<ID>) {}
+ fn notify(&mut self, _notification: Notification<'_, ID>) {}
fn send_to(&mut self, _to: ID, _data: &[u8]) {}
fn submit_after(&mut self, _event: Timer<ID>, _after: Duration) {}
}
macro_rules! expect_notification {
($runtime: expr, $notification: expr) => {
$runtime
- .take_notification($notification)
+ .take_notification($notification.to_owned())
.unwrap_or_else(|| panic!("Notification {:?} not found", $notification));
};
}
macro_rules! reject_notification {
($runtime: expr, $notification: expr) => {
assert!(
- $runtime.take_notification($notification).is_none(),
+ $runtime
+ .take_notification($notification.to_owned())
+ .is_none(),
"Unwanted notification {:?} found",
$notification
);
// So we should have gotten a notification about going online
expect_notification!(runtime, Notification::<ID>::Active);
- expect_notification!(runtime, Notification::MemberUp(ID::new(1)));
+ expect_notification!(runtime, Notification::MemberUp(&ID::new(1)));
// And a event to trigger a probe should've been
// scheduled
assert_eq!(Ok(()), foca_one.handle_data(&encode(data), &mut runtime));
expect_notification!(runtime, Notification::<ID>::Active);
- expect_notification!(runtime, Notification::MemberUp(ID::new(2)));
+ expect_notification!(runtime, Notification::MemberUp(&ID::new(2)));
assert_eq!(1, foca_one.num_members());
}
expect_notification!(runtime, Notification::<ID>::Active);
// We didn't know about any mentioned in the packet
- expect_notification!(runtime, Notification::MemberUp(ID::new(2)));
- expect_notification!(runtime, Notification::MemberUp(ID::new(3)));
- expect_notification!(runtime, Notification::MemberUp(ID::new(4)));
+ expect_notification!(runtime, Notification::MemberUp(&ID::new(2)));
+ expect_notification!(runtime, Notification::MemberUp(&ID::new(3)));
+ expect_notification!(runtime, Notification::MemberUp(&ID::new(4)));
// But an update about a Down member that we didn't know
// about is cluster metadata only and shouldn't trigger
// a notification
- reject_notification!(runtime, Notification::MemberDown(ID::new(5)));
+ reject_notification!(runtime, Notification::MemberDown(&ID::new(5)));
// It should, however, trigger a scheduling for forgetting
// the member, so that they may rejoin the cluster
expect_scheduling!(
// Brand new member. The first in our set, so we should
// also be notified about going active
foca.apply(Member::alive(ID::new(2)), &mut runtime)?;
- expect_notification!(runtime, Notification::MemberUp(ID::new(2)));
+ expect_notification!(runtime, Notification::MemberUp(&ID::new(2)));
expect_notification!(runtime, Notification::<ID>::Active);
// Updated/stale knowledge about an active member shouldn't
foca.apply(Member::alive(ID::new(2)), &mut runtime)?;
foca.apply(Member::suspect(ID::new(2)), &mut runtime)?;
foca.apply(Member::new(ID::new(2), 10, State::Alive), &mut runtime)?;
- reject_notification!(runtime, Notification::MemberUp(ID::new(2)));
- reject_notification!(runtime, Notification::MemberDown(ID::new(2)));
+ reject_notification!(runtime, Notification::MemberUp(&ID::new(2)));
+ reject_notification!(runtime, Notification::MemberDown(&ID::new(2)));
// Another new member
runtime.clear();
foca.apply(Member::suspect(ID::new(3)), &mut runtime)?;
- expect_notification!(runtime, Notification::MemberUp(ID::new(3)));
+ expect_notification!(runtime, Notification::MemberUp(&ID::new(3)));
reject_notification!(runtime, Notification::<ID>::Active);
// Existing member going down
runtime.clear();
foca.apply(Member::down(ID::new(3)), &mut runtime)?;
- expect_notification!(runtime, Notification::MemberDown(ID::new(3)));
+ expect_notification!(runtime, Notification::MemberDown(&ID::new(3)));
// A stale update should trigger no notification
runtime.clear();
foca.apply(Member::down(ID::new(3)), &mut runtime)?;
- reject_notification!(runtime, Notification::MemberDown(ID::new(3)));
+ reject_notification!(runtime, Notification::MemberDown(&ID::new(3)));
// A new member, but already down, so no notification
runtime.clear();
foca.apply(Member::down(ID::new(4)), &mut runtime)?;
- reject_notification!(runtime, Notification::MemberDown(ID::new(4)));
+ reject_notification!(runtime, Notification::MemberDown(&ID::new(4)));
// Last active member going down, we're going idle
runtime.clear();
assert_eq!(1, foca.num_members());
foca.apply(Member::down(ID::new(2)), &mut runtime)?;
- expect_notification!(runtime, Notification::MemberDown(ID::new(2)));
+ expect_notification!(runtime, Notification::MemberDown(&ID::new(2)));
expect_notification!(runtime, Notification::<ID>::Idle);
// New active member, going back to active
runtime.clear();
foca.apply(Member::alive(ID::new(5)), &mut runtime)?;
- expect_notification!(runtime, Notification::MemberUp(ID::new(5)));
+ expect_notification!(runtime, Notification::MemberUp(&ID::new(5)));
expect_notification!(runtime, Notification::<ID>::Active);
// Now someone declared us (ID=1) down, we should
expect_notification!(runtime, Notification::<ID>::Defunct);
// But since we're not part of the member list, there shouldn't
// be a notification about our id going down
- reject_notification!(runtime, Notification::MemberDown(ID::new(1)));
+ reject_notification!(runtime, Notification::MemberDown(&ID::new(1)));
// While defunct, we can still maintain members,
runtime.clear();
foca.apply(Member::down(ID::new(5)), &mut runtime)?;
- expect_notification!(runtime, Notification::MemberDown(ID::new(5)));
+ expect_notification!(runtime, Notification::MemberDown(&ID::new(5)));
foca.apply(Member::alive(ID::new(6)), &mut runtime)?;
- expect_notification!(runtime, Notification::MemberUp(ID::new(6)));
+ expect_notification!(runtime, Notification::MemberUp(&ID::new(6)));
// But until manual intervention happens, we are not active
reject_notification!(runtime, Notification::<ID>::Active);
);
// foca_two hasn't refuted the suspicion, so `foca_one` should
// have marked it as down
- expect_notification!(runtime, Notification::MemberDown(two));
+ expect_notification!(runtime, Notification::MemberDown(&two));
assert_eq!(1, foca_one.num_members());
assert!(
foca_one.iter_members().all(|m| m.id() != &two),
// Change our identity
let expected_new_id = ID::new_with_bump(1, 1);
assert_eq!(&expected_new_id, foca.identity());
- expect_notification!(runtime, Notification::Rejoin(expected_new_id));
+ expect_notification!(runtime, Notification::Rejoin(&expected_new_id));
reject_notification!(runtime, Notification::<ID>::Defunct);
// And disseminate our new identity to K members
assert_eq!(Ok(()), foca.apply(Member::alive(renewed), &mut runtime));
assert_eq!(1, foca.num_members());
// Should notify the runtime about the change
- expect_notification!(runtime, Notification::Rename(member, renewed));
+ expect_notification!(runtime, Notification::Rename(&member, &renewed));
// But no MemberUp notification should be fired, since
// previous addr was already active
- reject_notification!(runtime, Notification::MemberUp(member));
- reject_notification!(runtime, Notification::MemberUp(renewed));
+ reject_notification!(runtime, Notification::MemberUp(&member));
+ reject_notification!(runtime, Notification::MemberUp(&renewed));
runtime.clear();
assert_eq!(Ok(()), foca.apply(Member::down(inactive), &mut runtime));
assert_eq!(0, foca.num_members());
// We get notified of the rename
- expect_notification!(runtime, Notification::Rename(renewed, inactive));
+ expect_notification!(runtime, Notification::Rename(&renewed, &inactive));
// AND about the member going down with its new identity
- expect_notification!(runtime, Notification::MemberDown(inactive));
+ expect_notification!(runtime, Notification::MemberDown(&inactive));
// but nothing about the (now overriden, forgotten) previous one
- reject_notification!(runtime, Notification::MemberDown(renewed));
+ reject_notification!(runtime, Notification::MemberDown(&renewed));
runtime.clear();
assert_eq!(Ok(()), foca.apply(Member::suspect(active), &mut runtime));
assert_eq!(1, foca.num_members());
// Should notify about the rename
- expect_notification!(runtime, Notification::Rename(inactive, active));
+ expect_notification!(runtime, Notification::Rename(&inactive, &active));
// And about the member being active
- expect_notification!(runtime, Notification::MemberUp(active));
+ expect_notification!(runtime, Notification::MemberUp(&active));
runtime.clear();
// And if it learns about the previous ids again, regardless
Modified src/runtime.rs
/// without having direct access to the running Foca instance.
///
/// Implementations may completely disregard this if desired.
- fn notify(&mut self, notification: Notification<T>);
+ fn notify(&mut self, notification: Notification<'_, T>);
/// This is how Foca connects to an actual transport.
///
T: Identity,
R: Runtime<T>,
{
- fn notify(&mut self, notification: Notification<T>) {
+ fn notify(&mut self, notification: Notification<'_, T>) {
R::notify(self, notification);
}
/// A Notification contains information about high-level relevant
/// state changes in the cluster or Foca itself.
-#[derive(Debug, Clone, PartialEq, Eq)]
-#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
-pub enum Notification<T> {
+#[derive(Debug, PartialEq, Eq)]
+pub enum Notification<'a, T> {
/// Foca discovered a new active member with identity T.
- MemberUp(T),
+ MemberUp(&'a T),
/// A previously active member has been declared down by the cluster.
///
/// If Foca detects a down member but didn't know about its activity
/// before, this notification will not be emitted.
///
/// Can only happen if `MemberUp(T)` happened before.
- MemberDown(T),
+ MemberDown(&'a T),
/// Foca has learned that there's a more recent identity with
/// the same address and chose to use it instead of the previous
///
/// However, if there's no liveness change (both are active
/// or both are down), you'll only get the `Rename` notification
- Rename(T, T),
+ Rename(&'a T, &'a T),
/// Foca's current identity is known by at least one active member
/// of the cluster.
///
/// This happens instead of `Defunct` when identities opt-in on
/// `Identity::renew()` functionality.
+ Rejoin(&'a T),
+}
+
+impl<'a, T> Notification<'a, T>
+where
+ T: Clone,
+{
+ /// Converts self into a [`OwnedNotification`]
+ pub fn to_owned(self) -> OwnedNotification<T> {
+ match self {
+ Notification::MemberUp(m) => OwnedNotification::MemberUp(m.clone()),
+ Notification::MemberDown(m) => OwnedNotification::MemberDown(m.clone()),
+ Notification::Rename(before, after) => {
+ OwnedNotification::Rename(before.clone(), after.clone())
+ }
+ Notification::Active => OwnedNotification::Active,
+ Notification::Idle => OwnedNotification::Idle,
+ Notification::Defunct => OwnedNotification::Defunct,
+ Notification::Rejoin(id) => OwnedNotification::Rejoin(id.clone()),
+ }
+ }
+}
+
+/// An owned `Notification`, for convenience.
+///
+/// See [`Notification`] for details
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum OwnedNotification<T> {
+ /// See [`Notification::MemberUp`]
+ MemberUp(T),
+ /// See [`Notification::MemberDown`]
+ MemberDown(T),
+ /// See [`Notification::Rename`]
+ Rename(T, T),
+ /// See [`Notification::Active`]
+ Active,
+ /// See [`Notification::Idle`]
+ Idle,
+ /// See [`Notification::Defunct`]
+ Defunct,
+ /// See [`Notification::Rejoin`]
Rejoin(T),
}
pub struct AccumulatingRuntime<T> {
to_send: VecDeque<(T, Bytes)>,
to_schedule: VecDeque<(Duration, Timer<T>)>,
- notifications: VecDeque<Notification<T>>,
+ notifications: VecDeque<OwnedNotification<T>>,
buf: BytesMut,
}
}
impl<T: Identity> Runtime<T> for AccumulatingRuntime<T> {
- fn notify(&mut self, notification: Notification<T>) {
- self.notifications.push_back(notification);
+ fn notify(&mut self, notification: Notification<'_, T>) {
+ self.notifications.push_back(notification.to_owned());
}
fn send_to(&mut self, to: T, data: &[u8]) {
}
fn submit_after(&mut self, event: Timer<T>, after: Duration) {
- // We could spawn+sleep here
self.to_schedule.push_back((after, event));
}
}
///
/// Users are expected to drain it until it yields `None`
/// after every interaction with `crate::Foca`
- pub fn to_notify(&mut self) -> Option<Notification<T>> {
+ pub fn to_notify(&mut self) -> Option<OwnedNotification<T>> {
self.notifications.pop_front()
}
self.to_send.remove(position).map(|(_, data)| data)
}
- pub(crate) fn take_notification(&mut self, wanted: Notification<T>) -> Option<Notification<T>> {
+ pub(crate) fn take_notification(
+ &mut self,
+ wanted: OwnedNotification<T>,
+ ) -> Option<OwnedNotification<T>> {
let position = self
.notifications
.iter()