Merge branch 'strict-identity'
It's looking good from shallow testing and usage. I'll stress it a bit more over the next few days if I can before cutting a release
- Id
- c24a6cfe78f8db87a51e4285a33303799b37ec4b
- Author
- Caio
- Commit time
- 2024-03-17T21:25:15+01:00
Modified CHANGELOG.md
# Changelog
+## v0.17.0 - UNRELEASED
+
+This release contains significant changes aimed at freeing users
+from having to manage their own version of a unique list of members.
+
+Whilst upgrading won't be just trivially bumping the version, it's
+expected to be very easy and users will probably find themselves
+deleting more code than writing new one when doing it.
+
+Memory and CPU usage should go down since members and cluster
+updates are now bound by the number of distinct addresses whereas
+previously it would grow with the number of distinct identities.
+
+As this is a large change, users should be more cautious when
+upgrading and are welcome to open an issue in case of questions
+or problems.
+
+- **BREAKING**: `foca::Identity` has been revamped and now requires
+ a unique cluster-wide identifier (typically a socket address or a
+ hostname). When multiple identities appear with the same `Addr`
+ the conflict is handled by `Identity::win_addr_conflict`
+- **BREAKING**: _Custom_ broadcasts wire format has changed and
+ handler implementations now only need to emit an identifier (Key)
+ for each value being broadcast instead of managing the allocation
+ See the `BroadcastHandler` documentation and examples for details
+- There's now `Notification::Rename` that signals whenever an
+ identity with a conflicting `Addr` in the cluster gets replaced
+ by a newer one
+- There's no need to manage the list of members externally anymore:
+ foca does it all for you and `Foca::iter_members` only lists
+ the unique (by `Identity::Addr`), freshest identities
+- `examples/foca_insecure_udp_agent.rs` now comes with a fully working
+ custom broadcast example
+- There's now a simple `runtime::AccumulatingRuntime` that's good
+ enough for basic usage if you don't want to implement your own
+
## v0.16.0 - 2023-10-01
- Introduce `Foca::iter_membership_state` that provides a view into the
## v0.14.0 - 2023-09-03
- **BREAKING** Custom broadcast handlers now know which member sent the
- data they're handling to facilitate anti-entropy usecases.
+ data they're handling to facilitate anti-entropy use-cases.
See: https://github.com/caio/foca/issues/28
- Foca now only emits DEBUG and TRACE level traces when using the
`tracing` feature
- Foca will now gossip upon receiving messages that flag their identity
as suspect
-- Foca now resumes probing more quickly when recoving from an incorrect
+- Foca now resumes probing more quickly when recovering from an incorrect
sequence of Timer events
- The Timer enum now has a very simple Ord implementation to facilitate
dealing with out-of-order delivery. Sorting a slice of Timer events
structs, previously if would yield `Member::id`. This allows
users to bootstrap a foca instance with existing cluster state
by feeding its output directly to `Foca::apply_many`
-- **BREAKING**: `Config::remove_down_after` has been increated to
+- **BREAKING**: `Config::remove_down_after` has been increased to
24h. The previous default value of 2 minutes was still too small
and would lead to unreasonably large updates backlog on large
clusters. See: https://github.com/caio/foca/issues/19
Modified Cargo.toml
[package]
name = "foca"
version = "0.16.0"
+rust-version = "1.70.0"
authors = ["Caio <contact@caio.co>"]
edition = "2021"
license = "MPL-2.0"
description = "Gossip-based cluster membership discovery, based on SWIM"
keywords = ["swim", "gossip", "service-discovery", "memberlist"]
categories = ["network-programming", "no-std"]
-repository = "https://github.com/caio/foca"
-homepage = "https://github.com/caio/foca"
+repository = "https://caio.co/de/foca/"
+homepage = "https://caio.co/de/foca/"
documentation = "https://docs.rs/foca"
[package.metadata.docs.rs]
[[example]]
name = "foca_insecure_udp_agent"
-required-features = ["std", "tracing", "postcard-codec"]
-
-[[example]]
-name = "broadcasting"
-required-features = ["bincode-codec"]
+required-features = ["std", "tracing", "bincode-codec"]
[dev-dependencies]
clap = { version = "2", default-features = false }
Modified README.md
you actually run and see Foca swimming.
~~~
-$ cargo run --features std,tracing,postcard-codec --example foca_insecure_udp_agent -- --help
+$ cargo run --features std,tracing,bincode-codec --example foca_insecure_udp_agent -- --help
foca_insecure_udp_agent
USAGE:
Modified examples/foca_insecure_udp_agent.rs
path::Path,
str::FromStr,
sync::Arc,
- time::Duration,
};
+use bincode::Options;
use tracing_subscriber::{
filter::{EnvFilter, LevelFilter},
fmt,
prelude::*,
};
-use bytes::{BufMut, Bytes, BytesMut};
+use bytes::{Buf, BufMut, Bytes, BytesMut};
use clap::{App, Arg};
use rand::{rngs::StdRng, SeedableRng};
use tokio::{
time::{sleep_until, Instant},
};
-use foca::{Config, Foca, Identity, Notification, PostcardCodec, Runtime, Timer};
+use foca::{BincodeCodec, Config, Foca, Notification, Timer};
#[derive(Debug)]
struct CliParams {
struct ID {
addr: SocketAddr,
// An extra field to allow fast rejoin
- bump: u16,
+ bump: u64,
}
// We implement a custom, simpler Debug format just to make the tracing
fn new(addr: SocketAddr) -> Self {
Self {
addr,
- bump: rand::random(),
+ bump: secs_since_epoch(),
}
}
}
-impl Identity for ID {
- // Since a client outside the cluster will not be aware of our
- // `bump` field, we implement the optional trait method
- // `has_same_prefix` to allow anyone that knows our `addr`
- // to join our cluster.
- fn has_same_prefix(&self, other: &Self) -> bool {
- self.addr.eq(&other.addr)
- }
+impl foca::Identity for ID {
+ type Addr = SocketAddr;
// And by implementing `renew` we enable automatic rejoining:
// when another member declares us as down, Foca immediatelly
bump: self.bump.wrapping_add(1),
})
}
-}
-struct AccumulatingRuntime<T> {
- pub to_send: Vec<(T, Bytes)>,
- pub to_schedule: Vec<(Duration, Timer<T>)>,
- pub notifications: Vec<Notification<T>>,
- buf: BytesMut,
-}
-
-impl<T: Identity> Runtime<T> for AccumulatingRuntime<T> {
- // Notice that we'll interact to these via pop(), so we're taking
- // them in reverse order of when it happened.
- // That's perfectly fine, the order of items from a single interaction
- // is irrelevant. A "nicer" implementation could use VecDeque or
- // react directly here instead of accumulating.
-
- fn notify(&mut self, notification: Notification<T>) {
- self.notifications.push(notification);
+ fn addr(&self) -> SocketAddr {
+ self.addr
}
- fn send_to(&mut self, to: T, data: &[u8]) {
- let mut packet = self.buf.split();
- packet.put_slice(data);
- self.to_send.push((to, packet.freeze()));
- }
-
- fn submit_after(&mut self, event: Timer<T>, after: Duration) {
- // We could spawn+sleep here
- self.to_schedule.push((after, event));
- }
-}
-
-impl<T> AccumulatingRuntime<T> {
- pub fn new() -> Self {
- Self {
- to_send: Vec::new(),
- to_schedule: Vec::new(),
- notifications: Vec::new(),
- buf: BytesMut::new(),
- }
- }
-
- pub fn backlog(&self) -> usize {
- self.to_send.len() + self.to_schedule.len() + self.notifications.len()
- }
-}
-
-// Our identity is a composite of a socket address and extra
-// stuff, but downstream consumers likely only care about
-// the address part.
-//
-// It's perfectly valid to temprarily have more than one member
-// pointing at the same address (with a different `bump`): one
-// could, for example: join the cluster, ^C the program and
-// immediatelly join again. Before Foca detects that the previous
-// identity is down we'll receive a notification about this new
-// identity going up.
-//
-// So what we maintain here is a HashMap of addresses to an
-// occurence count:
-//
-// * The count will most of the time be 1;
-// * But in scenarios like above it may reach 2. Meaning:
-// something made the address change identities, but
-// it's still active
-// * And when the count reaches 0 the address is actually
-// down, so we remove it
-//
-struct Members(HashMap<SocketAddr, u8>);
-
-impl Members {
- fn new() -> Self {
- Self(HashMap::new())
- }
-
- // A result of `true` means that the effective list of
- // cluster member addresses has changed
- fn add_member(&mut self, member: ID) -> bool {
- // Notice how we don't care at all about the `bump` part.
- // It's only useful for Foca.
- let counter = self.0.entry(member.addr).or_insert(0);
-
- *counter += 1;
-
- counter == &1
- }
-
- // A result of `true` means that the effective list of
- // cluster member addresses has changed
- fn remove_member(&mut self, member: ID) -> bool {
- let effectivelly_down = if let Some(counter) = self.0.get_mut(&member.addr) {
- *counter -= 1;
-
- counter == &0
- } else {
- // Shouldn't happen
- false
- };
-
- if effectivelly_down {
- self.0.remove(&member.addr);
- }
-
- effectivelly_down
- }
-
- fn addrs(&self) -> impl Iterator<Item = &SocketAddr> {
- self.0.keys()
+ // This teaches every member how to compare two identities
+ // with the same Addr value
+ // In our case, the one with the larger bump always wins
+ fn win_addr_conflict(&self, adversary: &Self) -> bool {
+ self.bump > adversary.bump
}
}
)
.init();
- tracing::info!(?params, "Started");
+ tracing::info!(params = tracing::field::debug(¶ms), "Started");
let CliParams {
bind_addr,
let buf_len = config.max_packet_size.get();
let mut recv_buf = vec![0u8; buf_len];
- let mut foca = Foca::new(identity, config, rng, PostcardCodec);
+ let mut foca = Foca::with_custom_broadcast(
+ identity,
+ config,
+ rng,
+ BincodeCodec(bincode::DefaultOptions::new()),
+ Handler::new(),
+ );
let socket = Arc::new(UdpSocket::bind(bind_addr).await?);
// We'll create a task responsible to sending data through the
// until the necessary time to submit the events it receives
let scheduler = launch_scheduler(tx_foca.clone()).await;
- let mut runtime = AccumulatingRuntime::new();
- let mut members = Members::new();
+ // Periodically instruct foca to send a custom broadcast
+ let broadcast_tx_foca = tx_foca.clone();
tokio::spawn(async move {
+ let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5));
+ loop {
+ interval.tick().await;
+ if broadcast_tx_foca.send(Input::SendBroadcast).await.is_err() {
+ break;
+ }
+ }
+ });
+
+ let mut runtime = foca::AccumulatingRuntime::new();
+ tokio::spawn(async move {
+ let mut ser_buf = Vec::new();
+ let mut last_change_at = 0;
+
while let Some(input) = rx_foca.recv().await {
debug_assert_eq!(0, runtime.backlog());
Input::Event(timer) => foca.handle_timer(timer, &mut runtime),
Input::Data(data) => foca.handle_data(&data, &mut runtime),
Input::Announce(dst) => foca.announce(dst, &mut runtime),
+ Input::SendBroadcast => {
+ let msg = format!(
+ "Hello from {:?}! I have {} peers",
+ foca.identity(),
+ foca.num_members()
+ );
+
+ let key = BroadcastKey {
+ addr: foca.identity().addr,
+ version: last_change_at,
+ };
+ ser_buf.clear();
+ bincode::DefaultOptions::new()
+ .serialize_into(&mut ser_buf, &Broadcast { key, msg })
+ .expect("ser error handling");
+
+ // Notice that we're unconditionally adding a custom
+ // broadcast to the backlog, so there will always be some
+ // data being passed around (i.e. it's akin to a heartbeat)
+ // A complex system would have multiple kinds of broadcasts
+ // some heartbeat-like (service advertisement, node status)
+ // and some more message-like (leadership election, anti-
+ // entropy)
+ foca.add_broadcast(&ser_buf).map(|_| ())
+ }
};
// Every public foca result yields `()` on success, so there's
// And we'd decide what to do with each error, but Foca
// is pretty tolerant so we just log them and pretend
// all is fine
- tracing::error!(?error, "Ignored error");
+ tracing::error!(error = tracing::field::debug(error), "Ignored error");
}
// Now we react to what happened.
// and then drain the runtime.
// First we submit everything that needs to go to the network
- while let Some((dst, data)) = runtime.to_send.pop() {
+ while let Some((dst, data)) = runtime.to_send() {
// ToSocketAddrs would be the fancy thing to use here
let _ignored_send_result = tx_send_data.send((dst.addr, data)).await;
}
// Then schedule what needs to be scheduled
let now = Instant::now();
- while let Some((delay, event)) = runtime.to_schedule.pop() {
+ while let Some((delay, event)) = runtime.to_schedule() {
scheduler
.send((now + delay, event))
.expect("error handling");
// so other proccesses periodically open()/read()/close()
// to figure out the cluster members.
let mut active_list_has_changed = false;
- while let Some(notification) = runtime.notifications.pop() {
+ while let Some(notification) = runtime.to_notify() {
match notification {
- Notification::MemberUp(id) => active_list_has_changed |= members.add_member(id),
- Notification::MemberDown(id) => {
- active_list_has_changed |= members.remove_member(id)
+ Notification::MemberUp(_) | Notification::MemberDown(_) => {
+ active_list_has_changed = true;
+ last_change_at = secs_since_epoch();
}
Notification::Idle => {
tracing::info!("cluster empty");
}
+ Notification::Rename(old, new) => {
+ tracing::info!("member {old:?} is now known as {new:?}");
+ }
other => {
- tracing::debug!(notification = ?other, "Unhandled")
+ tracing::debug!(notification = tracing::field::debug(other), "Unhandled")
}
}
}
if active_list_has_changed {
- do_the_file_replace_dance(&filename, members.addrs())
+ do_the_file_replace_dance(&filename, foca.iter_members().map(|m| &m.id().addr))
.expect("Can write the file alright");
}
}
Data(Bytes),
Announce(T),
Event(Timer<T>),
+ SendBroadcast,
}
async fn launch_scheduler(
($event:expr) => {
if let Err(err) = timer_tx.send(Input::Event($event)).await {
tracing::error!(
- ?err,
+ err = tracing::field::debug(err),
"Error submitting timer event. Shutting down timer task"
);
rx.close();
};
}
- // XXX Maybe watch for lange `now - _ins` deltas
+ // XXX Maybe watch for large `now - _ins` deltas to detect runtime lag
while let Some((_ins, event)) = queue.pop_next(&now) {
submit_event!(event);
}
self.0.pop().map(|Reverse(inner)| inner)
} else {
None
+ }
+ }
+}
+
+fn secs_since_epoch() -> u64 {
+ std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .unwrap()
+ .as_secs()
+}
+
+#[derive(serde::Deserialize, serde::Serialize)]
+struct BroadcastKey {
+ addr: SocketAddr,
+ version: u64,
+}
+
+#[derive(serde::Deserialize, serde::Serialize)]
+struct Broadcast {
+ key: BroadcastKey,
+ msg: String,
+}
+
+impl foca::Invalidates for BroadcastKey {
+ fn invalidates(&self, other: &Self) -> bool {
+ self.addr == other.addr && self.version > other.version
+ }
+}
+
+struct Handler {
+ messages: HashMap<SocketAddr, (u64, String)>,
+ opts: bincode::DefaultOptions,
+}
+
+impl Handler {
+ fn new() -> Self {
+ Self {
+ messages: Default::default(),
+ opts: bincode::DefaultOptions::new(),
+ }
+ }
+}
+
+impl foca::BroadcastHandler<ID> for Handler {
+ type Key = BroadcastKey;
+
+ type Error = String;
+
+ fn receive_item(
+ &mut self,
+ data: &[u8],
+ _sender: Option<&ID>,
+ ) -> Result<Option<Self::Key>, Self::Error> {
+ let mut reader = data.reader();
+
+ // In this contrived example, we decode the whole broadcast
+ // directly. Ideally, one would first decode just the key
+ // so that you can quickly verify if there's a need to
+ // decode the rest of the payload.
+ let Broadcast { key, msg }: Broadcast = self
+ .opts
+ .deserialize_from(&mut reader)
+ .map_err(|err| format!("bad broadcast: {err}"))?;
+
+ let is_new_message = self
+ .messages
+ .get(&key.addr)
+ // If we already have info about the node, check if the version
+ // is newer
+ .map(|(cur_version, _)| cur_version < &key.version)
+ .unwrap_or(true);
+
+ if is_new_message {
+ tracing::info!(
+ payload = tracing::field::debug(&msg),
+ "new custom broadcast",
+ );
+
+ if let Some(previous) = self.messages.insert(key.addr, (key.version, msg)) {
+ tracing::debug!(previous = tracing::field::debug(&previous), "old node data");
+ }
+
+ Ok(Some(key))
+ } else {
+ tracing::trace!(
+ node = tracing::field::debug(key.addr),
+ version = tracing::field::debug(key.version),
+ payload = tracing::field::debug(&msg),
+ "discarded previously seen message"
+ );
+ Ok(None)
}
}
}
Modified examples/identity_golf.rs
}
impl Identity for FatIdentity {
+ type Addr = SocketAddrV4;
+
// We want fast rejoins, so we simply bump the extra field
// maintaining the actual network address intact
fn renew(&self) -> Option<Self> {
})
}
- // And we ensure that members can Announce to us without
- // knowing our (randomized) extra field
- fn has_same_prefix(&self, other: &Self) -> bool {
- self.addr.eq(&other.addr)
+ fn addr(&self) -> SocketAddrV4 {
+ self.addr
+ }
+
+ // When an identity is renew()ed, the cluster will start
+ // seeing two distinct FatIdentity with the exact same
+ // Addr.
+ // This teaches foca to choose the right one to keep
+ // In this case, the right one is simply the one with
+ // the higher `extra` field
+ fn win_addr_conflict(&self, adversary: &Self) -> bool {
+ self.extra > adversary.extra
}
}
}
}
- // And implementing identity is as trivial as it always is:
+ // And implementing identity is as simple as it always is:
impl Identity for SubnetFixedPortId {
+ type Addr = (u8, u8);
+
fn renew(&self) -> Option<Self> {
Some(Self {
addr: self.addr,
})
}
- // And we ensure that members can Announce to us without
- // knowing our (randomized) extra field
- fn has_same_prefix(&self, other: &Self) -> bool {
- self.addr.eq(&other.addr)
+ fn addr(&self) -> (u8, u8) {
+ self.addr
+ }
+
+ fn win_addr_conflict(&self, adversary: &Self) -> bool {
+ self.extra > adversary.extra
}
}
Modified src/broadcast.rs
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use alloc::vec::Vec;
use core::{cmp::Ordering, fmt};
-use bytes::{Buf, BufMut};
+use bytes::BufMut;
/// A type capable of decoding a (associated) broadcast from a buffer
/// and deciding whether to keep disseminating it for other members
/// of the cluster (when it's new information) or to discard it (when
/// its outdated/stale).
pub trait BroadcastHandler<T> {
- /// Concrete type that will be disseminated to all cluster members.
+ /// A unique identifier for the broadcasts this handler manages
///
/// It should be able to compare itself against an arbitrary number
- /// of other [`Self::Broadcast`] instances and decide wether it
+ /// of other [`Self::Key`] instances and decide wether it
/// replaces it or not so conflicting/stale information isn't
/// disseminated.
- ///
- /// The `AsRef<[u8]>` part is what gets sent over the wire, which
- /// [`Self::receive_item`] is supposed to decode.
- type Broadcast: Invalidates + AsRef<[u8]>;
+ type Key: Invalidates;
/// The error type that `receive_item` may emit. Will be wrapped
- /// by [`crate::Error`].
+ /// by [`crate::Error::CustomBroadcast`].
type Error: fmt::Debug + fmt::Display + Send + Sync + 'static;
- /// Decodes a [`Self::Broadcast`] from a buffer and either discards
+ /// Decodes a [`Self::Key`] from a buffer and either discards
/// it or tells Foca to persist and disseminate it.
///
/// `Sender` is `None` when you're adding broadcast data directly,
/// contains the Key-Value pair; If it didn't, you yield
/// `Some`, otherwise the operation is stale, so you yield `None`.
///
- /// Implementations MUST read a single [`Self::Broadcast`] from the
- /// buffer and advance the cursor accordingly.
- ///
- /// Implementations may assume the data in the buffer is contiguous.
+ /// The `data` parameter is the exact data provided to
+ /// `crate::Foca::add_broadcast`. When Foca receives N custom
+ /// broadcasts at once, this gets called N times.
fn receive_item(
&mut self,
- data: impl Buf,
+ data: &[u8],
sender: Option<&T>,
- ) -> Result<Option<Self::Broadcast>, Self::Error>;
+ ) -> Result<Option<Self::Key>, Self::Error>;
/// Decides whether Foca should add broadcast data to the message
/// it's about to send to active member `T`.
}
}
+#[allow(dead_code)]
pub(crate) struct Broadcasts<V> {
- storage: Vec<Entry<V>>,
+ flip: alloc::collections::BinaryHeap<Entry<V>>,
+ flop: alloc::collections::BinaryHeap<Entry<V>>,
}
impl<T> Broadcasts<T>
where
- T: Invalidates + AsRef<[u8]>,
+ T: Invalidates,
{
pub(crate) fn new() -> Self {
Self {
- storage: Vec::new(),
+ flip: Default::default(),
+ flop: Default::default(),
}
}
pub(crate) fn len(&self) -> usize {
- self.storage.len()
+ self.flip.len()
}
- pub(crate) fn add_or_replace(&mut self, value: T, max_tx: usize) {
- let new_node = Entry {
+ pub(crate) fn is_empty(&self) -> bool {
+ self.flip.is_empty()
+ }
+
+ pub(crate) fn add_or_replace(&mut self, item: T, data: Vec<u8>, max_tx: usize) {
+ debug_assert!(max_tx > 0);
+ self.flip.retain(|node| !item.invalidates(&node.item));
+ self.flip.push(Entry {
remaining_tx: max_tx,
- value,
- };
-
- // Can I be smarter here?
- if let Some(position) = self
- .storage
- .iter()
- .position(|node| new_node.value.invalidates(&node.value))
- {
- self.storage.remove(position);
- }
-
- // Find where to insert whilst keeping the storage sorted
- // Searching from the right may be better since there is a
- // bound and default value for `remaining_tx`
- let position = self
- .storage
- .binary_search(&new_node)
- .unwrap_or_else(|pos| pos);
- self.storage.insert(position, new_node);
+ item,
+ data,
+ });
}
pub(crate) fn fill(&mut self, mut buffer: impl BufMut, max_items: usize) -> usize {
- if self.storage.is_empty() {
+ if self.flip.is_empty() {
return 0;
}
+ debug_assert!(self.flop.is_empty());
let mut num_taken = 0;
- let mut num_removed = 0;
- let starting_len = self.storage.len();
let mut remaining = max_items;
- // We fill the buffer giving priority to the largest
- // least sent items.
- for idx in (0..starting_len).rev() {
- if !buffer.has_remaining_mut() || remaining == 0 {
+ while buffer.has_remaining_mut() && remaining > 0 {
+ let Some(mut node) = self.flip.pop() else {
break;
- }
-
- let node = &mut self.storage[idx];
- let value_len = node.value.as_ref().len();
+ };
debug_assert!(node.remaining_tx > 0);
- if buffer.remaining_mut() >= value_len {
+ if buffer.remaining_mut() >= node.data.len() {
num_taken += 1;
remaining -= 1;
- buffer.put_slice(node.value.as_ref());
+ buffer.put_slice(&node.data);
+ node.remaining_tx -= 1;
+ }
- if node.remaining_tx == 1 {
- // Last transmission, gotta remove the node.
- // It's ok to swap_remove because we're walking
- // the storage from the right to the left
- self.storage.swap_remove(idx);
- num_removed += 1;
- } else {
- node.remaining_tx -= 1;
- }
+ if node.remaining_tx > 0 {
+ self.flop.push(node);
}
}
- if num_removed > 0 {
- self.storage.truncate(starting_len - num_removed);
- }
-
- // XXX Any other easy "bail out" scenario?
- let skip_resort = {
- // If we took all the nodes without removing any
- (num_taken == starting_len && num_removed == 0)
- // Or ignored them all
- || num_taken == 0
- };
-
- if !skip_resort {
- self.storage.sort_unstable();
- }
-
- debug_assert!(!skip_resort || self.is_sorted());
+ self.flip.append(&mut self.flop);
num_taken
}
- pub(crate) fn is_sorted(&self) -> bool {
- // Future: `is_sorted` from https://github.com/rust-lang/rfcs/pull/2351
- self.storage[..]
- .windows(2)
- .all(|w| w[0].remaining_tx <= w[1].remaining_tx)
- }
+ pub(crate) fn fill_with_len_prefix(
+ &mut self,
+ mut buffer: impl BufMut,
+ max_items: usize,
+ ) -> usize {
+ if self.flip.is_empty() {
+ return 0;
+ }
+ debug_assert!(self.flop.is_empty());
- pub(crate) fn is_empty(&self) -> bool {
- self.storage.is_empty()
+ let mut num_taken = 0;
+ let mut remaining = max_items;
+
+ while buffer.has_remaining_mut() && remaining > 0 {
+ let Some(mut node) = self.flip.pop() else {
+ break;
+ };
+ debug_assert!(node.remaining_tx > 0);
+
+ if buffer.remaining_mut() >= node.data.len() + 2 {
+ num_taken += 1;
+ remaining -= 1;
+
+ debug_assert!(node.data.len() <= core::u16::MAX as usize);
+ buffer.put_u16(node.data.len() as u16);
+ buffer.put_slice(&node.data);
+ node.remaining_tx -= 1;
+ }
+
+ if node.remaining_tx > 0 {
+ self.flop.push(node);
+ }
+ }
+
+ self.flip.append(&mut self.flop);
+
+ num_taken
}
}
#[derive(Debug, Clone)]
struct Entry<T> {
remaining_tx: usize,
- value: T,
+ // XXX could be Bytes, or keep a pool in parent
+ data: Vec<u8>,
+ // ignored for eq/ord. sorting is unstable
+ item: T,
}
-impl<T: AsRef<[u8]>> PartialEq for Entry<T> {
+impl<T> PartialEq for Entry<T> {
fn eq(&self, other: &Self) -> bool {
- self.remaining_tx == other.remaining_tx
- && self.value.as_ref().len() == other.value.as_ref().len()
+ self.cmp(other).is_eq()
}
}
-impl<T: AsRef<[u8]>> Eq for Entry<T> {}
+impl<T> Eq for Entry<T> {}
-impl<T: AsRef<[u8]>> Ord for Entry<T> {
- fn cmp(&self, other: &Self) -> Ordering {
- let ordering = self.remaining_tx.cmp(&other.remaining_tx);
-
- if ordering == Ordering::Equal {
- self.value.as_ref().len().cmp(&other.value.as_ref().len())
- } else {
- ordering
- }
- }
-}
-
-impl<T: AsRef<[u8]>> PartialOrd for Entry<T> {
+impl<T> PartialOrd for Entry<T> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
+ }
+}
+
+impl<T> Ord for Entry<T> {
+ fn cmp(&self, other: &Self) -> Ordering {
+ self.remaining_tx
+ .cmp(&other.remaining_tx)
+ .then_with(|| self.data.len().cmp(&other.data.len()))
}
}
use super::*;
- struct TwoByteKey(Vec<u8>);
+ struct Key(&'static str);
- impl TwoByteKey {
- fn new(data: impl AsRef<[u8]>) -> Self {
- assert!(
- data.as_ref().len() > 2,
- "first two bytes are used as key for invalidation"
- );
- Self(Vec::from(data.as_ref()))
- }
- }
-
- impl Invalidates for TwoByteKey {
+ impl Invalidates for Key {
fn invalidates(&self, other: &Self) -> bool {
- self.0[..2] == other.0[..2]
- }
- }
-
- impl AsRef<[u8]> for TwoByteKey {
- fn as_ref(&self) -> &[u8] {
- self.0.as_ref()
+ self.0 == other.0
}
}
assert!(piggyback.is_empty(), "Piggyback starts empty");
- piggyback.add_or_replace(TwoByteKey::new(b"AAabc"), max_tx);
+ piggyback.add_or_replace(Key("AA"), b"AAabc".to_vec(), max_tx);
assert_eq!(1, piggyback.len());
- piggyback.add_or_replace(TwoByteKey::new(b"AAcba"), max_tx);
+ piggyback.add_or_replace(Key("AA"), b"AAcba".to_vec(), max_tx);
assert_eq!(
1,
#[test]
fn fill_does_nothing_if_buffer_full() {
let mut piggyback = Broadcasts::new();
- piggyback.add_or_replace(TwoByteKey::new(b"a super long value"), 1);
+ piggyback.add_or_replace(Key("a "), b"a super long value".to_vec(), 1);
let buf = bytes::BytesMut::new();
let mut limited = buf.limit(5);
let max_tx = 10;
let mut piggyback = Broadcasts::new();
- piggyback.add_or_replace(TwoByteKey::new(b"00hi"), max_tx);
- piggyback.add_or_replace(TwoByteKey::new(b"01hello"), max_tx);
- piggyback.add_or_replace(TwoByteKey::new(b"02hey"), max_tx);
+ piggyback.add_or_replace(Key("00"), b"00hi".to_vec(), max_tx);
+ piggyback.add_or_replace(Key("01"), b"01hello".to_vec(), max_tx);
+ piggyback.add_or_replace(Key("02"), b"02hey".to_vec(), max_tx);
let mut buf = Vec::new();
let num_items = piggyback.fill(&mut buf, usize::MAX);
let mut piggyback = Broadcasts::new();
// 3 items, same byte size, distinct max_tx
- piggyback.add_or_replace(TwoByteKey::new(b"100"), 1);
- piggyback.add_or_replace(TwoByteKey::new(b"200"), 2);
- piggyback.add_or_replace(TwoByteKey::new(b"300"), 3);
+ piggyback.add_or_replace(Key("10"), b"100".to_vec(), 1);
+ piggyback.add_or_replace(Key("20"), b"200".to_vec(), 2);
+ piggyback.add_or_replace(Key("30"), b"300".to_vec(), 3);
let mut buf = Vec::new();
piggyback.fill(&mut buf, usize::MAX);
let max_tx = 10;
let mut piggyback = Broadcasts::new();
- piggyback.add_or_replace(TwoByteKey::new(b"foo"), max_tx);
- piggyback.add_or_replace(TwoByteKey::new(b"bar"), max_tx);
- piggyback.add_or_replace(TwoByteKey::new(b"baz"), max_tx);
+ piggyback.add_or_replace(Key("fo"), b"foo".to_vec(), max_tx);
+ piggyback.add_or_replace(Key("ba"), b"bar".to_vec(), max_tx);
+ piggyback.add_or_replace(Key("ba"), b"baz".to_vec(), max_tx);
let mut buf = Vec::new();
let num_items = piggyback.fill(&mut buf, 0);
let num_items = piggyback.fill(&mut buf, 2);
assert_eq!(2, num_items);
+ }
+
+ #[test]
+ fn fill_with_len_prefix() {
+ let mut bcs = Broadcasts::new();
+
+ bcs.add_or_replace(Key("fo"), b"foo".to_vec(), 10);
+ bcs.add_or_replace(Key("ba"), b"barr".to_vec(), 10);
+ bcs.add_or_replace(Key("ba"), b"bazz".to_vec(), 10);
+
+ let mut buf = Vec::new();
+ let num_items = bcs.fill_with_len_prefix(&mut buf, 0);
+
+ assert_eq!(0, num_items);
+ assert!(buf.is_empty());
+
+ let num_items = bcs.fill_with_len_prefix(&mut buf, 2);
+ assert_eq!(2, num_items);
+
+ use bytes::Buf;
+ let mut buf = &buf[..];
+ assert_eq!(4, buf.get_u16());
+ assert_eq!(&b"bazz"[..], &buf[..4]);
+ buf.advance(4);
+ assert_eq!(3, buf.get_u16());
+ assert_eq!(&b"foo"[..], &buf[..3]);
+ buf.advance(3);
+ assert!(buf.is_empty());
}
}
Modified src/identity.rs
/// See `examples/identity_golf.rs` for ideas
///
pub trait Identity: Clone + Eq + fmt::Debug {
+ /// The type of the unique (cluster-wide) address of this identity
+ ///
+ /// A plain identity that cannot auto-rejoin (see `Identity::renew`)
+ /// could have `Addr` the same as `Self` (`std::net::SocketAddr` is
+ /// an example of one)
+ ///
+ /// It's a good idea to have this type as lean as possible
+ type Addr: PartialEq;
+
/// Opt-in on auto-rejoining by providing a new identity.
///
/// When Foca detects it's been declared Down by another member
/// identity and if it yields a new one will immediately
/// switch to it and notify the cluster so that downtime is
/// minimized.
+ ///
+ /// **NOTE** The new identity must win the conflict
fn renew(&self) -> Option<Self>;
- /// Optionally accept Announce messages addressed to an identity
- /// that isn't exactly the same as ours.
+ /// Return this identity's unique address
///
- /// Foca discards messages that aren't addressed to its exact
- /// identity. This means that if your identity has an unpredictable
- /// field (a UUID or a random number, for example), nobody will
- /// be able to join with us directly.
+ /// Typically a socket address, a hostname or similar
///
- /// The [`Self::has_same_prefix`] method is how we teach Foca to
- /// relax this restriction: Upon receiving an Announce message it
- /// will call `current_id.has_same_prefix(dst)` and if it yields
- /// `true` the message will be accepted and the new member will
- /// be allowed to join the cluster.
- fn has_same_prefix(&self, other: &Self) -> bool;
+ /// On previous versions of this crate, there was a `has_same_prefix()`
+ /// method. This serves the same purpose. Having a concrete type
+ /// instead of just a yes/no allows Foca to fully manage the
+ /// cluster members and keep its memory bound by the number of nodes
+ /// instead of the number of identities
+ fn addr(&self) -> Self::Addr;
+
+ /// Decides which to keep when Foca encounters multiple identities
+ /// sharing the same address
+ ///
+ /// Returning `true` means that self will be kept
+ fn win_addr_conflict(&self, _adversary: &Self) -> bool;
}
#[cfg(feature = "std")]
macro_rules! impl_basic_identity {
($type: ty) => {
impl Identity for $type {
+ type Addr = $type;
+
fn renew(&self) -> Option<Self> {
None
}
- fn has_same_prefix(&self, _other: &Self) -> bool {
- false
+ fn addr(&self) -> $type {
+ *self
+ }
+
+ fn win_addr_conflict(&self, _adversary: &Self) -> bool {
+ panic!("addr is self, there'll never be a conflict");
}
}
};
Modified src/lib.rs
use core::{cmp::Ordering, convert::TryFrom, fmt, iter::ExactSizeIterator, mem};
-use bytes::{Buf, BufMut, Bytes, BytesMut};
+use bytes::{Buf, BufMut};
use rand::Rng;
mod broadcast;
identity::Identity,
member::{Incarnation, Member, State},
payload::{Header, Message, ProbeNumber},
- runtime::{Notification, Runtime, Timer, TimerToken},
+ runtime::{AccumulatingRuntime, Notification, Runtime, Timer, TimerToken},
};
#[cfg(feature = "postcard-codec")]
/// operation inside out (think callbacks, or an out parameter like
/// `void* out`). This allows Foca to avoid deciding anything related
/// to how it interacts with the operating system.
-pub struct Foca<T, C, RNG, B: BroadcastHandler<T>> {
+pub struct Foca<T: Identity, C, RNG, B: BroadcastHandler<T>> {
identity: T,
codec: C,
rng: RNG,
// sending data
member_buf: Vec<Member<T>>,
- // Since we emit data via `Runtime::send_to`, this could
- // easily be a Vec, but `BytesMut::limit` is quite handy
- send_buf: BytesMut,
+ send_buf: Vec<u8>,
// Holds (serialized) cluster updates, which may live for a
// while until they get disseminated `Config::max_transmissions`
// times or replaced by fresher updates.
- updates_buf: BytesMut,
- updates: Broadcasts<ClusterUpdate<T>>,
+ updates: Broadcasts<Addr<T::Addr>>,
broadcast_handler: B,
- custom_broadcasts: Broadcasts<B::Broadcast>,
+ custom_broadcasts: Broadcasts<B::Key>,
}
impl<T, C, RNG> Foca<T, C, RNG, NoCustomBroadcast>
}
#[cfg(feature = "tracing")]
-impl<T: Identity, C, RNG, B: BroadcastHandler<T>> fmt::Debug for Foca<T, C, RNG, B> {
+impl<T, C, RNG, B> fmt::Debug for Foca<T, C, RNG, B>
+where
+ T: Identity,
+ B: BroadcastHandler<T>,
+{
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
// Assuming that when tracing comes into play the cluster is actually
// uniform. Meaning: everything is configured the same, including
member_buf: Vec::new(),
connection_state: ConnectionState::Disconnected,
updates: Broadcasts::new(),
- send_buf: BytesMut::with_capacity(max_bytes),
+ send_buf: Vec::with_capacity(max_bytes),
custom_broadcasts: Broadcasts::new(),
- updates_buf: BytesMut::new(),
broadcast_handler,
}
}
self.reset();
#[cfg(feature = "tracing")]
- tracing::debug!(?self, ?previous_id, "changed identity");
+ tracing::debug!(
+ self = tracing::field::debug(&self),
+ previous_id = tracing::field::debug(&previous_id),
+ "changed identity"
+ );
// If our previous identity wasn't known as Down already,
// we'll declare it ourselves
if !previous_is_down {
- let data = self.serialize_member(Member::down(previous_id.clone()))?;
- self.updates.add_or_replace(
- ClusterUpdate {
- member_id: previous_id,
- data,
- },
- self.config.max_transmissions.get().into(),
- );
+ let addr = Addr(previous_id.addr());
+ let data = self.serialize_member(Member::down(previous_id))?;
+ self.updates
+ .add_or_replace(addr, data, self.config.max_transmissions.get().into());
}
self.gossip(runtime)?;
/// that have been declared down.
///
/// This is for advanced usage, to be used in tandem with
- /// [`Foca::apply_many`]. The main usecase for this is
+ /// [`Foca::apply_many`]. The main use-case for this is
/// state replication:
///
/// 1. You may want to send it to another node so that it knows
for update in updates {
if update.id() == &self.identity {
self.handle_self_update(update.incarnation(), update.state(), &mut runtime)?;
- } else if self.identity.has_same_prefix(update.id()) {
+ } else if self.identity.addr() == update.id().addr() {
// We received an update that's about an identity that *could*
// have been ours but definitely isn't (the branch right above,
// where we check equality)
//
// This can happen naturally: an instance rejoins the cluster
- // while the cluster activelly talking about its previous identity
+ // while the cluster actively talking about its previous identity
// going down.
//
// Any non-Down state, however, is questionable: maybe there are
//
// NOTE If there are multiple nodes claiming to have the same
// identity, this will lead to a looping scenario where
- // node A delcares B down, then B changes identity and
+ // node A declares B down, then B changes identity and
// declares A down; nonstop
#[cfg(feature = "tracing")]
if update.is_active() {
tracing::trace!(
- ?self,
- ?update,
+ self = tracing::field::debug(&self),
+ update = tracing::field::debug(&update),
"update about identity with same prefix as ours, declaring it down"
);
}
///
/// This is the cleanest way to terminate a running Foca.
pub fn leave_cluster(&mut self, mut runtime: impl Runtime<T>) -> Result<()> {
+ let addr = Addr(self.identity().addr());
let data = self.serialize_member(Member::down(self.identity().clone()))?;
- self.updates.add_or_replace(
- ClusterUpdate {
- member_id: self.identity().clone(),
- data,
- },
- self.config.max_transmissions.get().into(),
- );
+ self.updates
+ .add_or_replace(addr, data, self.config.max_transmissions.get().into());
self.gossip(&mut runtime)?;
/// Register some data to be broadcast along with Foca messages.
///
/// Calls into this instance's `BroadcastHandler` and reacts accordingly.
- pub fn add_broadcast(&mut self, data: &[u8]) -> Result<()> {
- // NOTE: Receiving B::Broadcast instead of a byte slice would make it
- // look more convenient, however it gets in the way when
- // implementing more ergonomic interfaces (say: an async driver)
- // it forces everything to know the exact concrete type of
- // the broadcast. So... maybe revisit this decision later?
+ pub fn add_broadcast(&mut self, data: &[u8]) -> Result<bool> {
+ if data.is_empty() {
+ return Err(Error::MalformedPacket);
+ }
// Not considering the whole header
if data.len() > self.config.max_packet_size.get() {
return Err(Error::DataTooBig);
}
- self.handle_custom_broadcasts(data, None)
+ if let Some(key) = self
+ .broadcast_handler
+ .receive_item(data, None)
+ .map_err(anyhow::Error::msg)
+ .map_err(Error::CustomBroadcast)?
+ {
+ self.custom_broadcasts.add_or_replace(
+ key,
+ data.to_vec(),
+ self.config.max_transmissions.get().into(),
+ );
+ Ok(true)
+ } else {
+ Ok(false)
+ }
}
/// React to a previously scheduled timer event.
/// See [`Runtime::submit_after`].
pub fn handle_timer(&mut self, event: Timer<T>, mut runtime: impl Runtime<T>) -> Result<()> {
#[cfg(feature = "tracing")]
- let _span = tracing::trace_span!("handle_timer", ?event).entered();
+ let _span =
+ tracing::trace_span!("handle_timer", event = tracing::field::debug(&event)).entered();
match event {
Timer::SendIndirectProbe { probed_id, token } => {
// Changing identities in the middle of the probe cycle may
if !self.probe.is_probing(&probed_id) {
#[cfg(feature = "tracing")]
- tracing::trace!(?probed_id, "Member not being probed");
+ tracing::trace!(
+ probed_id = tracing::field::debug(&probed_id),
+ "Member not being probed"
+ );
return Ok(());
}
if self.probe.succeeded() {
// We received an Ack already, nothing else to do
#[cfg(feature = "tracing")]
- tracing::trace!(?probed_id, "Probe succeeded, no need for indirect cycle");
+ tracing::trace!(
+ probed_id = tracing::field::debug(&probed_id),
+ "Probe succeeded, no need for indirect cycle"
+ );
return Ok(());
}
self.config.num_indirect_probes.get(),
&mut self.member_buf,
&mut self.rng,
- |candidate| candidate != &probed_id && !candidate.has_same_prefix(&probed_id),
+ |candidate| candidate != &probed_id,
);
#[cfg(feature = "tracing")]
tracing::debug!(
- ?probed_id,
+ probed_id = tracing::field::debug(&probed_id),
"Member didn't respond to ping in time, starting indirect probe cycle"
);
member.incarnation() == incarnation
})
{
- self.handle_apply_summary(&summary, as_down, &mut runtime)?;
+ self.handle_apply_summary(summary, as_down, &mut runtime)?;
// Member went down we might need to adjust our internal state
self.adjust_connection_state(&mut runtime);
)]
if let Some(_removed) = self.members.remove_if_down(&down) {
#[cfg(feature = "tracing")]
- tracing::trace!(?down, "Member removed");
+ tracing::trace!(down = tracing::field::debug(&down), "Member removed");
}
Ok(())
self.updates.len()
}
- /// Repports the current length of the custom broadcast queue.
+ /// Reports the current length of the custom broadcast queue.
///
/// Custom broadcasts are transmitted [`Config::max_transmissions`]
/// times at most or until they get invalidated by another custom
/// Presently, attempting to change [`Config::probe_period`] or
/// [`Config::probe_rtt`] results in [`Error::InvalidConfig`]; For
/// such cases it's recommended to recreate your Foca instance. When
- /// an error occurrs, every configuration parameter remains
+ /// an error occurs, every configuration parameter remains
/// unchanged.
pub fn set_config(&mut self, config: Config) -> Result<()> {
if self.config.probe_period != config.probe_period
Err(Error::InvalidConfig)
} else {
#[cfg(feature = "tracing")]
- tracing::trace!(?config, "Configuration changed");
+ tracing::trace!(
+ config = tracing::field::debug(&config),
+ "Configuration changed"
+ );
self.config = config;
Ok(())
#[cfg(feature = "tracing")]
span.record("header", tracing::field::debug(&header));
- if header.src == self.identity {
+ // Since one can implement PartialEq and Identity however
+ // they like, there's no guarantee that if addresses are
+ // different, so are identities. So we check both
+ if header.src == self.identity || header.src.addr() == self.identity.addr() {
return Err(Error::DataFromOurselves);
}
// untrustworthy from their perspective
// So we handle TurnUndead here, otherwise the nodes will be
// spamming each other with this message until enough time passes
- // that foca forgets the down memer (`Config::remove_down_after`)
+ // that foca forgets the down member (`Config::remove_down_after`)
if message == Message::TurnUndead {
self.handle_self_update(Incarnation::default(), State::Down, &mut runtime)?;
}
#[cfg_attr(not(feature = "tracing"), allow(clippy::if_same_then_else))]
if self.probe.receive_ack(&src, probe_number) {
#[cfg(feature = "tracing")]
- tracing::debug!(probed_id=?src, "Probe success");
+ tracing::debug!(probed_id = tracing::field::debug(&src), "Probe success");
} else {
// May be triggered by a member that slows down (say, you ^Z
- // the proccess and `fg` back after a while).
+ // the process and `fg` back after a while).
// Might be interesting to keep an eye on.
#[cfg(feature = "tracing")]
tracing::trace!(
if self.probe.receive_indirect_ack(&src, probe_number) {
#[cfg(feature = "tracing")]
tracing::debug!(
- probed_id = ?self.probe.target(),
+ probed_id = tracing::field::debug(self.probe.target()),
"Indirect probe success"
);
} else {
custom_broadcasts_result
}
- fn serialize_member(&mut self, member: Member<T>) -> Result<Bytes> {
- let mut buf = self.updates_buf.split();
+ fn serialize_member(&mut self, member: Member<T>) -> Result<Vec<u8>> {
+ let mut buf = Vec::new();
self.codec
.encode_member(&member, &mut buf)
.map_err(anyhow::Error::msg)
.map_err(Error::Encode)?;
- Ok(buf.freeze())
+ Ok(buf)
}
fn reset(&mut self) {
if !self.probe.validate() {
#[cfg(feature = "tracing")]
tracing::trace!(
- probed_id = ?self.probe.target(),
+ probed_id = tracing::field::debug(self.probe.target()),
"Recovering: Probe cycle didn't complete correctly"
);
// Probe has invalid state. We'll reset and submit another timer
// 3. The member is now Down, either by leaving voluntarily or by
// being declared down by another cluster member
//
- // 4. The member doesn't exist anymore, which shouldn't actually
- // happen...?
+ // 4. The member doesn't exist anymore. i.e. a newer identity with
+ // the same address has appeared in the cluster
let as_suspect = Member::new(failed.id().clone(), failed.incarnation(), State::Suspect);
if let Some(summary) = self
.members
.apply_existing_if(as_suspect.clone(), |_member| true)
{
- self.handle_apply_summary(&summary, as_suspect, &mut runtime)?;
+ let is_active_now = summary.is_active_now;
+ #[cfg_attr(not(feature = "tracing"), allow(unused_variables))]
+ let apply_successful = summary.apply_successful;
+ self.handle_apply_summary(summary, as_suspect, &mut runtime)?;
// Now we ensure we change the member to Down if it
// isn't already inactive
- if summary.is_active_now {
+ if is_active_now {
// We check for summary.apply_successful prior to logging
// because we may pick a member multiple times before the
// timer runs out.
// May lead to not logging at all if our knowledge of this
// member was already set as State::Suspect
#[cfg(feature = "tracing")]
- if summary.apply_successful {
+ if apply_successful {
tracing::debug!(
- member_id = ?failed.id(),
- timeout = ?self.config.suspect_to_down_after,
+ member_id = tracing::field::debug(failed.id()),
+ timeout = tracing::field::debug(self.config.suspect_to_down_after),
"Member failed probe, will declare it down if it doesn't react"
);
}
let probe_number = self.probe.start(member.clone());
#[cfg(feature = "tracing")]
- tracing::debug!(?member_id, "Probe start");
+ tracing::debug!(member_id = tracing::field::debug(&member_id), "Probe start");
self.send_message(member_id.clone(), Message::Ping(probe_number), &mut runtime)?;
fn apply_update(&mut self, update: Member<T>, runtime: impl Runtime<T>) -> Result<bool> {
debug_assert_ne!(&self.identity, update.id());
let summary = self.members.apply(update.clone(), &mut self.rng);
- self.handle_apply_summary(&summary, update, runtime)?;
+ let active = summary.is_active_now;
+ self.handle_apply_summary(summary, update, runtime)?;
- Ok(summary.is_active_now)
+ Ok(active)
}
fn handle_apply_summary(
&mut self,
- summary: &ApplySummary,
+ summary: ApplySummary<T>,
update: Member<T>,
mut runtime: impl Runtime<T>,
) -> Result<()> {
if summary.apply_successful {
#[cfg(feature = "tracing")]
- tracing::trace!(?update, ?summary, "Update applied");
+ tracing::trace!(
+ update = tracing::field::debug(&update),
+ summary = tracing::field::debug(&summary),
+ "Update applied"
+ );
// Cluster state changed, start broadcasting it
+ let addr = Addr(id.addr());
let data = self.serialize_member(update)?;
- self.updates.add_or_replace(
- ClusterUpdate {
- member_id: id.clone(),
- data,
- },
- self.config.max_transmissions.get().into(),
- );
+ self.updates
+ .add_or_replace(addr, data, self.config.max_transmissions.get().into());
// Down is a terminal state, so set up a handler for removing
// the member so that it may rejoin later
}
}
+ if let Some(old) = summary.replaced_id {
+ #[cfg(feature = "tracing")]
+ tracing::debug!(
+ previous_id = tracing::field::debug(&old),
+ member_id = tracing::field::debug(&id),
+ "Renamed"
+ );
+ runtime.notify(Notification::Rename(old, id.clone()));
+ }
+
if summary.changed_active_set {
if summary.is_active_now {
#[cfg(feature = "tracing")]
- tracing::debug!(member_id=?id, "Member up");
+ tracing::debug!(member_id = tracing::field::debug(&id), "Member up");
runtime.notify(Notification::MemberUp(id));
} else {
#[cfg(feature = "tracing")]
- tracing::debug!(member_id=?id, "Member down");
+ tracing::debug!(member_id = tracing::field::debug(&id), "Member down");
runtime.notify(Notification::MemberDown(id));
}
}
Ok(())
}
- fn handle_custom_broadcasts(&mut self, mut data: impl Buf, sender: Option<&T>) -> Result<()> {
- #[cfg(feature = "tracing")]
- if data.has_remaining() {
- tracing::trace!(len = data.remaining(), "handle_custom_broadcasts");
+ fn handle_custom_broadcasts(&mut self, mut data: &[u8], sender: Option<&T>) -> Result<()> {
+ if !data.is_empty() && data.len() < 3 {
+ return Err(Error::MalformedPacket);
}
- while data.has_remaining() {
- if let Some(broadcast) = self
+
+ while data.remaining() > 2 {
+ let pkt_len = data.get_u16() as usize;
+ if pkt_len == 0 || data.len() < pkt_len {
+ return Err(Error::MalformedPacket);
+ }
+ let pkt = &data[..pkt_len];
+ if let Some(key) = self
.broadcast_handler
- .receive_item(&mut data, sender)
+ .receive_item(pkt, sender)
.map_err(anyhow::Error::msg)
.map_err(Error::CustomBroadcast)?
{
#[cfg(feature = "tracing")]
- tracing::trace!("received broadcast item");
+ tracing::trace!(len = pkt_len, "received broadcast item");
- self.custom_broadcasts
- .add_or_replace(broadcast, self.config.max_transmissions.get().into());
+ self.custom_broadcasts.add_or_replace(
+ key,
+ pkt.to_vec(),
+ self.config.max_transmissions.get().into(),
+ );
}
+ data.advance(pkt_len);
}
- Ok(())
+ if data.has_remaining() {
+ Err(Error::MalformedPacket)
+ } else {
+ Ok(())
+ }
}
fn become_disconnected(&mut self, mut runtime: impl Runtime<T>) {
#[cfg(feature = "tracing")]
let span = tracing::trace_span!(
"send_message",
- ?header,
+ header = tracing::field::debug(&header),
num_updates = tracing::field::Empty,
num_broadcasts = tracing::field::Empty,
len = tracing::field::Empty,
)
.entered();
- // XXX this looks very backwards. it's done as such to be able to
- // reuse the buffer without having to do significant changes
- // to the Codec trait or the existing code. With some effort,
- // send_buf could simply be a Vec<u8>
- // XXX We split_off() here and by the end we unsplit().
+ // XXX We take() here and by the end we put it back.
// This must be done for every return point in send_message
self.send_buf.clear();
- let mut buf = self
- .send_buf
- .split_off(0)
- .limit(self.config.max_packet_size.get());
+ let mut buf = mem::take(&mut self.send_buf).limit(self.config.max_packet_size.get());
debug_assert_eq!(
buf.get_ref().capacity(),
self.config.max_packet_size.get(),
.map_err(anyhow::Error::msg)
.map_err(Error::Encode)
{
- self.send_buf.unsplit(buf.into_inner());
+ self.send_buf = buf.into_inner();
return Err(err);
}
// Fill the remaining space in the buffer with custom
// broadcasts, if any
#[cfg_attr(not(feature = "tracing"), allow(unused_variables))]
- let num_broadcasts = self.custom_broadcasts.fill(&mut buf, usize::MAX);
+ let num_broadcasts = self
+ .custom_broadcasts
+ .fill_with_len_prefix(&mut buf, usize::MAX);
#[cfg(feature = "tracing")]
span.record("num_broadcasts", num_broadcasts);
}
runtime.send_to(dst, &data);
// absorb the buf into send_buf so we can reuse its capacity
- self.send_buf.unsplit(data);
+ self.send_buf = data;
Ok(())
}
fn accept_payload(&self, header: &Header<T>) -> bool {
- // Only accept payloads addessed to us
+ // Only accept payloads addressed to us
header.dst == self.identity
// Unless it's an Announce message
|| (header.message == Message::Announce
// Then we accept it if DST is one of our _possible_
// identities
- && self.identity.has_same_prefix(&header.dst))
+ && self.identity.addr() == header.dst.addr())
}
fn handle_self_update(
Ordering::Greater => {
#[cfg(feature = "tracing")]
tracing::trace!(
- ?self.incarnation,
+ incarnation = self.incarnation,
suspected = incarnation,
"Received suspicion about old incarnation",
);
Ordering::Less => {
#[cfg(feature = "tracing")]
tracing::debug!(
- ?self.incarnation,
+ incarnation = self.incarnation,
suspected = incarnation,
"Suspicion on incarnation higher than current",
);
#[cfg(feature = "tracing")]
tracing::debug!("Rejoin failure: Identity::renew() returned same id",);
Ok(false)
+ } else if !new_identity.win_addr_conflict(&self.identity) {
+ #[cfg(feature = "tracing")]
+ tracing::warn!(
+ new = tracing::field::debug(&new_identity),
+ old = tracing::field::debug(&self.identity),
+ "Rejoin failure: New identity doesn't win the conflict with the old one",
+ );
+ Ok(false)
} else {
self.change_identity(new_identity.clone(), &mut runtime)?;
impl std::error::Error for BroadcastsDisabledError {}
impl<T> BroadcastHandler<T> for NoCustomBroadcast {
- type Broadcast = &'static [u8];
+ type Key = &'static [u8];
type Error = BroadcastsDisabledError;
fn receive_item(
&mut self,
- _data: impl Buf,
+ _data: &[u8],
_sender: Option<&T>,
- ) -> core::result::Result<Option<Self::Broadcast>, Self::Error> {
+ ) -> core::result::Result<Option<Self::Key>, Self::Error> {
Err(BroadcastsDisabledError)
}
}
-struct ClusterUpdate<T> {
- member_id: T,
- data: Bytes,
-}
+struct Addr<T>(T);
-impl<T: PartialEq> Invalidates for ClusterUpdate<T> {
+impl<T: PartialEq> Invalidates for Addr<T> {
// State is managed externally (via Members), so invalidation
// is a trivial replace-if-same-key
fn invalidates(&self, other: &Self) -> bool {
- self.member_id == other.member_id
- }
-}
-
-impl<T> AsRef<[u8]> for ClusterUpdate<T> {
- fn as_ref(&self) -> &[u8] {
- self.data.as_ref()
+ self.0 == other.0
}
}
time::Duration,
};
- use bytes::{Buf, BufMut};
+ use bytes::{Buf, BufMut, Bytes};
use rand::{rngs::SmallRng, SeedableRng};
- use crate::testing::{BadCodec, InMemoryRuntime, ID};
+ use crate::testing::{BadCodec, ID};
fn rng() -> SmallRng {
SmallRng::seed_from_u64(0xF0CA)
fn encode(src: (Header<ID>, Vec<Member<ID>>)) -> Bytes {
let (header, updates) = src;
let mut codec = codec();
- let mut buf = BytesMut::new();
+ let mut buf = bytes::BytesMut::new();
codec
.encode_header(&header, &mut buf)
- .expect("MAYBE FIXME?");
+ .expect("BadCodec shouldn't fail");
if !updates.is_empty() {
buf.put_u16(u16::try_from(updates.len()).unwrap());
for member in updates.iter() {
- codec.encode_member(member, &mut buf).expect("MAYBE FIXME?");
+ codec
+ .encode_member(member, &mut buf)
+ .expect("BadCodec shouldn't fail");
}
}
assert_eq!(Err(Error::NotUndead), foca.reuse_down_identity());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
assert_eq!(
Err(Error::SameIdentity),
foca.change_identity(identity, &mut runtime)
fn cant_probe_when_not_connected() {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let runtime = InMemoryRuntime::new();
+ let runtime = AccumulatingRuntime::new();
let res = foca.handle_timer(Timer::ProbeRandomMember(foca.timer_token()), runtime);
assert_eq!(Err(Error::NotConnected), res);
let mut foca_one = Foca::new(ID::new(1), config(), rng(), codec());
let mut foca_two = Foca::new(ID::new(2), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
// Here foca_one will send an announce packet to foca_two
foca_one
let one = ID::new(1);
let two = ID::new(2);
let mut foca_one = Foca::new(one, config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
assert_eq!(Ok(()), foca_one.announce(two, &mut runtime));
let data = runtime
(header, updates)
};
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
assert_eq!(Ok(()), foca.handle_data(&encode(data), &mut runtime));
expect_notification!(runtime, Notification::<ID>::Active);
#[test]
fn new_down_member_triggers_remove_down_scheduling() -> Result<()> {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
// ID::new(2) is new and down
foca.apply(Member::down(ID::new(2)), &mut runtime)?;
#[test]
fn notification_triggers() -> Result<()> {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
// Brand new member. The first in our set, so we should
// also be notified about going active
// This test verifies that not submitting the second
// timer event causes an error.
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
// Add an active member so that the probing can start
foca.apply(Member::alive(ID::new(2)), &mut runtime)?;
// This test verifies that if someone ask us to talk to
// ourselves via this mechanism, an error occurrs.
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
let probe_number = foca.probe().probe_number();
let indirect_messages = vec![
#[test]
fn cant_receive_data_from_same_identity() {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
assert_eq!(
Err(Error::DataFromOurselves),
}
#[test]
+ fn cant_receive_data_from_same_addr() {
+ let id = ID::new(1);
+ let mut foca = Foca::new(id, config(), rng(), codec());
+ let mut runtime = AccumulatingRuntime::new();
+
+ // Just the address is the same now
+ assert_eq!(
+ Err(Error::DataFromOurselves),
+ foca.handle_data(
+ &encode((
+ Header {
+ src: id.bump(),
+ src_incarnation: 0,
+ dst: ID::new(1),
+ message: Message::Announce
+ },
+ Vec::new()
+ )),
+ &mut runtime
+ )
+ );
+ }
+
+ #[test]
fn cant_receive_announce_with_extra_data() {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
assert_eq!(
Err(Error::MalformedPacket),
let target_id = ID::new_with_bump(1, 255);
let codec = codec();
let mut foca = Foca::new(target_id, config(), rng(), codec);
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
// Our goal is getting `src` to join `target_id`'s cluster.
let src_id = ID::new(2);
// passing the "has same prefix" check to verify the join
// doesn't happen
let wrong_dst = ID::new(3);
- assert!(!target_id.has_same_prefix(&wrong_dst));
+ assert_ne!(target_id.addr(), wrong_dst.addr());
let data = (
Header {
src: src_id,
// prefix check
let dst = ID::new_with_bump(1, 42);
assert_ne!(target_id, dst);
- assert!(target_id.has_same_prefix(&dst));
+ assert_eq!(target_id.addr(), dst.addr());
let data = (
Header {
src: src_id,
#[test]
fn suspicion_refutal() -> Result<()> {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
let original_incarnation = foca.incarnation();
#[test]
fn incarnation_does_not_increase_for_stale_suspicion() {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
let suspected_incarnation = 10;
let update = Member::new(ID::new(1), suspected_incarnation, State::Suspect);
#[test]
fn gossips_when_being_suspected() {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
// just one peer in the cluster, for simplificy's sake
assert_eq!(Ok(()), foca.apply(Member::alive(ID::new(2)), &mut runtime));
#[test]
fn change_identity_gossips_immediately() {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
// Introduce a new member so we have someone to gossip to
assert_eq!(Ok(()), foca.apply(Member::alive(ID::new(2)), &mut runtime));
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
let orig_timer_token = foca.timer_token();
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
assert_eq!(Ok(()), foca.change_identity(ID::new(2), &mut runtime));
assert_ne!(orig_timer_token, foca.timer_token());
fn renew_during_probe_shouldnt_cause_errors() {
let id = ID::new(1).rejoinable();
let mut foca = Foca::new(id, config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
let updates = [
Member::alive(ID::new(2)),
Timer<ID>,
) {
let mut foca = Foca::new(ID::new(1), config.clone(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
assert!(num_members > 0);
// Assume some members exist
// A foca is probing
let (mut foca, _probed, _send_indirect_probe) = craft_probing_foca(2, config());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
// Clippy gets it wrong here: can't use just the plain iterator
// otherwise foca remains borrowed
#[test]
fn probe_ping_ack_cycle() {
let (mut foca, probed, send_indirect_probe) = craft_probing_foca(5, config());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
// Now if probed replies before the timer fires, the probe
// should complete and the indirect probe cycle shouldn't
#[test]
fn probe_cycle_requires_correct_probe_number() {
let (mut foca, probed, send_indirect_probe) = craft_probing_foca(5, config());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
let incorrect_probe_number = foca.probe().probe_number() + 1;
assert_ne!(incorrect_probe_number, foca.probe().probe_number());
// we don't send more requests than the configured value.
let (mut foca, probed, send_indirect_probe) =
craft_probing_foca((num_indirect_probes + 2) as u8, config());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
// `probed` did NOT reply with an Ack before the timer
assert_eq!(Ok(()), foca.handle_timer(send_indirect_probe, &mut runtime));
#[test]
fn probe_receiving_ping_replies_with_ack() {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
let probe_number = foca.probe().probe_number();
let data = (
#[test]
fn probe_receiving_ping_req_sends_indirect_ping() {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
let probe_number = foca.probe().probe_number();
let data = (
#[test]
fn probe_receiving_indirect_ping_sends_indirect_ack() {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
let probe_number = foca.probe().probe_number();
let data = (
#[test]
fn probe_receiving_indirect_ack_sends_forwarded_ack() {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
let probe_number = foca.probe().probe_number();
let data = (
for member in members.iter().rev() {
let mut foca = Foca::new(*member.id(), config(), rng(), codec());
- foca.apply_many(members.iter().cloned(), InMemoryRuntime::new())?;
+ foca.apply_many(members.iter().cloned(), AccumulatingRuntime::new())?;
herd.push(foca);
}
let three = *foca_three.identity();
// foca_one starts suspecting two and three
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
foca_one.apply(Member::suspect(two), &mut runtime)?;
foca_one.apply(Member::suspect(three), &mut runtime)?;
assert_eq!(2, foca_one.num_members());
#[test]
fn leave_cluster_gossips_about_our_death() -> Result<()> {
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
foca.apply(Member::alive(ID::new(2)), &mut runtime)?;
};
let mut foca = Foca::new(ID::new(1), config, rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
// And only have one
foca.apply(Member::alive(ID::new(2)), &mut runtime)?;
#[test]
fn auto_rejoin_behaviour() {
let mut foca = Foca::new(ID::new(1).rejoinable(), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
let updates = [
// New known members
assert_eq!(
Err(Error::DataTooBig),
- foca.handle_data(&large_data[..], InMemoryRuntime::new())
+ foca.handle_data(&large_data[..], AccumulatingRuntime::new())
);
assert_eq!(Err(Error::DataTooBig), foca.add_broadcast(&large_data[..]));
assert_eq!(
Ok(()),
- foca.handle_data(valid_data.as_ref(), InMemoryRuntime::new()),
+ foca.handle_data(valid_data.as_ref(), AccumulatingRuntime::new()),
"valid_data should be valid :-)"
);
bad_data.push(0);
assert_eq!(
- Err(Error::CustomBroadcast(anyhow::Error::msg(
- BroadcastsDisabledError
- ))),
- foca.handle_data(bad_data.as_ref(), InMemoryRuntime::new()),
+ Err(Error::MalformedPacket),
+ foca.handle_data(bad_data.as_ref(), AccumulatingRuntime::new()),
);
}
struct Handler(BTreeMap<u64, u16>);
impl BroadcastHandler<ID> for Handler {
- type Broadcast = VersionedKey;
+ type Key = VersionedKey;
type Error = &'static str;
fn receive_item(
&mut self,
- data: impl Buf,
+ data: &[u8],
_sender: Option<&ID>,
- ) -> core::result::Result<Option<Self::Broadcast>, Self::Error> {
+ ) -> core::result::Result<Option<Self::Key>, Self::Error> {
let decoded = VersionedKey::from_bytes(data)?;
let is_new_information = self
);
assert!(
- foca.add_broadcast(b"hue").is_err(),
+ foca.add_broadcast(b"huehue").is_err(),
"Adding garbage shouldn't work"
);
assert_eq!(
- Ok(()),
+ Ok(true),
foca.add_broadcast(VersionedKey::new(420, 0).as_ref()),
);
);
assert_eq!(
- Ok(()),
+ Ok(true),
foca.add_broadcast(VersionedKey::new(420, 1).as_ref()),
);
assert_eq!(
1,
foca.custom_broadcast_backlog(),
- "But receiving a new version should simply replace the existing one"
+ "Receiving a new version should simply replace the existing one"
);
+
+ assert_eq!(
+ Ok(false),
+ foca.add_broadcast(VersionedKey::new(420, 1).as_ref()),
+ "Adding stale/known broadcast should signal that nothing was added"
+ );
+
+ assert_eq!(1, foca.custom_broadcast_backlog(),);
// Let's add one more custom broadcast because testing with N=1
// is pretty lousy :-)
assert_eq!(
- Ok(()),
+ Ok(true),
foca.add_broadcast(VersionedKey::new(710, 1).as_ref()),
);
+
+ assert_eq!(2, foca.custom_broadcast_backlog(),);
// Now let's see if the custom broadcasts actually get
// disseminated.
);
// Teach the original foca about this new `other_foca`
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
assert_eq!(Ok(()), foca.apply(Member::alive(other_id), &mut runtime));
// Now foca will talk to other_foca. The encoded data
// Here we get a foca in the middle of a probe cycle. The correct
// sequencing should submit `_send_indirect_probe`
let (mut foca, _probed, _send_indirect_probe) = craft_probing_foca(2, config());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
let old_probeno = foca.probe().probe_number();
// ... but we'll manually craft a ProbeRandomMember event instead
};
let (mut foca, probed, send_indirect_probe) = craft_probing_foca(2, config);
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
// `probed` did NOT reply with an Ack before the timer
assert_eq!(Ok(()), foca.handle_timer(send_indirect_probe, &mut runtime));
c.notify_down_members = true;
c
};
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
// We have a simple foca instance
let mut foca = Foca::new(ID::new(1), config, rng(), codec());
config_setter(&mut config, params);
let mut foca = Foca::new(ID::new(1), config, rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
// When it becomes active (i.e.: has at least one active member)
assert_eq!(Ok(()), foca.apply(Member::alive(ID::new(2)), &mut runtime));
// So that they are not the same
assert_ne!(id, renewed);
// But have the same prefix
- assert!(id.has_same_prefix(&renewed));
+ assert_eq!(id.addr(), renewed.addr());
// If we have an instance running with the renewed
// id as its identity
let mut foca = Foca::new(renewed, config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
// Learning anything about its previous identity
assert_eq!(
// announces to another and then verify that we can handle
// the reply with no errors
let mut foca_one = Foca::new(ID::new(1), config.clone(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
// let's assume that foca_one knows about another member, ID=3'
// so that the feed reply contains at least one member
fn feed_fits_as_many_as_it_can() {
// We prepare a foca cluster with a bunch of live members
let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
let cluster = (2u8..=u8::MAX)
.map(|id| Member::alive(ID::new(id)))
.collect::<Vec<_>>();
c
};
let mut foca = Foca::new(id_one, config, rng(), codec());
- let mut runtime = InMemoryRuntime::new();
+ let mut runtime = AccumulatingRuntime::new();
// that thinks ID=2 is down;
assert_eq!(Ok(()), foca.apply(Member::down(ID::new(2)), &mut runtime));
id_one, header.src,
"message should be crafted with the new/renewed id"
);
+ }
+
+ #[test]
+ fn handles_member_addr_conflict() {
+ let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
+ let mut runtime = AccumulatingRuntime::new();
+
+ // Given a known member ID=2,0
+ let original = ID::new_with_bump(2, 0);
+ assert_eq!(Ok(()), foca.apply(Member::alive(original), &mut runtime));
+ assert_eq!(1, foca.num_members());
+
+ // When foca learns about a new member with same address ID=2,1
+ // that wins its conflict resolution round
+ let conflicted = ID::new_with_bump(2, 1);
+ assert_eq!(original.addr(), conflicted.addr());
+ assert!(conflicted.win_addr_conflict(&original));
+ assert_eq!(Ok(()), foca.apply(Member::alive(conflicted), &mut runtime));
+
+ // It should replace the original state
+ assert_eq!(1, foca.num_members());
+ assert_eq!(
+ foca.iter_members().next().unwrap(),
+ &Member::alive(conflicted)
+ );
+
+ // Conversely, if it learns about a member with same address
+ // that loses the conflict
+ assert!(!original.win_addr_conflict(&conflicted));
+ // nothing changes
+ for m in [
+ Member::alive(original),
+ Member::suspect(original),
+ Member::down(original),
+ ] {
+ assert_eq!(Ok(()), foca.apply(m, &mut runtime));
+ assert_eq!(1, foca.num_members());
+ assert_eq!(
+ foca.iter_members().next().unwrap(),
+ &Member::alive(conflicted)
+ );
+ }
+ }
+
+ #[test]
+ fn does_not_mark_renewed_identity_as_down() {
+ let (mut foca, probed, send_indirect_probe) = craft_probing_foca(1, config());
+ let mut runtime = AccumulatingRuntime::new();
+ assert_eq!(1, foca.num_members());
+
+ // `probed` did NOT reply with an Ack before the timer
+ assert_eq!(Ok(()), foca.handle_timer(send_indirect_probe, &mut runtime));
+
+ // meanwhile, the member rejoined (same addr, but not the same id)
+ let bumped = probed.bump();
+ assert_ne!(probed, bumped);
+ assert_eq!(probed.addr(), bumped.addr());
+ assert_eq!(Ok(()), foca.apply(Member::alive(bumped), &mut runtime));
+ assert_eq!(1, foca.num_members());
+
+ runtime.clear();
+ // So by the time the ChangeSuspectToDown timer fires
+ assert_eq!(
+ Ok(()),
+ foca.handle_timer(
+ Timer::ChangeSuspectToDown {
+ member_id: probed,
+ incarnation: Incarnation::default(),
+ token: foca.timer_token()
+ },
+ &mut runtime
+ )
+ );
+
+ // the member is NOT marked as down
+ assert_eq!(1, foca.num_members());
+ assert_eq!(foca.iter_members().next().unwrap(), &Member::alive(bumped));
+ }
+
+ #[test]
+ fn notifies_on_conflict_resolution() {
+ let mut foca = Foca::new(ID::new(1), config(), rng(), codec());
+ let mut runtime = AccumulatingRuntime::new();
+
+ // Given a known member
+ let member = ID::new(2).rejoinable();
+ assert_eq!(Ok(()), foca.apply(Member::alive(member), &mut runtime));
+ assert_eq!(1, foca.num_members());
+
+ runtime.clear();
+
+ // Learning about its renewd id
+ let renewed = member.renew().expect("bumped");
+ assert_eq!(Ok(()), foca.apply(Member::alive(renewed), &mut runtime));
+ assert_eq!(1, foca.num_members());
+ // Should notify the runtime about the change
+ expect_notification!(runtime, Notification::Rename(member, renewed));
+ // But no MemberUp notification should be fired, since
+ // previous addr was already active
+ reject_notification!(runtime, Notification::MemberUp(member));
+ reject_notification!(runtime, Notification::MemberUp(renewed));
+
+ runtime.clear();
+
+ // But if the renewed id is not active
+ let inactive = renewed.renew().expect("bumped");
+ assert_eq!(Ok(()), foca.apply(Member::down(inactive), &mut runtime));
+ assert_eq!(0, foca.num_members());
+ // We get notified of the rename
+ expect_notification!(runtime, Notification::Rename(renewed, inactive));
+ // AND about the member going down with its new identity
+ expect_notification!(runtime, Notification::MemberDown(inactive));
+ // but nothing about the (now overriden, forgotten) previous one
+ reject_notification!(runtime, Notification::MemberDown(renewed));
+
+ runtime.clear();
+
+ // The inverse behaves similarly:
+ // Learning about a renewed active
+ let active = inactive.renew().expect("bumped");
+ assert_eq!(Ok(()), foca.apply(Member::suspect(active), &mut runtime));
+ assert_eq!(1, foca.num_members());
+ // Should notify about the rename
+ expect_notification!(runtime, Notification::Rename(inactive, active));
+ // And about the member being active
+ expect_notification!(runtime, Notification::MemberUp(active));
+
+ runtime.clear();
+ // And if it learns about the previous ids again, regardless
+ // of their state, nothing happens
+ for m in [member, renewed, inactive] {
+ assert_eq!(Ok(()), foca.apply(Member::alive(m), &mut runtime));
+ assert_eq!(1, foca.num_members());
+ assert!(runtime.is_empty());
+
+ assert_eq!(Ok(()), foca.apply(Member::down(m), &mut runtime));
+ assert_eq!(1, foca.num_members());
+ assert!(runtime.is_empty());
+ }
}
}
Modified src/member.rs
}
}
-impl<T: PartialEq + Clone> Members<T> {
+impl<T> Members<T>
+where
+ T: PartialEq + Clone + crate::Identity,
+{
pub(crate) fn num_active(&self) -> usize {
self.num_active
}
pub(crate) fn apply_existing_if<F: Fn(&Member<T>) -> bool>(
&mut self,
- update: Member<T>,
+ mut update: Member<T>,
condition: F,
- ) -> Option<ApplySummary> {
+ ) -> Option<ApplySummary<T>> {
if let Some(known_member) = self
.inner
.iter_mut()
- .find(|member| &member.id == update.id())
+ .find(|member| member.id.addr() == update.id().addr())
{
+ // if there's a conflict and the update wins, the member
+ // state is fully replaced
+ let mut force_apply = false;
+ if known_member.id != update.id {
+ // If the update wins the conflict, the full member
+ // state is replaced (it's essentially a rejoin)
+ if known_member.id.win_addr_conflict(&update.id) {
+ // update lost conflict, it's junk
+ return Some(ApplySummary {
+ is_active_now: known_member.is_active(),
+ apply_successful: false,
+ changed_active_set: false,
+ replaced_id: None,
+ });
+ }
+ force_apply = true;
+ }
+
if !condition(known_member) {
return Some(ApplySummary {
is_active_now: known_member.is_active(),
apply_successful: false,
changed_active_set: false,
+ replaced_id: None,
});
}
let was_active = known_member.is_active();
- let apply_successful = known_member.change_state(update.incarnation(), update.state());
+ let mut replaced_id = None;
+ let apply_successful = if force_apply {
+ core::mem::swap(&mut known_member.id, &mut update.id);
+ replaced_id = Some(update.id);
+ known_member.state = update.state;
+ known_member.incarnation = update.incarnation;
+ true
+ } else {
+ known_member.change_state(update.incarnation, update.state)
+ };
let is_active_now = known_member.is_active();
let changed_active_set = is_active_now != was_active;
is_active_now,
apply_successful,
changed_active_set,
+ replaced_id,
})
} else {
None
}
}
- pub(crate) fn apply(&mut self, update: Member<T>, mut rng: impl Rng) -> ApplySummary {
+ pub(crate) fn apply(&mut self, update: Member<T>, mut rng: impl Rng) -> ApplySummary<T> {
self.apply_existing_if(update.clone(), |_member| true)
.unwrap_or_else(|| {
// Unknown member, we'll register it
apply_successful: true,
// Registering a new active member changes the active set
changed_active_set: is_active_now,
+ replaced_id: None,
}
})
}
#[derive(Debug, Clone, PartialEq)]
#[must_use]
-pub(crate) struct ApplySummary {
+pub(crate) struct ApplySummary<T> {
pub(crate) is_active_now: bool,
pub(crate) apply_successful: bool,
pub(crate) changed_active_set: bool,
+ pub(crate) replaced_id: Option<T>,
}
#[cfg(test)]
mod tests {
+
+ use crate::Identity;
use super::*;
use alloc::vec;
use rand::{rngs::SmallRng, SeedableRng};
+ #[derive(Clone, Debug, PartialEq, Eq, Copy, PartialOrd, Ord)]
+ struct Id(&'static str);
+ impl crate::Identity for Id {
+ type Addr = &'static str;
+
+ fn renew(&self) -> Option<Self> {
+ None
+ }
+
+ fn addr(&self) -> Self::Addr {
+ self.0
+ }
+
+ fn win_addr_conflict(&self, _adversary: &Self) -> bool {
+ panic!("addr is self, there'll never be a conflict");
+ }
+ }
+
use State::*;
#[test]
fn alive_transitions() {
- let mut member = Member::new("a", 0, Alive);
+ let mut member = Member::new(Id("a"), 0, Alive);
// Alive => Alive
assert!(
);
assert_eq!(Suspect, member.state);
- member = Member::new("b", 0, Alive);
+ member = Member::new(Id("b"), 0, Alive);
assert!(
member.change_state(member.incarnation + 1, Suspect),
"transition to suspect with higher incarnation"
#[test]
fn suspect_transitions() {
- let mut member = Member::new("a", 0, Suspect);
+ let mut member = Member::new(Id("a"), 0, Suspect);
// Suspect => Suspect
assert!(
#[test]
fn next_walks_sequentially_then_shuffles() {
- let ordered_ids = vec![1, 2, 3, 4, 5];
+ let ordered_ids = vec![Id("1"), Id("2"), Id("3"), Id("4"), Id("5")];
let mut members = Members::new(ordered_ids.iter().cloned().map(Member::alive).collect());
let mut rng = SmallRng::seed_from_u64(0xF0CA);
assert_eq!(
None,
- members.apply_existing_if(Member::alive(1), |_member| true),
+ members.apply_existing_if(Member::alive(Id("1")), |_member| true),
"Only yield None only if member is not found"
);
let mut rng = SmallRng::seed_from_u64(0xF0CA);
- let _ = members.apply(Member::alive(1), &mut rng);
+ let _ = members.apply(Member::alive(Id("1")), &mut rng);
assert_ne!(
None,
- members.apply_existing_if(Member::alive(1), |_member| true),
+ members.apply_existing_if(Member::alive(Id("1")), |_member| true),
"Must yield Some() if existing, regardless of condition"
);
assert_ne!(
None,
- members.apply_existing_if(Member::alive(1), |_member| false),
+ members.apply_existing_if(Member::alive(Id("1")), |_member| false),
"Must yield Some() if existing, regardless of condition"
);
}
let mut rng = SmallRng::seed_from_u64(0xF0CA);
// New and active member
- let res = members.apply(Member::suspect(1), &mut rng);
+ let res = members.apply(Member::suspect(Id("1")), &mut rng);
assert_eq!(
ApplySummary {
is_active_now: true,
apply_successful: true,
- changed_active_set: true
+ changed_active_set: true,
+ replaced_id: None,
},
res,
);
// Failed attempt to change member id=1 to alive
// (since it's already suspect with same incarnation)
- let res = members.apply(Member::alive(1), &mut rng);
+ let res = members.apply(Member::alive(Id("1")), &mut rng);
assert_eq!(
ApplySummary {
is_active_now: true,
apply_successful: false,
- changed_active_set: false
+ changed_active_set: false,
+ replaced_id: None,
},
res,
);
// Successful attempt at changing member id=1 to
// alive by using a higher incarnation
- let res = members.apply(Member::new(1, 1, State::Alive), &mut rng);
+ let res = members.apply(Member::new(Id("1"), 1, State::Alive), &mut rng);
assert_eq!(
ApplySummary {
is_active_now: true,
apply_successful: true,
- changed_active_set: false
+ changed_active_set: false,
+ replaced_id: None,
},
res,
);
assert_eq!(1, members.len());
// Change existing member to down
- let res = members.apply(Member::down(1), &mut rng);
+ let res = members.apply(Member::down(Id("1")), &mut rng);
assert_eq!(
ApplySummary {
is_active_now: false,
apply_successful: true,
- changed_active_set: true
+ changed_active_set: true,
+ replaced_id: None,
},
res,
);
assert_eq!(0, members.num_active());
// New and inactive member
- let res = members.apply(Member::down(2), &mut rng);
+ let res = members.apply(Member::down(Id("2")), &mut rng);
assert_eq!(
ApplySummary {
is_active_now: false,
apply_successful: true,
- changed_active_set: false
+ changed_active_set: false,
+ replaced_id: None,
},
res,
);
assert_eq!(
None,
- members.remove_if_down(&1),
+ members.remove_if_down(&Id("1")),
"cant remove member that does not exist"
);
- let _ = members.apply(Member::alive(1), &mut rng);
+ let _ = members.apply(Member::alive(Id("1")), &mut rng);
assert_eq!(
None,
- members.remove_if_down(&1),
+ members.remove_if_down(&Id("1")),
"cant remove member that isnt down"
);
- let _ = members.apply(Member::down(1), &mut rng);
+ let _ = members.apply(Member::down(Id("1")), &mut rng);
assert_eq!(
- Some(Member::down(1)),
- members.remove_if_down(&1),
+ Some(Member::down(Id("1"))),
+ members.remove_if_down(&Id("1")),
"must return the removed member"
);
}
"next() should yield None when there are no members"
);
- let _ = members.apply(Member::down(-1), &mut rng);
- let _ = members.apply(Member::down(-2), &mut rng);
- let _ = members.apply(Member::down(-3), &mut rng);
+ let _ = members.apply(Member::down(Id("-1")), &mut rng);
+ let _ = members.apply(Member::down(Id("-2")), &mut rng);
+ let _ = members.apply(Member::down(Id("-3")), &mut rng);
assert_eq!(
None,
"next() should yield None when there are no active members"
);
- let _ = members.apply(Member::alive(1), &mut rng);
+ let _ = members.apply(Member::alive(Id("1")), &mut rng);
for _i in 0..10 {
assert_eq!(
- Some(1),
+ Some(Id("1")),
members.next(&mut rng).map(|m| m.id),
"next() should yield the same member if its the only active"
);
fn choose_active_members_behaviour() {
let members = Members::new(Vec::from([
// 5 active members
- Member::alive(1),
- Member::alive(2),
- Member::alive(3),
- Member::suspect(4),
- Member::suspect(5),
+ Member::alive(Id("1")),
+ Member::alive(Id("2")),
+ Member::alive(Id("3")),
+ Member::suspect(Id("4")),
+ Member::suspect(Id("5")),
// 2 down
- Member::down(6),
- Member::down(7),
+ Member::down(Id("6")),
+ Member::down(Id("7")),
]));
assert_eq!(7, members.len());
assert_eq!(2, out.len(), "Respects `wanted` even if we have more");
out.clear();
- members.choose_active_members(usize::MAX, &mut out, &mut rng, |&member_id| member_id > 4);
- assert_eq!(vec![Member::suspect(5)], out);
+ members.choose_active_members(usize::MAX, &mut out, &mut rng, |&member_id| {
+ member_id.0.parse::<usize>().expect("number") > 4
+ });
+ assert_eq!(vec![Member::suspect(Id("5"))], out);
+ }
+
+ #[test]
+ fn sets_replaced_id_on_addr_conflict() {
+ let id = crate::testing::ID::new(1).rejoinable();
+ let mut members = Members::new(Vec::from([
+ // 5 active members
+ Member::alive(id),
+ ]));
+
+ let renewed = id.renew().unwrap();
+ let summary = members
+ .apply_existing_if(Member::alive(renewed), |_| true)
+ .expect("member found");
+
+ assert!(summary.apply_successful);
+ assert_eq!(Some(id), summary.replaced_id);
+
+ let another = renewed.renew().unwrap();
+ let summary = members
+ .apply_existing_if(Member::alive(another), |_| false)
+ .expect("member found");
+
+ assert!(!summary.apply_successful);
+ assert_eq!(
+ None, summary.replaced_id,
+ "must not apply if condition fails"
+ );
}
}
Modified src/probe.rs
use crate::{member::Member, ProbeNumber};
-// FIXME This whole thing is ugly AF :(
-
pub(crate) struct Probe<T> {
direct: Option<Member<T>>,
indirect: Vec<T>,
Modified src/runtime.rs
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
+use alloc::collections::VecDeque;
use core::{cmp::Ordering, time::Duration};
+
+use bytes::{Bytes, BytesMut};
use crate::{Identity, Incarnation};
/// Implementations may react directly to it for a fully synchronous
/// behavior or accumulate-then-drain when dispatching via fancier
/// mechanisms like async.
-pub trait Runtime<T: Identity> {
+pub trait Runtime<T>
+where
+ T: Identity,
+{
/// Whenever something changes Foca's state significantly a
/// notification is emitted.
///
///
/// Can only happen if `MemberUp(T)` happened before.
MemberDown(T),
+
+ /// Foca has learned that there's a more recent identity with
+ /// the same address and chose to use it instead of the previous
+ /// one.
+ ///
+ /// So `Notification::Rename(A,B)` means that we knew about a member
+ /// `A` but now there's a `B` with the same `Identity::Addr` and
+ /// foca chose to keep it. i.e. `B.win_addr_conflict(A) == true`.
+ ///
+ /// This happens naturally when a member rejoins the cluster after
+ /// any event (maybe they were declared down and `Identity::renew`d
+ /// themselves, maybe it's a restart/upgrade process)
+ ///
+ /// Example:
+ ///
+ /// If `A` was considered Down and `B` is Alive, you'll get
+ /// two notifications, in order:
+ //
+ /// 1. `Notification::Rename(A,B)`
+ /// 2. `Notification::MemberUp(B)`
+ ///
+ /// However, if there's no liveness change (both are active
+ /// or both are down), you'll only get the `Rename` notification
+ Rename(T, T),
/// Foca's current identity is known by at least one active member
/// of the cluster.
///
/// Similar in spirit to [`crate::ProbeNumber`].
pub type TimerToken = u8;
+
+/// A `Runtime` implementation that's good enough for simple use-cases.
+///
+/// It accumulates all events that happen during an interaction with
+/// `crate::Foca` and users must drain those and react accordingly.
+///
+/// Better runtimes would react directly to the events, intead of
+/// needlessly storing the events in a queue.
+///
+/// Users must drain the runtime immediately after interacting with
+/// foca. Example:
+///
+/// See it in use at `examples/foca_insecure_udp_agent.rs`
+pub struct AccumulatingRuntime<T> {
+ to_send: VecDeque<(T, Bytes)>,
+ to_schedule: VecDeque<(Duration, Timer<T>)>,
+ notifications: VecDeque<Notification<T>>,
+ buf: BytesMut,
+}
+
+impl<T> Default for AccumulatingRuntime<T> {
+ fn default() -> Self {
+ Self {
+ to_send: Default::default(),
+ to_schedule: Default::default(),
+ notifications: Default::default(),
+ buf: Default::default(),
+ }
+ }
+}
+
+impl<T: Identity> Runtime<T> for AccumulatingRuntime<T> {
+ fn notify(&mut self, notification: Notification<T>) {
+ self.notifications.push_back(notification);
+ }
+
+ fn send_to(&mut self, to: T, data: &[u8]) {
+ self.buf.extend_from_slice(data);
+ let packet = self.buf.split().freeze();
+ self.to_send.push_back((to, packet));
+ }
+
+ fn submit_after(&mut self, event: Timer<T>, after: Duration) {
+ // We could spawn+sleep here
+ self.to_schedule.push_back((after, event));
+ }
+}
+
+impl<T> AccumulatingRuntime<T> {
+ /// Create a new `AccumulatingRuntime`
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// Yields data to be sent to a cluster member `T` in the
+ /// order they've happened.
+ ///
+ /// Users are expected to drain it until it yields `None`
+ /// after every interaction with `crate::Foca`
+ pub fn to_send(&mut self) -> Option<(T, Bytes)> {
+ self.to_send.pop_front()
+ }
+
+ /// Yields timer events and how far in the future they
+ /// must be given back to the foca instance that produced it
+ ///
+ /// Users are expected to drain it until it yields `None`
+ /// after every interaction with `crate::Foca`
+ pub fn to_schedule(&mut self) -> Option<(Duration, Timer<T>)> {
+ self.to_schedule.pop_front()
+ }
+
+ /// Yields event notifications in the order they've happened
+ ///
+ /// Users are expected to drain it until it yields `None`
+ /// after every interaction with `crate::Foca`
+ pub fn to_notify(&mut self) -> Option<Notification<T>> {
+ self.notifications.pop_front()
+ }
+
+ /// Returns how many unhandled events are left in this runtime
+ ///
+ /// Should be brought down to zero after every interaction with
+ /// `crate::Foca`
+ pub fn backlog(&self) -> usize {
+ self.to_send.len() + self.to_schedule.len() + self.notifications.len()
+ }
+}
+
+#[cfg(test)]
+impl<T: PartialEq> AccumulatingRuntime<T> {
+ pub(crate) fn clear(&mut self) {
+ self.notifications.clear();
+ self.to_send.clear();
+ self.to_schedule.clear();
+ }
+
+ pub(crate) fn is_empty(&self) -> bool {
+ self.notifications.is_empty() && self.to_send.is_empty() && self.to_schedule.is_empty()
+ }
+
+ pub(crate) fn take_all_data(&mut self) -> VecDeque<(T, Bytes)> {
+ core::mem::take(&mut self.to_send)
+ }
+
+ pub(crate) fn take_data(&mut self, dst: T) -> Option<Bytes> {
+ let position = self.to_send.iter().position(|(to, _data)| to == &dst)?;
+
+ self.to_send.remove(position).map(|(_, data)| data)
+ }
+
+ pub(crate) fn take_notification(&mut self, wanted: Notification<T>) -> Option<Notification<T>> {
+ let position = self
+ .notifications
+ .iter()
+ .position(|notification| notification == &wanted)?;
+
+ self.notifications.remove(position)
+ }
+
+ pub(crate) fn take_scheduling(&mut self, timer: Timer<T>) -> Option<Duration> {
+ let position = self
+ .to_schedule
+ .iter()
+ .position(|(_when, event)| event == &timer)?;
+
+ self.to_schedule.remove(position).map(|(when, _)| when)
+ }
+
+ pub(crate) fn find_scheduling<F>(&self, predicate: F) -> Option<&Timer<T>>
+ where
+ F: Fn(&Timer<T>) -> bool,
+ {
+ self.to_schedule
+ .iter()
+ .find(|(_, timer)| predicate(timer))
+ .map(|(_, timer)| timer)
+ }
+}
#[cfg(test)]
mod tests {
Modified src/testing.rs
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use alloc::vec::Vec;
-use core::time::Duration;
-use bytes::{Buf, BufMut, Bytes};
+use bytes::{Buf, BufMut};
-use crate::{Codec, Header, Identity, Member, Message, Notification, Runtime, State, Timer};
+use crate::{Codec, Header, Identity, Member, Message, State};
#[derive(Debug, Clone, Copy, PartialOrd, Ord)]
pub(crate) struct ID {
- id: u8,
+ addr: u8,
bump: u8,
rejoinable: bool,
}
impl PartialEq for ID {
fn eq(&self, other: &Self) -> bool {
// Ignoring `rejoinable` field
- self.id == other.id && self.bump == other.bump
+ self.addr == other.addr && self.bump == other.bump
+ }
+}
+
+impl core::hash::Hash for ID {
+ fn hash<H: core::hash::Hasher>(&self, state: &mut H) {
+ self.addr.hash(state);
+ self.bump.hash(state);
}
}
ID::new_with_bump(id, 0)
}
+ pub(crate) fn bump(mut self) -> Self {
+ self.bump = self.bump.wrapping_add(1);
+ self
+ }
+
pub(crate) fn new_with_bump(id: u8, bump: u8) -> Self {
Self {
- id,
+ addr: id,
bump,
rejoinable: false,
}
pub(crate) fn serialize_into(&self, mut buf: impl BufMut) -> Result<(), BadCodecError> {
if buf.remaining_mut() >= 2 {
- buf.put_u8(self.id);
+ buf.put_u8(self.addr);
buf.put_u8(self.bump);
Ok(())
} else {
pub(crate) fn deserialize_from(mut buf: impl Buf) -> Result<Self, BadCodecError> {
if buf.remaining() >= 2 {
Ok(Self {
- id: buf.get_u8(),
+ addr: buf.get_u8(),
bump: buf.get_u8(),
// Only the identity held by foca cares about this
rejoinable: false,
}
impl Identity for ID {
- fn has_same_prefix(&self, other: &Self) -> bool {
- self.id == other.id
- }
+ type Addr = u8;
fn renew(&self) -> Option<Self> {
if self.rejoinable {
- Some(ID::new_with_bump(self.id, self.bump.wrapping_add(1)).rejoinable())
+ Some(ID::new_with_bump(self.addr, self.bump.wrapping_add(1)).rejoinable())
} else {
None
}
+ }
+
+ fn addr(&self) -> u8 {
+ self.addr
+ }
+
+ fn win_addr_conflict(&self, adversary: &Self) -> bool {
+ debug_assert_ne!(self, adversary);
+ self.bump > adversary.bump
}
}
fn decode_member(&mut self, mut buf: impl Buf) -> Result<Member<ID>, Self::Error> {
BadCodec::decode_member(self, &mut buf)
- }
-}
-
-#[derive(Debug, Clone)]
-pub(crate) struct InMemoryRuntime {
- notifications: Vec<Notification<ID>>,
- to_send: Vec<(ID, Bytes)>,
- to_schedule: Vec<(Timer<ID>, Duration)>,
-}
-
-impl InMemoryRuntime {
- pub(crate) fn new() -> Self {
- Self {
- notifications: Vec::new(),
- to_send: Vec::new(),
- to_schedule: Vec::new(),
- }
- }
-
- pub(crate) fn clear(&mut self) {
- self.notifications.clear();
- self.to_send.clear();
- self.to_schedule.clear();
- }
-
- pub(crate) fn take_all_data(&mut self) -> Vec<(ID, Bytes)> {
- core::mem::take(&mut self.to_send)
- }
-
- pub(crate) fn take_data(&mut self, dst: ID) -> Option<Bytes> {
- let position = self.to_send.iter().position(|(to, _data)| to == &dst)?;
-
- let taken = self.to_send.swap_remove(position);
- Some(taken.1)
- }
-
- pub(crate) fn take_notification(
- &mut self,
- wanted: Notification<ID>,
- ) -> Option<Notification<ID>> {
- let position = self
- .notifications
- .iter()
- .position(|notification| notification == &wanted)?;
-
- let taken = self.notifications.swap_remove(position);
- Some(taken)
- }
-
- pub(crate) fn take_scheduling(&mut self, timer: Timer<ID>) -> Option<Duration> {
- let position = self
- .to_schedule
- .iter()
- .position(|(event, _when)| event == &timer)?;
-
- let taken = self.to_schedule.swap_remove(position);
- Some(taken.1)
- }
-
- pub(crate) fn find_scheduling<F>(&self, predicate: F) -> Option<&Timer<ID>>
- where
- F: Fn(&Timer<ID>) -> bool,
- {
- self.to_schedule
- .iter()
- .find(|(timer, _)| predicate(timer))
- .map(|(timer, _)| timer)
- }
-}
-
-impl Runtime<ID> for InMemoryRuntime {
- fn notify(&mut self, notification: Notification<ID>) {
- self.notifications.push(notification);
- }
-
- fn send_to(&mut self, to: ID, data: &[u8]) {
- self.to_send.push((to, Bytes::copy_from_slice(data)));
- }
-
- fn submit_after(&mut self, event: Timer<ID>, after: Duration) {
- self.to_schedule.push((event, after));
}
}
Deleted examples/broadcasting.rs
-/* Any copyright is dedicated to the Public Domain.
- * https://creativecommons.org/publicdomain/zero/1.0/ */
-use std::{
- collections::{HashMap, HashSet},
- net::SocketAddr,
- time::SystemTime,
-};
-
-use bincode::Options;
-use bytes::{BufMut, Bytes, BytesMut};
-use serde::{Deserialize, Serialize};
-use uuid::Uuid;
-
-use foca::{BroadcastHandler, Invalidates};
-
-// Broadcasts here will always have the following shape:
-//
-// 0. u16 length prefix
-// 1. Tag describing the payload
-// 2. Payload
-//
-
-#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
-enum Tag {
- // We can propagate general-purpose operations. Foca shouldn't
- // care about what's inside the payload, just wether this
- // has been acted on already or not.
- // Side-effects, conflict resolution and whatnot are not its
- // responsibilities, so messages like these aren't invalidated
- // at all: everything NEW it receives will be broadcast.
- Operation {
- operation_id: Uuid,
- // Depending on the nature of the opeartions, this could
- // use more metadata.
- // E.g.: members may receive operations out of order;
- // If the storage doesn't handle that correctly you'll
- // need to do it yourself
- },
-
- // For scenarios where the interactions are very clear, we can
- // be smarter with what we decide to broadcast.
- // E.g.: In our cluster we expect nodes to broadcast their
- // configuration when they join and when it changes, so for
- // any key `node`, there's only one writer (assuming no byzantine
- // behaviour): we can simply use last-write wins
- NodeConfig {
- node: SocketAddr,
- // XXX SystemTime does NOT guarantee monotonic growth.
- // It's good enough for an example, but it's an outage
- // waiting to happen. Use something you have better
- // control of.
- version: SystemTime,
- },
-}
-
-struct Broadcast {
- tag: Tag,
- data: Bytes,
-}
-
-impl Invalidates for Broadcast {
- // I think this is where confusion happens: It's about invalidating
- // items ALREADY in the broadcast buffer, i.e.: foca uses this
- // to manage its broadcast buffer so it can stop talking about unecessary
- // (invalidated) data.
- fn invalidates(&self, other: &Self) -> bool {
- match (self.tag, other.tag) {
- // The only time we care about invalidation is when we have
- // a new nodeconfig for a node and are already broadcasting
- // a config about this same node. We need to decide which
- // to keep.
- (
- Tag::NodeConfig {
- node: self_node,
- version: self_version,
- },
- Tag::NodeConfig {
- node: other_node,
- version: other_version,
- },
- ) if self_node == other_node => self_version > other_version,
- // Any other case we'll keep broadcasting until it gets sent
- // `Config::max_transmissions` times (or gets invalidated)
- _ => false,
- }
- }
-}
-
-impl AsRef<[u8]> for Broadcast {
- fn as_ref(&self) -> &[u8] {
- self.data.as_ref()
- }
-}
-
-// XXX Use actually useful types
-type Operation = String;
-type NodeConfig = String;
-
-struct Handler {
- buffer: BytesMut,
- seen_op_ids: HashSet<Uuid>,
- node_config: HashMap<SocketAddr, (SystemTime, NodeConfig)>,
-}
-
-impl Handler {
- fn craft_broadcast<T: Serialize>(&mut self, tag: Tag, item: T) -> Broadcast {
- self.buffer.reserve(1400);
- let mut crafted = self.buffer.split();
-
- // The payload length. We'll circle back and update it to
- // a real value at the end
- crafted.put_u16(0);
-
- let mut writer = crafted.writer();
-
- let opts = bincode::DefaultOptions::new();
- opts.serialize_into(&mut writer, &tag)
- .expect("error handling");
-
- opts.serialize_into(&mut writer, &item)
- .expect("error handling");
-
- let mut crafted = writer.into_inner();
- let final_len = crafted.len() as u16;
- (&mut crafted[0..1]).put_u16(final_len);
-
- // Notice that `tag` here is already inside `data`,
- // we keep a copy outside to make it easier when implementing
- // `Invalidates`
- Broadcast {
- tag,
- data: crafted.freeze(),
- }
- }
-}
-
-impl<T> BroadcastHandler<T> for Handler {
- type Broadcast = Broadcast;
- type Error = String;
-
- fn receive_item(
- &mut self,
- data: impl bytes::Buf,
- _sender: Option<&T>,
- ) -> Result<Option<Self::Broadcast>, Self::Error> {
- // Broadcast payload is u16-length prefixed
- if data.remaining() < 2 {
- return Err(String::from("Not enough bytes"));
- }
-
- let mut cursor = data;
- let len = cursor.get_u16();
- if cursor.remaining() < usize::from(len) {
- return Err(String::from("Malformed packet"));
- }
-
- // And a tag/header that tells us what the remaining
- // bytes actually are. We leave the blob untouched until
- // we decide wether we care about it.
- let opts = bincode::DefaultOptions::new();
- let mut reader = cursor.reader();
- let tag: Tag = opts.deserialize_from(&mut reader).unwrap();
-
- // Now `reader` points at the actual useful data in
- // the buffer, immediatelly after the tag. We can finally
- // make a decision
- match tag {
- Tag::Operation { operation_id } => {
- if self.seen_op_ids.contains(&operation_id) {
- // We've seen this data before, nothing to do
- return Ok(None);
- }
-
- self.seen_op_ids.insert(operation_id);
-
- let op: Operation = opts.deserialize_from(&mut reader).expect("error handling");
- {
- // This is where foca stops caring
- // If it were me, I'd stuff the bytes as-is into a channel
- // and have a separate task/thread consuming it.
- do_something_with_the_data()
- }
-
- // This WAS new information, so we signal it to foca
- let broadcast = self.craft_broadcast(tag, op);
- Ok(Some(broadcast))
- }
- Tag::NodeConfig { node, version } => {
- if let Some((current_version, _)) = self.node_config.get(&node) {
- if &version > current_version {
- let conf: NodeConfig =
- opts.deserialize_from(&mut reader).expect("error handling");
- Ok(Some(self.craft_broadcast(tag, conf)))
- } else {
- Ok(None)
- }
- } else {
- let conf: NodeConfig =
- opts.deserialize_from(&mut reader).expect("error handling");
- Ok(Some(self.craft_broadcast(tag, conf)))
- }
- }
- }
- }
-}
-
-fn do_something_with_the_data() {
- unimplemented!()
-}
-
-fn main() {}