caio.co/de/foca

conflict resolution within Members via Identity

I really don't like the `win_addr_conflict` name

... not like `has_same_prefix` was any better
Id
6e6ba5280e2f3dc8d537ec14d3e068d80073ea1d
Author
Caio
Commit time
2024-03-14T20:17:16+01:00

Modified examples/foca_insecure_udp_agent.rs

@@ -155,6 +155,11
fn addr(&self) -> SocketAddr {
self.addr
}
+
+ // FIXME explain
+ fn win_addr_conflict(&self, adversary: &Self) -> bool {
+ self.bump > adversary.bump
+ }
}

struct AccumulatingRuntime<T> {

Modified examples/identity_golf.rs

@@ -55,6 +55,11
fn addr(&self) -> SocketAddrV4 {
self.addr
}
+
+ // FIXME explain
+ fn win_addr_conflict(&self, adversary: &Self) -> bool {
+ self.extra > adversary.extra
+ }
}

// So now Foca will happily attempt to rejoin a cluster
@@ -101,9 +106,10
}
}

- // And implementing identity is as trivial as it always is:
+ // And implementing identity is as simple as it always is:
impl Identity for SubnetFixedPortId {
type Addr = (u8, u8);
+
fn renew(&self) -> Option<Self> {
Some(Self {
addr: self.addr,
@@ -113,6 +119,10

fn addr(&self) -> (u8, u8) {
self.addr
+ }
+
+ fn win_addr_conflict(&self, adversary: &Self) -> bool {
+ self.extra > adversary.extra
}
}

Modified src/identity.rs

@@ -29,6 +29,7
pub trait Identity: Clone + Eq + fmt::Debug {
/// FIXME docs
type Addr: PartialEq;
+
/// Opt-in on auto-rejoining by providing a new identity.
///
/// When Foca detects it's been declared Down by another member
@@ -40,6 +41,11

/// FIXME missing docs
fn addr(&self) -> Self::Addr;
+
+ /// FIXME docs
+ // decide which identity to keep in case of conflicts
+ // ffs this name
+ fn win_addr_conflict(&self, _adversary: &Self) -> bool;
}

#[cfg(feature = "std")]
@@ -54,6 +60,10

fn addr(&self) -> $type {
*self
+ }
+
+ fn win_addr_conflict(&self, _adversary: &Self) -> bool {
+ panic!("addr is self, there'll never be a conflict");
}
}
};

Modified src/lib.rs

@@ -658,10 +658,6
// Checking only incarnation is sufficient because to refute
// suspicion the member must increment its own incarnation
.apply_existing_if(as_down.clone(), |member| {
- // FIXME test that this doesn't kill a member after renew()
- // strict identity check, otherwise we'd end up declaring
- // a renewed identity as down
- // member.id() == &member_id &&
member.incarnation() == incarnation
})
{
@@ -3933,19 +3929,41
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
let mut runtime = InMemoryRuntime::new();

- assert_eq!(
- Ok(()),
- foca.apply(Member::alive(ID::new_with_bump(2, 0)), &mut runtime)
- );
-
- assert_eq!(
- Ok(()),
- foca.apply(Member::alive(ID::new_with_bump(2, 1)), &mut runtime)
- );
-
+ // Given a known member ID=2,0
+ let original = ID::new_with_bump(2, 0);
+ assert_eq!(Ok(()), foca.apply(Member::alive(original), &mut runtime));
assert_eq!(1, foca.num_members());

- todo!("continue");
+ // When foca learns about a new member with same address ID=2,1
+ // that wins its conflict resolution round
+ let conflicted = ID::new_with_bump(2, 1);
+ assert_eq!(original.addr(), conflicted.addr());
+ assert!(conflicted.win_addr_conflict(&original));
+ assert_eq!(Ok(()), foca.apply(Member::alive(conflicted), &mut runtime));
+
+ // It should replace the original state
+ assert_eq!(1, foca.num_members());
+ assert_eq!(
+ foca.iter_members().next().unwrap(),
+ &Member::alive(conflicted)
+ );
+
+ // Conversely, if it learns about a member with same address
+ // that loses the conflict
+ assert!(!original.win_addr_conflict(&conflicted));
+ // nothing changes
+ for m in [
+ Member::alive(original),
+ Member::suspect(original),
+ Member::down(original),
+ ] {
+ assert_eq!(Ok(()), foca.apply(m, &mut runtime));
+ assert_eq!(1, foca.num_members());
+ assert_eq!(
+ foca.iter_members().next().unwrap(),
+ &Member::alive(conflicted)
+ );
+ }
}

#[test]
@@ -3982,4 +4000,6
assert_eq!(1, foca.num_members());
assert_eq!(foca.iter_members().next().unwrap(), &Member::alive(bumped));
}
+
+ // FIXME test force apply behavior, summary and notifications
}

Modified src/member.rs

@@ -261,9 +261,6
self.inner.iter().filter(|m| m.is_active())
}

- // XXX will probably need a specialized mark-as-down else it gets
- // awkward
-
pub(crate) fn apply_existing_if<F: Fn(&Member<T>) -> bool>(
&mut self,
update: Member<T>,
@@ -274,8 +271,31
.iter_mut()
.find(|member| member.id.addr() == update.id().addr())
{
+ // if there's a conflict and the update wins, the member
+ // state is fully replaced
+ let mut force_apply = false;
if known_member.id != update.id {
- todo!("GOTTA HANDLE SUM CONFLICT");
+ // If the update wins the conflict, the full member
+ // state is replaced (it's essentially a rejoin)
+ if known_member.id.win_addr_conflict(&update.id) {
+ // update lost conflict, it's junk
+ tracing::trace!(
+ update = tracing::field::debug(&update),
+ existing = tracing::field::debug(&known_member),
+ "existing won conflict resolution, nothing to do"
+ );
+ return Some(ApplySummary {
+ is_active_now: known_member.is_active(),
+ apply_successful: false,
+ changed_active_set: false,
+ });
+ }
+ tracing::trace!(
+ update = tracing::field::debug(&update),
+ existing = tracing::field::debug(&known_member),
+ "update won conflict resolution"
+ );
+ force_apply = true;
}

if !condition(known_member) {
@@ -286,7 +306,14
});
}
let was_active = known_member.is_active();
- let apply_successful = known_member.change_state(update.incarnation(), update.state());
+ let apply_successful = if force_apply {
+ known_member.id = update.id;
+ known_member.state = update.state;
+ known_member.incarnation = update.incarnation;
+ true
+ } else {
+ known_member.change_state(update.incarnation, update.state)
+ };
let is_active_now = known_member.is_active();
let changed_active_set = is_active_now != was_active;

@@ -366,6 +393,10

fn addr(&self) -> Self::Addr {
self.0
+ }
+
+ fn win_addr_conflict(&self, _adversary: &Self) -> bool {
+ panic!("addr is self, there'll never be a conflict");
}
}

Modified src/testing.rs

@@ -92,6 +92,11
fn addr(&self) -> u8 {
self.addr
}
+
+ fn win_addr_conflict(&self, adversary: &Self) -> bool {
+ debug_assert_ne!(self, adversary);
+ self.bump > adversary.bump
+ }
}

pub(crate) struct BadCodec;