caio.co/de/foca

Don't add custom broadcasts to TurnUndead messages

Id
8f85710f1905aae33f91834943354ffd353f1fd9
Author
Caio
Commit time
2024-06-04T09:54:50+02:00

Modified CHANGELOG.md

@@ -1,5 +1,13
# Changelog

+## UNRELEASED
+
+- Bugfix: foca could incorrectly attach custom broadcasts to
+ messages supposed to be lightweight leading to strange decode
+ errors in the logs and slowing down cluster self-healing, but
+ no further impact on functionality
+ See: https://github.com/caio/foca/issues/35
+
## v0.17.1 - 2024-04-25

- Bugfix: when restarting members, there was a chance foca would

Modified src/lib.rs

@@ -1505,7 +1505,9
let add_custom_broadcast = buf.has_remaining_mut()
// Every message but Announce includes custom broadcasts by
// default
- && !matches!(header.message, Message::Announce)
+ // NEEDSWORK: this + piggyback logic should be put in the type so
+ // it doesn't go out of sync
+ && !matches!(header.message, Message::Announce|Message::TurnUndead)
// Unless the broadcast handler says no
&& self.broadcast_handler.should_add_broadcast_data(&dst);

@@ -4336,5 +4338,82

assert_eq!(Ok(()), foca.handle_data(&msg, &mut runtime));
assert_eq!(2, foca.num_members());
+ }
+
+ struct DumbHandler;
+
+ struct GrowOnly(u8);
+
+ impl Invalidates for GrowOnly {
+ fn invalidates(&self, other: &Self) -> bool {
+ self.0 > other.0
+ }
+ }
+
+ impl BroadcastHandler<ID> for DumbHandler {
+ type Key = GrowOnly;
+
+ type Error = &'static str;
+
+ fn receive_item(
+ &mut self,
+ mut data: &[u8],
+ _sender: Option<&ID>,
+ ) -> core::result::Result<Option<Self::Key>, Self::Error> {
+ if data.has_remaining() {
+ // bad: will never stop emitting
+ // good enough for testing tho
+ Ok(Some(GrowOnly(data.get_u8())))
+ } else {
+ Ok(None)
+ }
+ }
+ }
+
+ #[test]
+ fn lightweight_messages() {
+ let mut foca =
+ Foca::with_custom_broadcast(ID::new(1), config(), rng(), codec(), DumbHandler);
+
+ let mut runtime = AccumulatingRuntime::new();
+
+ // Add some members so there is a backlog of updates
+ assert_eq!(
+ Ok(()),
+ foca.apply_many(
+ [Member::alive(ID::new(2)), Member::alive(ID::new(3)),].into_iter(),
+ &mut runtime
+ )
+ );
+ assert!(foca.updates_backlog() > 0);
+
+ // And some custom broadcasts
+ assert_eq!(Ok(true), foca.add_broadcast(b"0hello"));
+ assert_eq!(Ok(true), foca.add_broadcast(b"1world"));
+ assert!(foca.custom_broadcast_backlog() > 0);
+
+ let dst = ID::new(4);
+ let mut codec = codec();
+ for msg in [Message::Announce, Message::TurnUndead] {
+ assert_eq!(Ok(()), foca.send_message(dst, msg.clone(), &mut runtime));
+ let mut payload = runtime.take_data(dst).expect("must contain message to dst");
+
+ let header = codec.decode_header(&mut payload).expect("payload is valid");
+ assert_eq!(header.message, msg);
+ assert_eq!(header.dst, dst);
+ // Lightweight messages should not include anything after
+ // the header (no updates, no custom broadcasts)
+ assert!(payload.is_empty(), "message {msg:?} contains trailing data");
+ }
+
+ // Whereas a normal message should
+ assert_eq!(
+ Ok(()),
+ foca.send_message(dst, Message::Ping(0), &mut runtime)
+ );
+ let mut payload = runtime.take_data(dst).expect("must contain message to dst");
+
+ let _header = codec.decode_header(&mut payload).expect("payload is valid");
+ assert!(payload.has_remaining());
}
}