Reorganize the database mod's internal structure

Id: bf6ec67f077bf80a1c574d14fab876151284cf7e
Author: Caio
Commit time: 2019-12-10T17:24:03+01:00

Modified crates/cantine/src/database/mod.rs

@@ -1,4 +1,6
-mod bincodedb;
-mod mapped_file;
+mod config;
+mod readerwriter;
+mod structuredlog;

-pub use bincodedb::{BincodeConfig, DatabaseReader, DatabaseRecord, DatabaseWriter};
+pub use config::BincodeConfig;
+pub use readerwriter::{DatabaseReader, DatabaseRecord, DatabaseWriter};
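
The split leaves the crate's public surface untouched: everything callers imported from database is still re-exported, only the private file layout changes. A minimal sketch of a call site inside the crate, identical before and after this commit:

    // Imports keep resolving through the re-exports above; the move from
    // bincodedb.rs into config.rs and readerwriter.rs is invisible here.
    use crate::database::{BincodeConfig, DatabaseReader, DatabaseRecord, DatabaseWriter};

    fn example() {
        // BincodeConfig is a zero-sized marker that only selects the
        // (de)serialization strategy for a given record type.
        let _config = BincodeConfig::<u64>::new();
    }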

Renamed crates/cantine/src/database/bincodedb.rs to crates/cantine/src/database/readerwriter.rs

@@ -1,215 +1,25
use std::{
- borrow::Cow,
collections::HashMap,
fs::{File, OpenOptions},
- io::{self, BufRead, BufReader, BufWriter, Result, Seek, SeekFrom, Write},
+ io::{self, BufWriter, Result, Seek, SeekFrom, Write},
marker::PhantomData,
- mem::size_of,
path::Path,
};

-use bincode::{deserialize, serialize};
use byteorder::NativeEndian;
use memmap::Mmap;
-use serde::{Deserialize, Serialize};
+use serde::Serialize;
use uuid::{self, Uuid};
-use zerocopy::{AsBytes, FromBytes, LayoutVerified, U64};
+use zerocopy::{AsBytes, FromBytes, U64};

-const OFFSETS_FILE: &str = "offsets.bin";
-const DATA_FILE: &str = "data.bin";
-const DATA_HEADER_SIZE: usize = size_of::<u64>();
+use super::{
+ config::{Decoder, Encoder},
+ structuredlog::StructuredLog,
+};

pub trait DatabaseRecord {
fn get_id(&self) -> u64;
fn get_uuid(&self) -> uuid::Bytes;
-}
-
-#[derive(FromBytes, AsBytes)]
-#[repr(C)]
-struct LogEntry {
- uuid: uuid::Bytes,
- id: U64<NativeEndian>,
- offset: U64<NativeEndian>,
-}
-
-impl LogEntry {
- fn new(id: u64, uuid: uuid::Bytes, offset: u64) -> Self {
- Self {
- uuid,
- id: U64::new(id),
- offset: U64::new(offset),
- }
- }
-}
-
-struct StructuredLog<T> {
- file: File,
- _header: PhantomData<T>,
-}
-
-impl<T> StructuredLog<T>
-where
- T: FromBytes + AsBytes,
-{
- fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
- if !path.as_ref().exists() {
- File::create(&path)?;
- }
-
- let file = OpenOptions::new()
- .read(true)
- .append(true)
- .open(&path.as_ref())?;
-
- let entry_len = size_of::<T>();
-
- let file_size = file.metadata()?.len() as usize;
- if file_size % entry_len != 0 {
- return Err(io::Error::new(
- io::ErrorKind::InvalidData,
- format!(
- "Expected file to size to be a multiple of {}. Got {}",
- entry_len, file_size
- ),
- ));
- }
-
- Ok(Self {
- file,
- _header: PhantomData,
- })
- }
-
- fn len(&self) -> Result<usize> {
- Ok(self.file.metadata()?.len() as usize / size_of::<T>())
- }
-
- fn for_each_entry<F>(&self, mut each_entry: F) -> std::io::Result<()>
- where
- F: FnMut(&T),
- {
- let entry_len = size_of::<T>();
- let mut log_reader = BufReader::with_capacity((8192 / entry_len) * entry_len, &self.file);
-
- loop {
- let buf = log_reader.fill_buf()?;
-
- if buf.is_empty() {
- break;
- }
-
- let mut bytes_consumed = 0;
- if let Some(slice) = LayoutVerified::new_slice(buf) {
- let entries: &[T] = slice.into_slice();
- for entry in entries {
- (each_entry)(entry);
- bytes_consumed += entry_len;
- }
- } else {
- return Err(io::Error::new(io::ErrorKind::InvalidData, "Log corrupted!"));
- }
-
- log_reader.consume(bytes_consumed);
- }
-
- Ok(())
- }
-
- fn append(&mut self, item: &T) -> Result<()> {
- self.file.write_all(item.as_bytes())
- }
-}
-
-pub trait Encoder<'a> {
- type Item: 'a;
- fn to_bytes(item: &'a Self::Item) -> Option<Cow<'a, [u8]>>;
-}
-
-pub trait Decoder<'a> {
- type Item: 'a;
- fn from_bytes(src: &'a [u8]) -> Option<Self::Item>;
-}
-
-pub struct BincodeConfig<T>(PhantomData<T>);
-
-impl<T> BincodeConfig<T> {
- pub fn new() -> Self {
- Self(PhantomData)
- }
-}
-
-impl<'a, T: 'a> Encoder<'a> for BincodeConfig<T>
-where
- T: Serialize,
-{
- type Item = T;
-
- fn to_bytes(item: &'a T) -> Option<Cow<[u8]>> {
- serialize(item).map(Cow::Owned).ok()
- }
-}
-
-impl<'a, T: 'a> Decoder<'a> for BincodeConfig<T>
-where
- T: Deserialize<'a> + Clone,
-{
- type Item = T;
-
- fn from_bytes(src: &'a [u8]) -> Option<T> {
- deserialize(src).ok()
- }
-}
-
-struct TypedMmap<'a, T, TDecoder>
-where
- T: 'a,
- TDecoder: Decoder<'a, Item = T>,
-{
- data: Mmap,
- _file: File,
- _config: TDecoder,
- _marker: PhantomData<&'a T>,
-}
-
-impl<'a, T: 'a, TDecoder> TypedMmap<'a, T, TDecoder>
-where
- TDecoder: Decoder<'a, Item = T>,
-{
- pub fn with_config<P: AsRef<Path>>(path: P, _config: TDecoder) -> Result<Self> {
- let file = OpenOptions::new()
- .read(true)
- .write(true)
- .open(path.as_ref())?;
-
- Ok(Self {
- data: unsafe { Mmap::map(&file)? },
- _file: file,
- _config,
- _marker: PhantomData,
- })
- }
-
- pub fn get(&self, offset: usize) -> Result<TDecoder::Item> {
- if offset > self.data.len() {
- return Err(io::Error::new(
- io::ErrorKind::InvalidInput,
- "Offset too large",
- ));
- }
-
- let data = self.data[offset..].as_ptr();
- let len = self.data.len() - offset;
- if let Some(decoded) =
- TDecoder::from_bytes(unsafe { std::slice::from_raw_parts(data, len) })
- {
- Ok(decoded)
- } else {
- Err(io::Error::new(
- io::ErrorKind::InvalidData,
- format!("Failure decoding bytes at offset {}", offset),
- ))
- }
- }
}

pub struct DatabaseReader<'a, T, TDecoder: Decoder<'a, Item = T>> {
@@ -294,11 +104,87
}
}

+const OFFSETS_FILE: &str = "offsets.bin";
+const DATA_FILE: &str = "data.bin";
+
+#[derive(FromBytes, AsBytes)]
+#[repr(C)]
+struct LogEntry {
+ uuid: uuid::Bytes,
+ id: U64<NativeEndian>,
+ offset: U64<NativeEndian>,
+}
+
+impl LogEntry {
+ fn new(id: u64, uuid: uuid::Bytes, offset: u64) -> Self {
+ Self {
+ uuid,
+ id: U64::new(id),
+ offset: U64::new(offset),
+ }
+ }
+}
+
+struct TypedMmap<'a, T, TDecoder>
+where
+ T: 'a,
+ TDecoder: Decoder<'a, Item = T>,
+{
+ data: Mmap,
+ _file: File,
+ _config: TDecoder,
+ _marker: PhantomData<&'a T>,
+}
+
+impl<'a, T: 'a, TDecoder> TypedMmap<'a, T, TDecoder>
+where
+ TDecoder: Decoder<'a, Item = T>,
+{
+ pub fn with_config<P: AsRef<Path>>(path: P, _config: TDecoder) -> Result<Self> {
+ let file = OpenOptions::new()
+ .read(true)
+ .write(true)
+ .open(path.as_ref())?;
+
+ Ok(Self {
+ data: unsafe { Mmap::map(&file)? },
+ _file: file,
+ _config,
+ _marker: PhantomData,
+ })
+ }
+
+ pub fn get(&self, offset: usize) -> Result<TDecoder::Item> {
+ if offset > self.data.len() {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "Offset too large",
+ ));
+ }
+
+ let data = self.data[offset..].as_ptr();
+ let len = self.data.len() - offset;
+ if let Some(decoded) =
+ TDecoder::from_bytes(unsafe { std::slice::from_raw_parts(data, len) })
+ {
+ Ok(decoded)
+ } else {
+ Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!("Failure decoding bytes at offset {}", offset),
+ ))
+ }
+ }
+}
+
#[cfg(test)]
mod tests {

use super::*;
use tempfile;
+
+ use crate::database::BincodeConfig;
+ use serde::Deserialize;

#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
struct Named<'a>(u64, Uuid, &'a str);
@@ -314,7 +200,7
}

#[test]
- fn less_awkward_api() -> Result<()> {
+ fn usage() -> Result<()> {
let basedir = tempfile::tempdir()?;

let mut db_writer = DatabaseWriter::new(basedir.path())?;
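
For context on how the moved pieces compose: the offsets log is a StructuredLog<LogEntry>, and replaying it is enough to rebuild an id-to-offset index over data.bin. The diff doesn't show DatabaseReader's internals, so the following is an illustrative sketch, not the actual implementation:

    // Sketch only: rebuild an in-memory index by replaying the offsets log.
    fn load_id_index(base: &Path) -> Result<HashMap<u64, u64>> {
        let log = StructuredLog::<LogEntry>::new(base.join(OFFSETS_FILE))?;

        let mut id_to_offset = HashMap::with_capacity(log.len()?);
        log.for_each_entry(|entry: &LogEntry| {
            // Each fixed-size entry maps a record id to the byte offset
            // where its serialized payload starts inside data.bin.
            id_to_offset.insert(entry.id.get(), entry.offset.get());
        })?;

        Ok(id_to_offset)
    }

A lookup would then resolve the offset and decode straight from the shared mmap, along the lines of TypedMmap::with_config(base.join(DATA_FILE), BincodeConfig::new()) followed by a get(offset).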

Renamed crates/cantine/src/database/mapped_file.rs to crates/cantine/src/database/structuredlog.rs

@@ -1,145 +1,124
use std::{
- fs::File,
- io::{Error, ErrorKind, Result},
- ops::{Deref, DerefMut},
+ fs::{File, OpenOptions},
+ io::{self, BufRead, BufReader, Result, Write},
+ marker::PhantomData,
+ mem::size_of,
+ path::Path,
};

-use memmap::{MmapMut, MmapOptions};
+use zerocopy::{AsBytes, FromBytes, LayoutVerified};

-pub(super) struct MappedFile {
+pub(crate) struct StructuredLog<T> {
file: File,
- mmap: MmapMut,
- offset: usize,
+ _header: PhantomData<T>,
}

-impl MappedFile {
- pub fn open(file: File) -> Result<MappedFile> {
- let offset = file.metadata()?.len() as usize;
- let mmap = unsafe { MmapOptions::new().map_mut(&file)? };
- Ok(MappedFile { file, mmap, offset })
- }
-
- pub fn set_append_offset(&mut self, offset: usize) -> Result<()> {
- if offset <= self.len() {
- self.offset = offset;
- Ok(())
- } else {
- Err(Error::new(
- ErrorKind::InvalidInput,
- "offset must be <= len()",
- ))
- }
- }
-
- pub fn offset(&self) -> usize {
- self.offset
- }
-
- pub fn len(&self) -> usize {
- self.mmap.len()
- }
-
- pub fn append(&mut self, data: &[u8]) -> Result<usize> {
- let read_from = self.offset;
- let final_size = read_from + data.len();
-
- if final_size > self.mmap.len() {
- self.file.set_len(final_size as u64)?;
- self.mmap = unsafe { MmapOptions::new().map_mut(&self.file)? };
+impl<T> StructuredLog<T>
+where
+ T: FromBytes + AsBytes,
+{
+ pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+ if !path.as_ref().exists() {
+ File::create(&path)?;
}

- self.mmap[read_from..final_size].copy_from_slice(data);
- self.offset = final_size;
- Ok(read_from)
+ let file = OpenOptions::new()
+ .read(true)
+ .append(true)
+ .open(&path.as_ref())?;
+
+ let entry_len = size_of::<T>();
+
+ let file_size = file.metadata()?.len() as usize;
+ if file_size % entry_len != 0 {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!(
+ "Expected file to size to be a multiple of {}. Got {}",
+ entry_len, file_size
+ ),
+ ));
+ }
+
+ Ok(Self {
+ file,
+ _header: PhantomData,
+ })
}
-}

-impl Deref for MappedFile {
- type Target = [u8];
-
- #[inline]
- fn deref(&self) -> &[u8] {
- &self.mmap
+ pub fn len(&self) -> Result<usize> {
+ Ok(self.file.metadata()?.len() as usize / size_of::<T>())
}
-}

-impl DerefMut for MappedFile {
- #[inline]
- fn deref_mut(&mut self) -> &mut [u8] {
- &mut self.mmap
+ pub fn for_each_entry<F>(&self, mut each_entry: F) -> std::io::Result<()>
+ where
+ F: FnMut(&T),
+ {
+ let entry_len = size_of::<T>();
+ let mut log_reader = BufReader::with_capacity((8192 / entry_len) * entry_len, &self.file);
+
+ loop {
+ let buf = log_reader.fill_buf()?;
+
+ if buf.is_empty() {
+ break;
+ }
+
+ let mut bytes_consumed = 0;
+ if let Some(slice) = LayoutVerified::new_slice(buf) {
+ let entries: &[T] = slice.into_slice();
+ for entry in entries {
+ (each_entry)(entry);
+ bytes_consumed += entry_len;
+ }
+ } else {
+ return Err(io::Error::new(io::ErrorKind::InvalidData, "Log corrupted!"));
+ }
+
+ log_reader.consume(bytes_consumed);
+ }
+
+ Ok(())
+ }
+
+ pub fn append(&mut self, item: &T) -> Result<()> {
+ self.file.write_all(item.as_bytes())
}
}

#[cfg(test)]
mod tests {
use super::*;
-
use tempfile;

- fn open_empty() -> Result<MappedFile> {
- let file = tempfile::tempfile()?;
- file.set_len(10)?;
- let db = MappedFile::open(file)?;
- Ok(db)
- }
+ use byteorder::NativeEndian;
+ use zerocopy::U64;

#[test]
- fn open_starts_at_end() -> Result<()> {
- let db = open_empty()?;
- assert_eq!(db.len(), db.offset);
- Ok(())
- }
+ fn usage() -> Result<()> {
+ let tmpdir = tempfile::tempdir()?;
+ let log_path = tmpdir.path().join("testlog");

- #[test]
- fn cannot_set_offset_beyond_len() -> Result<()> {
- let mut db = open_empty()?;
- assert!(db.set_append_offset(db.len() + 1).is_err());
- Ok(())
- }
+ {
+ let mut log = StructuredLog::new(&log_path)?;

- #[test]
- fn can_write_and_read() -> Result<()> {
- let mut db = open_empty()?;
- db.set_append_offset(0)?;
+ assert_eq!(0, log.len()?);

- let data = [1, 2, 3, 4, 5];
- let read_from = db.append(&data)?;
+ for i in 0..100 {
+ log.append(&U64::<NativeEndian>::new(i))?;
+ }
+ }

- assert_eq!(data, db[read_from..data.len()]);
- Ok(())
- }
+ let log = StructuredLog::new(&log_path)?;

- #[test]
- fn len_does_not_grow_if_not_needed() -> Result<()> {
- let mut db = open_empty()?;
- let initial_len = db.len();
- db.set_append_offset(0)?;
- db.append(&[1, 2, 3])?;
- assert_eq!(initial_len, db.len());
- Ok(())
- }
+ assert_eq!(100, log.len()?);

- #[test]
- fn len_grows_when_appending() -> Result<()> {
- let mut db = open_empty()?;
- let initial_len = db.len();
- db.append(&[1, 2, 3])?;
- assert_eq!(initial_len + 3, db.len());
- Ok(())
- }
+ let mut wanted: u64 = 0;
+ log.for_each_entry(|e: &U64<NativeEndian>| {
+ assert_eq!(wanted, e.get());
+ wanted += 1;
+ })?;

- #[test]
- fn len_grows_correctly_at_boundary() -> Result<()> {
- let mut db = open_empty()?;
- let initial_len = db.len();
- let data = [1u8, 2, 3, 4, 5];
-
- let write_from = initial_len - (data.len() - 2);
- db.set_append_offset(write_from)?;
- db.append(&data)?;
-
- assert_eq!(initial_len + 2, db.len());
- assert_eq!(data, db[write_from..]);
Ok(())
}
}
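
One detail worth noting in for_each_entry above: the BufReader capacity is rounded down to a whole number of entries, so a full buffer always ends exactly on an entry boundary and LayoutVerified::new_slice can view it as complete records. A worked example, assuming uuid::Bytes is [u8; 16] and that zerocopy's byteorder wrappers are plain byte arrays, so the #[repr(C)] LogEntry from readerwriter.rs has no padding:

    fn capacity_for(entry_len: usize) -> usize {
        // Round the 8 KiB target down to a multiple of the entry size.
        (8192 / entry_len) * entry_len
    }

    fn main() {
        assert_eq!(8192, capacity_for(32)); // LogEntry: 16 (uuid) + 8 + 8
        assert_eq!(8184, capacity_for(24)); // odd sizes round down, not up
    }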

Created crates/cantine/src/database/config.rs

@@ -1,0 +1,50
+use std::{borrow::Cow, marker::PhantomData};
+
+use bincode::{deserialize, serialize};
+use serde::{Deserialize, Serialize};
+
+pub trait Encoder<'a> {
+ type Item: 'a;
+ fn to_bytes(item: &'a Self::Item) -> Option<Cow<'a, [u8]>>;
+}
+
+pub trait Decoder<'a> {
+ type Item: 'a;
+ fn from_bytes(src: &'a [u8]) -> Option<Self::Item>;
+}
+
+pub struct BincodeConfig<T>(PhantomData<T>);
+
+impl<T> BincodeConfig<T> {
+ pub fn new() -> Self {
+ Self(PhantomData)
+ }
+}
+
+impl<T> Default for BincodeConfig<T> {
+ fn default() -> Self {
+ Self(PhantomData)
+ }
+}
+
+impl<'a, T: 'a> Encoder<'a> for BincodeConfig<T>
+where
+ T: Serialize,
+{
+ type Item = T;
+
+ fn to_bytes(item: &'a T) -> Option<Cow<[u8]>> {
+ serialize(item).map(Cow::Owned).ok()
+ }
+}
+
+impl<'a, T: 'a> Decoder<'a> for BincodeConfig<T>
+where
+ T: Deserialize<'a> + Clone,
+{
+ type Item = T;
+
+ fn from_bytes(src: &'a [u8]) -> Option<T> {
+ deserialize(src).ok()
+ }
+}
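
Since DatabaseReader and DatabaseWriter depend only on the Encoder and Decoder traits, the on-disk format is pluggable. As an illustration of that seam (not part of this commit, and assuming serde_json as an extra dependency), a hypothetical JSON-backed config could live in a sibling module:

    // Hypothetical sibling module inside database/, for illustration only.
    use std::{borrow::Cow, marker::PhantomData};

    use serde::{Deserialize, Serialize};

    use super::config::{Decoder, Encoder};

    pub struct JsonConfig<T>(PhantomData<T>);

    impl<'a, T: 'a> Encoder<'a> for JsonConfig<T>
    where
        T: Serialize,
    {
        type Item = T;

        fn to_bytes(item: &'a T) -> Option<Cow<[u8]>> {
            serde_json::to_vec(item).map(Cow::Owned).ok()
        }
    }

    impl<'a, T: 'a> Decoder<'a> for JsonConfig<T>
    where
        T: Deserialize<'a>,
    {
        type Item = T;

        fn from_bytes(src: &'a [u8]) -> Option<T> {
            serde_json::from_slice(src).ok()
        }
    }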