Properly avoid allocs when refuting suspicion
This patch fixes a case where foca would unnecessarily allocate when handling a message. Debug builds would crash under this scenario, but it went uncaught for a long while because my watchdog simply restarted it :x The data flow is still a tree instead of a graph, so things are still sane, but this is starting to look like code from a language without RAII :clown_face:
- Id
- 13bbf356e8ce86dc71a7bc9882872bf934d2f67f
- Author
- Caio
- Commit time
- 2024-10-06T12:52:58+02:00
Modified src/lib.rs
members: Members<T>,
probe: Probe<T>,
- // Used to buffer up members/updates when receiving and
- // sending data
- member_buf: Vec<Member<T>>,
+ // Used to buffer cluster updates
+ updates_buf: Vec<Member<T>>,
+ // Used when selecting (random) members
+ choice_buf: Vec<Member<T>>,
send_buf: Vec<u8>,
timer_token: TimerToken::default(),
members: Members::new(Vec::new()),
probe: Probe::new(Vec::with_capacity(max_indirect_probes)),
- member_buf: Vec::new(),
+ updates_buf: Vec::new(),
+ choice_buf: Vec::new(),
connection_state: ConnectionState::Disconnected,
updates: Broadcasts::new(),
send_buf: Vec::with_capacity(max_bytes),
}
fn announce_to_down(&mut self, num_members: usize, mut runtime: impl Runtime<T>) -> Result<()> {
- self.member_buf.clear();
+ self.updates_buf.clear();
self.members
- .choose_down_members(num_members, &mut self.member_buf, &mut self.rng);
+ .choose_down_members(num_members, &mut self.updates_buf, &mut self.rng);
- while let Some(chosen) = self.member_buf.pop() {
+ while let Some(chosen) = self.updates_buf.pop() {
self.send_message(chosen.into_identity(), Message::Announce, &mut runtime)?;
}
msg: Message<T>,
mut runtime: impl Runtime<T>,
) -> Result<()> {
- self.member_buf.clear();
+ self.choice_buf.clear();
self.members.choose_active_members(
num_members,
- &mut self.member_buf,
+ &mut self.choice_buf,
&mut self.rng,
|_| true,
);
- while let Some(chosen) = self.member_buf.pop() {
+ while let Some(chosen) = self.choice_buf.pop() {
self.send_message(chosen.into_identity(), msg.clone(), &mut runtime)?;
}
return Ok(());
}
- self.member_buf.clear();
+ self.updates_buf.clear();
self.members.choose_active_members(
self.config.num_indirect_probes.get(),
- &mut self.member_buf,
+ &mut self.updates_buf,
&mut self.rng,
|member| self.broadcast_handler.should_add_broadcast_data(member),
);
- while let Some(chosen) = self.member_buf.pop() {
+ while let Some(chosen) = self.updates_buf.pop() {
self.send_message(chosen.into_identity(), Message::Broadcast, &mut runtime)?;
// Crafting the message above left the backlog empty,
return Ok(());
}
- self.member_buf.clear();
+ self.updates_buf.clear();
self.members.choose_active_members(
self.config.num_indirect_probes.get(),
- &mut self.member_buf,
+ &mut self.updates_buf,
&mut self.rng,
|candidate| candidate != &probed_id,
);
"Member didn't respond to ping in time, starting indirect probe cycle"
);
- while let Some(chosen) = self.member_buf.pop() {
+ while let Some(chosen) = self.updates_buf.pop() {
let indirect = chosen.into_identity();
self.probe.expect_indirect_ack(indirect.clone());
// We can skip this buffering if we assume that reaching here
// means the packet is valid. But that doesn't seem like a very
// good idea...
- self.member_buf.clear();
+ self.updates_buf.clear();
if remaining >= 2 && header.message != Message::Broadcast {
let num_updates = data.get_u16();
#[cfg(feature = "tracing")]
span.record("num_updates", num_updates);
for _i in 0..num_updates {
- self.member_buf.push(
+ self.updates_buf.push(
self.codec
.decode_member(&mut data)
.map_err(|e| Error::Decode(Box::new(e)))?,
// Here we take the Vec so we can drain it without upsetting
// the borrow checker. And then put it back in its place, so
// that we can keep reusing its already-allocated space.
- let mut updates = mem::take(&mut self.member_buf);
+ let mut updates = mem::take(&mut self.updates_buf);
self.apply_many(updates.drain(..), true, &mut runtime)?;
debug_assert_eq!(
0,
- self.member_buf.capacity(),
+ self.updates_buf.capacity(),
"member_buf modified while taken"
);
- self.member_buf = updates;
+ self.updates_buf = updates;
// Right now there might still be some data left to read in the
// buffer (custom broadcasts). We'll handle those before we
let mut num_items = 0;
if only_active_members {
- self.member_buf.clear();
+ self.updates_buf.clear();
self.members.choose_active_members(
// Done in order to prevent copying and sorting a large
// set of members just to not use them at all because
// they don't fit the remaining buffer
self.estimate_feed_capacity(buf.remaining_mut()),
- &mut self.member_buf,
+ &mut self.updates_buf,
&mut self.rng,
|member| member != &dst,
);
- while let Some(chosen) = self.member_buf.pop() {
+ while let Some(chosen) = self.updates_buf.pop() {
let pos = buf.get_ref().len();
if let Err(_ignored) = self.codec.encode_member(&chosen, &mut buf) {
// encoding the member might have advanced the cursor
)
);
assert_eq!(0, foca.updates_backlog());
+ }
+
+ // Foca shares a buffer internally to minimize allocs
+ // and does some questionable things to make it easy
+ // to juggle with it (grep for mem::take)
+ //
+ // Debug builds are able to catch the case where
+ // attempting to reuse the buffer actually ended up
+ // alloc'ing a new one (the capacity() asserts)
+ //
+ // This verifies that one such case doesn't happen
+ // anymore.
+ #[test]
+ fn debug_no_panic_when_handling_data_with_self_updates() {
+ let id = ID::new(1);
+
+ let mut foca = Foca::new(id, config(), rng(), codec());
+
+ // A message from ID=2 to ID=1 with an update
+ // declaring ID=1 as suspect
+ let msg = encode((
+ Header {
+ src: ID::new(2),
+ src_incarnation: Incarnation::default(),
+ dst: id,
+ message: Message::Gossip,
+ },
+ vec![Member::suspect(id)],
+ ));
+
+ assert_eq!(Ok(()), foca.handle_data(&msg, AccumulatingRuntime::new()));
}
}