caio.co/de/cantine

Extract log functionality into StructuredLog<T>

Id
e02c53da13abad0d5eb5d5e8e523ea3ff36dc03f
Author
Caio
Commit time
2019-12-09T11:30:03+01:00

Modified crates/cantine/src/database/bincodedb.rs

@@ -16,7 +16,7
use super::mapped_file::MappedFile;

pub struct BincodeDatabase<T> {
- offsets: File,
+ offsets: StructuredLog<LogEntry>,
data: MappedFile,

uuid_index: HashMap<Uuid, usize>,
@@ -62,55 +62,18
}

pub fn open<P: AsRef<Path>>(base_dir: P) -> Result<Self> {
+ let offsets = StructuredLog::new(base_dir.as_ref().join(OFFSETS_FILE))?;
+
+ let num_entries = offsets.len()?;
+ let mut id_index = HashMap::with_capacity(num_entries);
+ let mut uuid_index = HashMap::with_capacity(num_entries);
+
let mut max_offset = DATA_HEADER_SIZE;
-
- let offsets = OpenOptions::new()
- .read(true)
- .append(true)
- .open(base_dir.as_ref().join(OFFSETS_FILE))?;
-
- let offsets_size = offsets.metadata()?.len() as usize;
- if offsets_size % LOG_ENTRY_LEN != 0 {
- return Err(io::Error::new(
- io::ErrorKind::InvalidData,
- format!(
- "Expected offsets file to size to be a multiple of {}. Got {}",
- LOG_ENTRY_LEN, offsets_size
- ),
- ));
- }
-
- let index_size = offsets_size / LOG_ENTRY_LEN;
- let mut id_index = HashMap::with_capacity(index_size);
- let mut uuid_index = HashMap::with_capacity(index_size);
-
- let mut log_reader =
- BufReader::with_capacity((8192 / LOG_ENTRY_LEN) * LOG_ENTRY_LEN, &offsets);
-
- loop {
- let buf = log_reader.fill_buf()?;
-
- if buf.is_empty() {
- break;
- }
-
- let mut bytes_consumed = 0;
- if let Some(slice) = LayoutVerified::new_slice(buf) {
- let entries: &[LogEntry] = slice.into_slice();
- for entry in entries {
- bytes_consumed += LOG_ENTRY_LEN;
- // No removals, the offsets are always increasing
- max_offset = entry.offset.get() as usize;
- // Updates are simply same id, larger offset
- uuid_index.insert(Uuid::from_bytes(entry.uuid), max_offset);
- id_index.insert(entry.id.get(), max_offset);
- }
- } else {
- return Err(io::Error::new(io::ErrorKind::InvalidData, "Log corrupted!"));
- }
-
- log_reader.consume(bytes_consumed);
- }
+ offsets.for_each_entry(|entry: &LogEntry| {
+ max_offset = entry.offset.get() as usize;
+ uuid_index.insert(Uuid::from_bytes(entry.uuid), max_offset);
+ id_index.insert(entry.id.get(), max_offset);
+ })?;

let datafile = OpenOptions::new()
.read(true)
@@ -162,7 +125,7
let id = obj.get_id();

let entry = LogEntry::new(uuid, id, read_offset);
- self.offsets.write_all(entry.as_bytes())?;
+ self.offsets.append(&entry)?;

self.uuid_index.insert(*uuid, read_offset);
self.id_index.insert(id, read_offset);
@@ -197,8 +160,6
}
}

-const LOG_ENTRY_LEN: usize = size_of::<LogEntry>();
-
#[derive(FromBytes, AsBytes)]
#[repr(C)]
struct LogEntry {
@@ -214,6 +175,84
uuid: *uuid.as_bytes(),
offset: U64::new(offset as u64),
}
+ }
+}
+
+struct StructuredLog<T> {
+ file: File,
+ _header: PhantomData<T>,
+}
+
+impl<T> StructuredLog<T>
+where
+ T: FromBytes + AsBytes,
+{
+ fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+ if !path.as_ref().exists() {
+ File::create(&path)?;
+ }
+
+ let file = OpenOptions::new()
+ .read(true)
+ .append(true)
+ .open(&path.as_ref())?;
+
+ let entry_len = size_of::<T>();
+
+ let file_size = file.metadata()?.len() as usize;
+ if file_size % entry_len != 0 {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!(
+ "Expected file to size to be a multiple of {}. Got {}",
+ entry_len, file_size
+ ),
+ ));
+ }
+
+ Ok(Self {
+ file,
+ _header: PhantomData,
+ })
+ }
+
+ fn len(&self) -> Result<usize> {
+ Ok(self.file.metadata()?.len() as usize)
+ }
+
+ fn for_each_entry<F>(&self, mut each_entry: F) -> std::io::Result<()>
+ where
+ F: FnMut(&T),
+ {
+ let entry_len = size_of::<T>();
+ let mut log_reader = BufReader::with_capacity((8192 / entry_len) * entry_len, &self.file);
+
+ loop {
+ let buf = log_reader.fill_buf()?;
+
+ if buf.is_empty() {
+ break;
+ }
+
+ let mut bytes_consumed = 0;
+ if let Some(slice) = LayoutVerified::new_slice(buf) {
+ let entries: &[T] = slice.into_slice();
+ for entry in entries {
+ (each_entry)(entry);
+ bytes_consumed += entry_len;
+ }
+ } else {
+ return Err(io::Error::new(io::ErrorKind::InvalidData, "Log corrupted!"));
+ }
+
+ log_reader.consume(bytes_consumed);
+ }
+
+ Ok(())
+ }
+
+ fn append(&mut self, item: &T) -> Result<()> {
+ self.file.write_all(item.as_bytes())
}
}