Add parameter to skip broadcasts on apply_many
- Id
- 1770b3bf856883160e4595a79cf87b59198079e1
- Author
- Caio
- Commit time
- 2024-10-04T10:57:53+02:00
Modified CHANGELOG.md
- **BREAKING**: `Notification` now contains references to identities
instead of owned values so that introducing new discriminants is
less worrisome for complex types
+- **BREAKING**: `Foca::apply_many` now lets you decide wether to
+ broadcast the applied updates to other cluster members or not
- Notifications can be converted to the new `foca::OwnedNotification`,
for convenience
- New cargo feature `unstable-notification` which exposes the ability
Modified src/lib.rs
/// choose to unify their cluster knowledge (say: a streaming
/// join protocol or a periodic sync) and use [`Foca::apply_many`]
/// as a way to feed Foca this new (external) knowledge.
+ ///
+ /// The `do_broadcast` parameter flags wether the updates should
+ /// shared with cluster (when relevant). In general, `true` is
+ /// the correct value. Not broadcasting is useful when you're
+ /// restoring knowledge after going offline and you don't want
+ /// to broadcast data that you know (or rather: assume) the rest
+ /// of the cluster already knows about.
pub fn apply_many(
&mut self,
updates: impl Iterator<Item = Member<T>>,
+ do_broadcast: bool,
mut runtime: impl Runtime<T>,
) -> Result<()> {
for update in updates {
"update about identity with same prefix as ours, declaring it down"
);
}
- self.apply_update(Member::down(update.into_identity()), &mut runtime)?;
+ self.apply_update(
+ Member::down(update.into_identity()),
+ do_broadcast,
+ &mut runtime,
+ )?;
} else {
- self.apply_update(update, &mut runtime)?;
+ self.apply_update(update, do_broadcast, &mut runtime)?;
}
}
member.incarnation() == incarnation
})
{
- self.handle_apply_summary(summary, as_down, &mut runtime)?;
+ self.handle_apply_summary(summary, as_down, true, &mut runtime)?;
// Member went down we might need to adjust our internal state
self.adjust_connection_state(&mut runtime);
// talk)
.apply_update(
Member::new(src.clone(), src_incarnation, State::Alive),
+ true,
&mut runtime,
)?;
// the borrow checker. And then put it back in its place, so
// that we can keep reusing its already-allocated space.
let mut updates = mem::take(&mut self.member_buf);
- self.apply_many(updates.drain(..), &mut runtime)?;
+ self.apply_many(updates.drain(..), true, &mut runtime)?;
debug_assert_eq!(
0,
self.member_buf.capacity(),
let is_active_now = summary.is_active_now;
#[cfg_attr(not(feature = "tracing"), allow(unused_variables))]
let apply_successful = summary.apply_successful;
- self.handle_apply_summary(summary, as_suspect, &mut runtime)?;
+ self.handle_apply_summary(summary, as_suspect, true, &mut runtime)?;
// Now we ensure we change the member to Down if it
// isn't already inactive
}
// shortcut for apply + handle
- fn apply_update(&mut self, update: Member<T>, runtime: impl Runtime<T>) -> Result<bool> {
+ fn apply_update(
+ &mut self,
+ update: Member<T>,
+ do_broadcast: bool,
+ runtime: impl Runtime<T>,
+ ) -> Result<bool> {
debug_assert_ne!(&self.identity, update.id());
let summary = self.members.apply(update.clone(), &mut self.rng);
_ => summary.is_active_now,
};
- self.handle_apply_summary(summary, update, runtime)?;
+ self.handle_apply_summary(summary, update, do_broadcast, runtime)?;
Ok(update_is_active)
}
&mut self,
summary: ApplySummary<T>,
update: Member<T>,
+ do_broadcast: bool,
mut runtime: impl Runtime<T>,
) -> Result<()> {
let id = update.id().clone();
);
// Cluster state changed, start broadcasting it
- let addr = Addr(id.addr());
- let data = self.serialize_member(update)?;
- self.updates
- .add_or_replace(addr, data, self.config.max_transmissions.get().into());
+ if do_broadcast {
+ let addr = Addr(id.addr());
+ let data = self.serialize_member(update)?;
+ self.updates
+ .add_or_replace(addr, data, self.config.max_transmissions.get().into());
+ }
// Down is a terminal state, so set up a handler for removing
// the member so that it may rejoin later
}
pub(crate) fn apply(&mut self, member: Member<T>, mut runtime: impl Runtime<T>) -> Result<()> {
- self.apply_many(core::iter::once(member), &mut runtime)
+ self.apply_many(core::iter::once(member), true, &mut runtime)
}
}
// Prepare a foca instance with 3 known peers
assert_eq!(
Ok(()),
- foca.apply_many(updates.iter().cloned(), &mut runtime)
+ foca.apply_many(updates.iter().cloned(), true, &mut runtime)
);
// By now we should've gotten an event to schedule probing
.map(Member::down)
.collect::<Vec<_>>();
// But somehow all members "disappear"
- assert_eq!(Ok(()), foca.apply_many(updates.into_iter(), &mut runtime));
+ assert_eq!(
+ Ok(()),
+ foca.apply_many(updates.into_iter(), true, &mut runtime)
+ );
// Making the instance go idle
expect_notification!(runtime, Notification::<ID>::Idle);
for member in members.iter().rev() {
let mut foca = Foca::new(*member.id(), config(), rng(), codec());
- foca.apply_many(members.iter().cloned(), AccumulatingRuntime::new())?;
+ foca.apply_many(members.iter().cloned(), true, AccumulatingRuntime::new())?;
herd.push(foca);
}
assert_eq!(
Ok(()),
- foca.apply_many(updates.iter().cloned(), &mut runtime)
+ foca.apply_many(updates.iter().cloned(), true, &mut runtime)
);
// Change our identity
// Learning anything about its previous identity
assert_eq!(
Ok(()),
- foca.apply_many(core::iter::once(Member::alive(id)), &mut runtime)
+ foca.apply_many(core::iter::once(Member::alive(id)), true, &mut runtime)
);
// shouldn't change the cluster state
.map(|id| Member::alive(ID::new(id)))
.collect::<Vec<_>>();
- assert_eq!(Ok(()), foca.apply_many(cluster.into_iter(), &mut runtime));
+ assert_eq!(
+ Ok(()),
+ foca.apply_many(cluster.into_iter(), true, &mut runtime)
+ );
assert_eq!(foca.num_members(), usize::from(u8::MAX - 1));
// So when we send it an announce message
Ok(()),
foca.apply_many(
[Member::alive(old), Member::alive(new)].into_iter(),
+ true,
&mut runtime
)
);
Ok(()),
foca.apply_many(
[Member::alive(ID::new(2)), Member::alive(ID::new(3)),].into_iter(),
+ true,
&mut runtime
)
);
let _header = codec.decode_header(&mut payload).expect("payload is valid");
assert!(payload.has_remaining());
+ }
+
+ #[test]
+ fn apply_with_no_broadcast_doesnt_increase_backlog() {
+ let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
+ assert_eq!(0, foca.updates_backlog());
+
+ assert_eq!(
+ Ok(()),
+ foca.apply_many(
+ [
+ Member::suspect(ID::new(1)),
+ Member::alive(ID::new(1).bump()),
+ Member::alive(ID::new(2)),
+ Member::down(ID::new(3)),
+ ]
+ .into_iter(),
+ false,
+ AccumulatingRuntime::new()
+ )
+ );
+ assert_eq!(0, foca.updates_backlog());
}
}