caio.co/de/foca

New Notification::Rename for addr conflict resolution

Id
1d1dd55b9c85b2e4b2b0411439eb8b44fd72ac3d
Author
Caio
Commit time
2024-03-15T11:35:56+01:00

Modified src/lib.rs

@@ -661,7 +661,7
member.incarnation() == incarnation
})
{
- self.handle_apply_summary(&summary, as_down, &mut runtime)?;
+ self.handle_apply_summary(summary, as_down, &mut runtime)?;
// Member went down we might need to adjust our internal state
self.adjust_connection_state(&mut runtime);

@@ -1091,18 +1091,20
.members
.apply_existing_if(as_suspect.clone(), |_member| true)
{
- self.handle_apply_summary(&summary, as_suspect, &mut runtime)?;
+ let is_active_now = summary.is_active_now;
+ let apply_successful = summary.apply_successful;
+ self.handle_apply_summary(summary, as_suspect, &mut runtime)?;

// Now we ensure we change the member to Down if it
// isn't already inactive
- if summary.is_active_now {
+ if is_active_now {
// We check for summary.apply_successful prior to logging
// because we may pick a member multiple times before the
// timer runs out.
// May lead to not logging at all if our knowledge of this
// member was already set as State::Suspect
#[cfg(feature = "tracing")]
- if summary.apply_successful {
+ if apply_successful {
tracing::debug!(
member_id = ?failed.id(),
timeout = ?self.config.suspect_to_down_after,
@@ -1160,14 +1162,15
fn apply_update(&mut self, update: Member<T>, runtime: impl Runtime<T>) -> Result<bool> {
debug_assert_ne!(&self.identity, update.id());
let summary = self.members.apply(update.clone(), &mut self.rng);
- self.handle_apply_summary(&summary, update, runtime)?;
+ let active = summary.is_active_now;
+ self.handle_apply_summary(summary, update, runtime)?;

- Ok(summary.is_active_now)
+ Ok(active)
}

fn handle_apply_summary(
&mut self,
- summary: &ApplySummary,
+ summary: ApplySummary<T>,
update: Member<T>,
mut runtime: impl Runtime<T>,
) -> Result<()> {
@@ -1188,6 +1191,12
if !summary.is_active_now {
runtime.submit_after(Timer::RemoveDown(id.clone()), self.config.remove_down_after);
}
+ }
+
+ if let Some(old) = summary.replaced_id {
+ #[cfg(feature = "tracing")]
+ tracing::debug!(previous_id=?old, member_id=?id, "Renamed");
+ runtime.notify(Notification::Renamed(old, id.clone()));
}

if summary.changed_active_set {
@@ -4028,5 +4037,65
assert_eq!(foca.iter_members().next().unwrap(), &Member::alive(bumped));
}

- // FIXME test force apply behavior, summary and notifications
+ #[test]
+ fn notifies_on_conflict_resolution() {
+ let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
+ let mut runtime = InMemoryRuntime::new();
+
+ // Given a known member
+ let member = ID::new(2).rejoinable();
+ assert_eq!(Ok(()), foca.apply(Member::alive(member), &mut runtime));
+ assert_eq!(1, foca.num_members());
+
+ runtime.clear();
+
+ // Learning about its renewd id
+ let renewed = member.renew().expect("bumped");
+ assert_eq!(Ok(()), foca.apply(Member::alive(renewed), &mut runtime));
+ assert_eq!(1, foca.num_members());
+ // Should notify the runtime about the change
+ expect_notification!(runtime, Notification::Renamed(member, renewed));
+ // But no MemberUp notification should be fired, since
+ // previous addr was already active
+ reject_notification!(runtime, Notification::MemberUp(member));
+ reject_notification!(runtime, Notification::MemberUp(renewed));
+
+ runtime.clear();
+
+ // But if the renewed id is not active
+ let inactive = renewed.renew().expect("bumped");
+ assert_eq!(Ok(()), foca.apply(Member::down(inactive), &mut runtime));
+ assert_eq!(0, foca.num_members());
+ // We get notified of the rename
+ expect_notification!(runtime, Notification::Renamed(renewed, inactive));
+ // AND about the member going down with its new identity
+ expect_notification!(runtime, Notification::MemberDown(inactive));
+ // but nothing about the (now overriden, forgotten) previous one
+ reject_notification!(runtime, Notification::MemberDown(renewed));
+
+ runtime.clear();
+
+ // The inverse behaves similarly:
+ // Learning about a renewed active
+ let active = inactive.renew().expect("bumped");
+ assert_eq!(Ok(()), foca.apply(Member::suspect(active), &mut runtime));
+ assert_eq!(1, foca.num_members());
+ // Should notify about the rename
+ expect_notification!(runtime, Notification::Renamed(inactive, active));
+ // And about the member being active
+ expect_notification!(runtime, Notification::MemberUp(active));
+
+ runtime.clear();
+ // And if it learns about the previous ids again, regardless
+ // of their state, nothing happens
+ for m in [member, renewed, inactive] {
+ assert_eq!(Ok(()), foca.apply(Member::alive(m), &mut runtime));
+ assert_eq!(1, foca.num_members());
+ assert!(runtime.is_empty());
+
+ assert_eq!(Ok(()), foca.apply(Member::down(m), &mut runtime));
+ assert_eq!(1, foca.num_members());
+ assert!(runtime.is_empty());
+ }
+ }
}

Modified src/member.rs

@@ -263,9 +263,9

pub(crate) fn apply_existing_if<F: Fn(&Member<T>) -> bool>(
&mut self,
- update: Member<T>,
+ mut update: Member<T>,
condition: F,
- ) -> Option<ApplySummary> {
+ ) -> Option<ApplySummary<T>> {
if let Some(known_member) = self
.inner
.iter_mut()
@@ -288,6 +288,7
is_active_now: known_member.is_active(),
apply_successful: false,
changed_active_set: false,
+ replaced_id: None,
});
}
tracing::trace!(
@@ -303,11 +304,14
is_active_now: known_member.is_active(),
apply_successful: false,
changed_active_set: false,
+ replaced_id: None,
});
}
let was_active = known_member.is_active();
+ let mut replaced_id = None;
let apply_successful = if force_apply {
- known_member.id = update.id;
+ core::mem::swap(&mut known_member.id, &mut update.id);
+ replaced_id = Some(update.id);
known_member.state = update.state;
known_member.incarnation = update.incarnation;
true
@@ -330,13 +334,14
is_active_now,
apply_successful,
changed_active_set,
+ replaced_id,
})
} else {
None
}
}

- pub(crate) fn apply(&mut self, update: Member<T>, mut rng: impl Rng) -> ApplySummary {
+ pub(crate) fn apply(&mut self, update: Member<T>, mut rng: impl Rng) -> ApplySummary<T> {
self.apply_existing_if(update.clone(), |_member| true)
.unwrap_or_else(|| {
// Unknown member, we'll register it
@@ -361,6 +366,7
apply_successful: true,
// Registering a new active member changes the active set
changed_active_set: is_active_now,
+ replaced_id: None,
}
})
}
@@ -368,14 +374,17

#[derive(Debug, Clone, PartialEq)]
#[must_use]
-pub(crate) struct ApplySummary {
+pub(crate) struct ApplySummary<T> {
pub(crate) is_active_now: bool,
pub(crate) apply_successful: bool,
pub(crate) changed_active_set: bool,
+ pub(crate) replaced_id: Option<T>,
}

#[cfg(test)]
mod tests {
+
+ use crate::Identity;

use super::*;

@@ -596,7 +605,8
ApplySummary {
is_active_now: true,
apply_successful: true,
- changed_active_set: true
+ changed_active_set: true,
+ replaced_id: None,
},
res,
);
@@ -610,7 +620,8
ApplySummary {
is_active_now: true,
apply_successful: false,
- changed_active_set: false
+ changed_active_set: false,
+ replaced_id: None,
},
res,
);
@@ -623,7 +634,8
ApplySummary {
is_active_now: true,
apply_successful: true,
- changed_active_set: false
+ changed_active_set: false,
+ replaced_id: None,
},
res,
);
@@ -635,7 +647,8
ApplySummary {
is_active_now: false,
apply_successful: true,
- changed_active_set: true
+ changed_active_set: true,
+ replaced_id: None,
},
res,
);
@@ -648,7 +661,8
ApplySummary {
is_active_now: false,
apply_successful: true,
- changed_active_set: false
+ changed_active_set: false,
+ replaced_id: None,
},
res,
);
@@ -759,5 +773,33
member_id.0.parse::<usize>().expect("number") > 4
});
assert_eq!(vec![Member::suspect(Id("5"))], out);
+ }
+
+ #[test]
+ fn sets_replaced_id_on_addr_conflict() {
+ let id = crate::testing::ID::new(1).rejoinable();
+ let mut members = Members::new(Vec::from([
+ // 5 active members
+ Member::alive(id),
+ ]));
+
+ let renewed = id.renew().unwrap();
+ let summary = members
+ .apply_existing_if(Member::alive(renewed), |_| true)
+ .expect("member found");
+
+ assert!(summary.apply_successful);
+ assert_eq!(Some(id), summary.replaced_id);
+
+ let another = renewed.renew().unwrap();
+ let summary = members
+ .apply_existing_if(Member::alive(another), |_| false)
+ .expect("member found");
+
+ assert!(!summary.apply_successful);
+ assert_eq!(
+ None, summary.replaced_id,
+ "must not apply if condition fails"
+ );
}
}

Modified src/runtime.rs

@@ -74,6 +74,9
/// Can only happen if `MemberUp(T)` happened before.
MemberDown(T),

+ /// FIXME docs. better name?
+ Renamed(T, T),
+
/// Foca's current identity is known by at least one active member
/// of the cluster.
///

Modified src/testing.rs

@@ -379,6 +379,10
self.to_schedule.clear();
}

+ pub(crate) fn is_empty(&self) -> bool {
+ self.notifications.is_empty() && self.to_send.is_empty() && self.to_schedule.is_empty()
+ }
+
pub(crate) fn take_all_data(&mut self) -> Vec<(ID, Bytes)> {
core::mem::take(&mut self.to_send)
}