Introduces Config::periodic_announce_to_down_members
New feature to (try to) recorver from network partitions
- Id
- 25defef9e6cbc33d69c8d832d377fcd789a57832
- Author
- Caio
- Commit time
- 2024-03-19T10:56:02+01:00
Modified CHANGELOG.md
handler implementations now only need to emit an identifier (Key)
for each value being broadcast instead of managing the allocation
See the `BroadcastHandler` documentation and examples for details
+- `Config::periodic_announce_to_down_members`: Foca periodically
+ tries to join with members it considers down, as an attempt to
+ recover from a network partition. This setting is **enabled** by
+ default for `Config::new_wan` and `Config::new_lan`
- There's now `Notification::Rename` that signals whenever an
identity with a conflicting `Addr` in the cluster gets replaced
by a newer one
- There's no need to manage the list of members externally anymore:
- foca does it all for you and `Foca::iter_members` only lists
+ Foca does it all for you and `Foca::iter_members` only lists
the unique (by `Identity::Addr`), freshest identities
- `examples/foca_insecure_udp_agent.rs` now comes with a fully working
custom broadcast example
Modified src/config.rs
/// disabled if you're aiming at pure SWIM behavior.
pub periodic_announce: Option<PeriodicParams>,
+ /// How often should foca send an announce message to members it currently
+ /// considers [`crate::State::Down`]
+ ///
+ /// This setting instructs foca to try and talk to members that are down
+ /// so that it can (eventually) recover from network partitions without
+ /// additional hand-holding.
+ ///
+ /// It's particularly useful when used in tandem with identities that
+ /// can auto-rejoin (`crate::Identity::renew`) and with
+ /// `Self::notify_down_members` enabled.
+ ///
+ /// This feature is an extension to the SWIM protocol and should be left
+ /// disabled if you're aiming at pure SWIM behavior.
+ pub periodic_announce_to_down_members: Option<PeriodicParams>,
+
/// How often should foca send cluster updates to peers
///
/// By default, SWIM disseminates cluster updates during the direct and
notify_down_members: false,
periodic_announce: None,
+ periodic_announce_to_down_members: None,
periodic_gossip: None,
}
}
frequency: Duration::from_secs(30),
num_members: NonZeroUsize::new(1).unwrap(),
}),
+
+ periodic_announce_to_down_members: Some(PeriodicParams {
+ frequency: Duration::from_secs(65),
+ num_members: NonZeroUsize::new(2).unwrap(),
+ }),
+
periodic_gossip: Some(PeriodicParams {
frequency: Duration::from_millis(200),
num_members: NonZeroUsize::new(3).unwrap(),
frequency: Duration::from_secs(60),
num_members: NonZeroUsize::new(2).unwrap(),
}),
+
+ periodic_announce_to_down_members: Some(PeriodicParams {
+ frequency: Duration::from_secs(125),
+ num_members: NonZeroUsize::new(3).unwrap(),
+ }),
+
periodic_gossip: Some(PeriodicParams {
frequency: Duration::from_millis(500),
num_members: NonZeroUsize::new(4).unwrap(),
Modified src/lib.rs
)
}
+ fn announce_to_down(&mut self, num_members: usize, mut runtime: impl Runtime<T>) -> Result<()> {
+ self.member_buf.clear();
+ self.members
+ .choose_down_members(num_members, &mut self.member_buf, &mut self.rng);
+
+ while let Some(chosen) = self.member_buf.pop() {
+ self.send_message(chosen.into_identity(), Message::Announce, &mut runtime)?;
+ }
+
+ Ok(())
+ }
+
// Pick `num_members` random active members and send `msg` to them
fn choose_and_send(
&mut self,
}
Ok(())
}
+ Timer::PeriodicAnnounceDown(token) => {
+ if token == self.timer_token && self.connection_state == ConnectionState::Connected
+ {
+ if let Some(ref params) = self.config.periodic_announce_to_down_members {
+ runtime.submit_after(
+ Timer::PeriodicAnnounceDown(self.timer_token),
+ params.frequency,
+ );
+ self.announce_to_down(params.num_members.get(), runtime)?;
+ }
+ }
+ Ok(())
+ }
}
}
/// normal operations, changing the configuration parameters is a
/// nicer alternative to recreating the Foca instance.
///
- /// Presently, attempting to change [`Config::probe_period`] or
- /// [`Config::probe_rtt`] results in [`Error::InvalidConfig`]; For
- /// such cases it's recommended to recreate your Foca instance. When
- /// an error occurs, every configuration parameter remains
+ /// Changing [`Config::probe_period`], [`Config::probe_rtt`] or
+ /// trying to _enable_ any `periodic_` setting results in
+ /// [`Error::InvalidConfig`]; For such cases it's recommended to
+ /// recreate your Foca instance.
+ ///
+ /// When an error occurs, every configuration parameter remains
/// unchanged.
pub fn set_config(&mut self, config: Config) -> Result<()> {
if self.config.probe_period != config.probe_period
|| self.config.probe_rtt != config.probe_rtt
|| (self.config.periodic_announce.is_none() && config.periodic_announce.is_some())
+ || (self.config.periodic_announce_to_down_members.is_none()
+ && config.periodic_announce_to_down_members.is_some())
|| (self.config.periodic_gossip.is_none() && config.periodic_gossip.is_some())
{
Err(Error::InvalidConfig)
if let Some(ref params) = self.config.periodic_announce {
runtime.submit_after(Timer::PeriodicAnnounce(self.timer_token), params.frequency);
+ }
+
+ if let Some(ref params) = self.config.periodic_announce_to_down_members {
+ runtime.submit_after(
+ Timer::PeriodicAnnounceDown(self.timer_token),
+ params.frequency,
+ );
}
if let Some(ref params) = self.config.periodic_gossip {
}
// There are multiple "do this thing periodically" settings. This
- // helps test those. Takes:
- // - something that knows which configuration to set
- // - something that knows which event should be sent
- // - the message that should be sent
- fn check_periodic_behaviour<F, G>(config_setter: F, mut event_maker: G, message: Message<ID>)
+ // helps test those.
+ // It creates a Foca instance (ID=1) with 2 active members (IDs 2 and 3)
+ // and 2 down members (IDs 4 and 5), then allows the caller to
+ // verify the runtime afterwards
+ fn check_periodic_behaviour<F, G, V>(config_setter: F, mut event_maker: G, validator: V)
where
+ // something that knows which configuration to set
F: Fn(&mut Config, config::PeriodicParams),
+ // something that knows which event should be sent
G: FnMut(TimerToken) -> Timer<ID>,
+ // something to inspect the runtime for expected events
+ V: Fn(AccumulatingRuntime<ID>),
{
let frequency = Duration::from_millis(500);
let num_members = NonZeroUsize::new(2).unwrap();
// When it becomes active (i.e.: has at least one active member)
assert_eq!(Ok(()), foca.apply(Member::alive(ID::new(2)), &mut runtime));
- assert_eq!(Ok(()), foca.apply(Member::alive(ID::new(3)), &mut runtime));
+ assert_eq!(
+ Ok(()),
+ foca.apply(Member::suspect(ID::new(3)), &mut runtime)
+ );
+ assert_eq!(Ok(()), foca.apply(Member::down(ID::new(4)), &mut runtime));
+ assert_eq!(Ok(()), foca.apply(Member::down(ID::new(5)), &mut runtime));
+
+ assert_eq!(2, foca.num_members());
// Should schedule the given event
expect_scheduling!(runtime, event_maker(foca.timer_token()), frequency);
// It should've scheduled the event again
expect_scheduling!(runtime, event_maker(foca.timer_token()), frequency);
+ validator(runtime);
// And sent the message to `num_members` random members
// (since num_members=2 and this instance only knows about two, we know
// which should've been picked)
- expect_message!(runtime, ID::new(2), message);
- expect_message!(runtime, ID::new(3), message);
}
#[test]
c.periodic_gossip = Some(p);
},
|t: TimerToken| -> Timer<ID> { Timer::PeriodicGossip(t) },
- Message::Gossip,
+ |mut runtime| {
+ expect_message!(runtime, ID::new(2), Message::<ID>::Gossip);
+ expect_message!(runtime, ID::new(3), Message::<ID>::Gossip);
+ },
);
}
c.periodic_announce = Some(p);
},
|t: TimerToken| -> Timer<ID> { Timer::PeriodicAnnounce(t) },
- Message::Announce,
+ |mut runtime| {
+ expect_message!(runtime, ID::new(2), Message::<ID>::Announce);
+ expect_message!(runtime, ID::new(3), Message::<ID>::Announce);
+ },
+ );
+ }
+
+ #[test]
+ fn periodic_announce_to_down_members_behaviour() {
+ check_periodic_behaviour(
+ |c: &mut Config, p: config::PeriodicParams| {
+ c.periodic_announce_to_down_members = Some(p);
+ },
+ |t: TimerToken| -> Timer<ID> { Timer::PeriodicAnnounceDown(t) },
+ |mut runtime| {
+ expect_message!(runtime, ID::new(4), Message::<ID>::Announce);
+ expect_message!(runtime, ID::new(5), Message::<ID>::Announce);
+ },
);
}
let mut foca = Foca::new(ID::new(1), c.clone(), rng(), codec());
c.periodic_announce = Some(config::PeriodicParams {
+ frequency: Duration::from_secs(5),
+ num_members: NonZeroUsize::new(1).unwrap(),
+ });
+
+ // Must not be able to enable it during runtime
+ assert_eq!(Err(Error::InvalidConfig), foca.set_config(c.clone()));
+
+ // However, a foca that starts with periodic announce enabled
+ let mut foca = Foca::new(ID::new(1), c, rng(), codec());
+
+ // Is able to turn it off
+ assert_eq!(Ok(()), foca.set_config(config()));
+ }
+
+ #[test]
+ fn periodic_announce_to_down_members_cannot_be_enabled_at_runtime() {
+ let mut c = config();
+ assert!(c.periodic_announce_to_down_members.is_none());
+
+ // A foca instance that's running without periodic announce
+ let mut foca = Foca::new(ID::new(1), c.clone(), rng(), codec());
+
+ c.periodic_announce_to_down_members = Some(config::PeriodicParams {
frequency: Duration::from_secs(5),
num_members: NonZeroUsize::new(1).unwrap(),
});
Modified src/member.rs
}
}
- /// XXX This used to be a `next_members()` which would make use of the
- /// already shuffled state and then simply advance the cursor
- /// to trigger the next shuffle-after-round-robin that `next()`
- /// does. However I'm not sure it was a good idea: the point
- /// of what `next()` does is giving some sort of determinism giving
- /// a high chance that every member will be *pinged* periodically
- /// and using the same logic for other "pick random member"
- /// mechanisms might break the math.
- pub(crate) fn choose_active_members<F>(
+ fn choose_members<F>(
&self,
wanted: usize,
output: &mut Vec<Member<T>>,
mut rng: impl Rng,
picker: F,
) where
- F: Fn(&T) -> bool,
+ F: Fn(&Member<T>) -> bool,
{
// Basic reservoir sampling
let mut num_chosen = 0;
let mut num_seen = 0;
- for member in self.iter_active() {
- if !picker(member.id()) {
+ for member in &self.inner {
+ if !picker(member) {
continue;
}
}
}
}
+ }
+
+ pub(crate) fn choose_down_members(
+ &self,
+ wanted: usize,
+ output: &mut Vec<Member<T>>,
+ rng: impl Rng,
+ ) {
+ self.choose_members(wanted, output, rng, |member| !member.is_active());
+ }
+
+ pub(crate) fn choose_active_members<F>(
+ &self,
+ wanted: usize,
+ output: &mut Vec<Member<T>>,
+ rng: impl Rng,
+ picker: F,
+ ) where
+ F: Fn(&T) -> bool,
+ {
+ self.choose_members(wanted, output, rng, |member| {
+ member.is_active() && picker(member.id())
+ });
}
pub(crate) fn remove_if_down(&mut self, id: &T) -> Option<Member<T>> {
member_id.0.parse::<usize>().expect("number") > 4
});
assert_eq!(vec![Member::suspect(Id("5"))], out);
+
+ out.clear();
+ members.choose_down_members(3, &mut out, &mut rng);
+ assert_eq!(2, out.len());
+ assert!(out.iter().any(|m| m.id == Id("7")));
+ assert!(out.iter().any(|m| m.id == Id("6")));
}
#[test]
Modified src/runtime.rs
/// specified by [`crate::Config::periodic_announce`]
PeriodicAnnounce(TimerToken),
+ /// Sends a [`crate::Message::Announce`] to randomly chosen members
+ /// that are condidered [`crate::State::Down`] as specified by
+ /// [`crate::Config::periodic_announce_to_down_members`]
+ PeriodicAnnounceDown(TimerToken),
+
/// Sends a [`crate::Message::Gossip`] to randomly chosen members as
/// specified by [`crate::Config::periodic_gossip`]
PeriodicGossip(TimerToken),
Timer::PeriodicAnnounce(_) => 3,
Timer::PeriodicGossip(_) => 4,
Timer::RemoveDown(_) => 5,
+ Timer::PeriodicAnnounceDown(_) => 6,
}
}
}