caio.co/de/foca

Ignore messages from known old identities

In a scenario where traffic is buffered and/or replayed this could
lead to noisy cluster behaviour
Id
1326f9affa0bf7eda61429ea78379f7d1f17cd5f
Author
Caio
Commit time
2024-04-24T09:59:35+02:00

Modified CHANGELOG.md

@@ -7,6 +7,9
No harm done to the cluster state, but would lead to noise in
the form of `Error::DataFromOurselves` in the member logs
See: https://github.com/caio/foca/issues/34
+- Bugfix: messages from known stale identities were being accepted
+ instead being discarded. Unlikely to have impacted anyone
+ that isn't replaying data

## v0.17.0 - 2024-03-20

Modified src/lib.rs

@@ -1228,10 +1228,15
fn apply_update(&mut self, update: Member<T>, runtime: impl Runtime<T>) -> Result<bool> {
debug_assert_ne!(&self.identity, update.id());
let summary = self.members.apply(update.clone(), &mut self.rng);
- let active = summary.is_active_now;
+
+ let update_is_active = match summary.conflict {
+ member::ConflictResult::Lost | member::ConflictResult::FailedCondition => false,
+ _ => summary.is_active_now,
+ };
+
self.handle_apply_summary(summary, update, runtime)?;

- Ok(active)
+ Ok(update_is_active)
}

fn handle_apply_summary(
@@ -1263,7 +1268,7
}
}

- if let Some(old) = summary.replaced_id {
+ if let member::ConflictResult::Replaced(old) = summary.conflict {
#[cfg(feature = "tracing")]
tracing::debug!(
previous_id = tracing::field::debug(&old),
@@ -4284,5 +4289,52
// Foca must not request `renewed` to probe `probed`
// on its behalf
assert!(runtime.take_all_data().is_empty());
+ }
+
+ #[test]
+ fn discards_known_old_identities_data() {
+ let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
+ let mut runtime = AccumulatingRuntime::new();
+
+ // ID=2 is a member that rejoined at least once
+ let old = ID::new(2);
+ let new = old.bump();
+ assert_eq!(
+ Ok(()),
+ foca.apply_many(
+ [Member::alive(old), Member::alive(new)].into_iter(),
+ &mut runtime
+ )
+ );
+ assert_eq!(1, foca.num_members());
+
+ // If `old` sends a message about ID=3 being alive
+ let msg = encode((
+ Header {
+ src: old,
+ src_incarnation: Incarnation::default(),
+ dst: ID::new(1),
+ message: Message::Feed,
+ },
+ vec![Member::alive(ID::new(3))],
+ ));
+
+ assert_eq!(Ok(()), foca.handle_data(&msg, &mut runtime));
+ // It should be ignored
+ assert_eq!(1, foca.num_members());
+
+ // The same message sent by `new` shouldn't
+ let msg = encode((
+ Header {
+ src: new,
+ src_incarnation: Incarnation::default(),
+ dst: ID::new(1),
+ message: Message::Feed,
+ },
+ vec![Member::alive(ID::new(3))],
+ ));
+
+ assert_eq!(Ok(()), foca.handle_data(&msg, &mut runtime));
+ assert_eq!(2, foca.num_members());
}
}

Modified src/member.rs

@@ -304,25 +304,31
is_active_now: known_member.is_active(),
apply_successful: false,
changed_active_set: false,
- replaced_id: None,
+ conflict: ConflictResult::Lost,
});
}
force_apply = true;
}

if !condition(known_member) {
+ let conflict = if force_apply {
+ ConflictResult::FailedCondition
+ } else {
+ ConflictResult::NoConflict
+ };
+
return Some(ApplySummary {
is_active_now: known_member.is_active(),
apply_successful: false,
changed_active_set: false,
- replaced_id: None,
+ conflict,
});
}
let was_active = known_member.is_active();
- let mut replaced_id = None;
+ let mut conflict = ConflictResult::NoConflict;
let apply_successful = if force_apply {
core::mem::swap(&mut known_member.id, &mut update.id);
- replaced_id = Some(update.id);
+ conflict = ConflictResult::Replaced(update.id);
known_member.state = update.state;
known_member.incarnation = update.incarnation;
true
@@ -345,7 +351,7
is_active_now,
apply_successful,
changed_active_set,
- replaced_id,
+ conflict,
})
} else {
None
@@ -377,7 +383,7
apply_successful: true,
// Registering a new active member changes the active set
changed_active_set: is_active_now,
- replaced_id: None,
+ conflict: ConflictResult::NoConflict,
}
})
}
@@ -389,7 +395,15
pub(crate) is_active_now: bool,
pub(crate) apply_successful: bool,
pub(crate) changed_active_set: bool,
- pub(crate) replaced_id: Option<T>,
+ pub(crate) conflict: ConflictResult<T>,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub(crate) enum ConflictResult<T> {
+ NoConflict,
+ Replaced(T),
+ Lost,
+ FailedCondition,
}

#[cfg(test)]
@@ -617,7 +631,7
is_active_now: true,
apply_successful: true,
changed_active_set: true,
- replaced_id: None,
+ conflict: ConflictResult::NoConflict,
},
res,
);
@@ -632,7 +646,7
is_active_now: true,
apply_successful: false,
changed_active_set: false,
- replaced_id: None,
+ conflict: ConflictResult::NoConflict,
},
res,
);
@@ -646,7 +660,7
is_active_now: true,
apply_successful: true,
changed_active_set: false,
- replaced_id: None,
+ conflict: ConflictResult::NoConflict,
},
res,
);
@@ -659,7 +673,7
is_active_now: false,
apply_successful: true,
changed_active_set: true,
- replaced_id: None,
+ conflict: ConflictResult::NoConflict,
},
res,
);
@@ -673,7 +687,7
is_active_now: false,
apply_successful: true,
changed_active_set: false,
- replaced_id: None,
+ conflict: ConflictResult::NoConflict,
},
res,
);
@@ -806,7 +820,7
.expect("member found");

assert!(summary.apply_successful);
- assert_eq!(Some(id), summary.replaced_id);
+ assert_eq!(ConflictResult::Replaced(id), summary.conflict);

let another = renewed.renew().unwrap();
let summary = members
@@ -815,8 +829,18

assert!(!summary.apply_successful);
assert_eq!(
- None, summary.replaced_id,
+ ConflictResult::FailedCondition,
+ summary.conflict,
"must not apply if condition fails"
);
+
+ // trying to go back to the past leads to
+ // conflict resolution failure
+ let summary = members
+ .apply_existing_if(Member::alive(id), |_| true)
+ .expect("member found");
+
+ assert_eq!(ConflictResult::Lost, summary.conflict,);
+ assert!(!summary.apply_successful, "must not apply if conflict lost");
}
}