caio.co/de/cantine

Merge branch 'db_reader_writer'

This is good enough for now. I don't think I'll follow through with a
borrow-heavy model because it's a lot of work for very little gain -
hard to justify. So, maybe one day as a personal challenge...
Id: ee6ffbc1aa8079f6f86a71a10b350e3b8c3e939e
Author: Caio
Commit time: 2019-12-11T08:37:36+01:00
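
Before the file-by-file diff, a minimal sketch of how the new split API is meant to be used, pieced together from the call sites in load.rs, check.rs and the tests below. The `roundtrip` function, its arguments and the bare `?` error handling are illustrative only:

use std::io::Result;
use std::path::Path;

use cantine::database::{BincodeConfig, DatabaseReader, DatabaseWriter};
use cantine::model::Recipe;

// Write one recipe, then look it up again by id and by uuid.
fn roundtrip(base_dir: &Path, recipe: &Recipe) -> Result<()> {
    let mut writer = DatabaseWriter::new(base_dir)?;
    writer.append::<BincodeConfig<Recipe>>(recipe)?;
    drop(writer); // dropping flushes the internal BufWriter

    let reader = DatabaseReader::open(base_dir, BincodeConfig::<Recipe>::new())?;
    assert_eq!(Some(recipe.clone()), reader.find_by_id(recipe.recipe_id)?);
    assert_eq!(Some(recipe.clone()), reader.find_by_uuid(&recipe.uuid)?);
    Ok(())
}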

Modified crates/cantine/src/model.rs

@@ -1,10 +1,10
use serde::{Deserialize, Serialize};
-use uuid::Uuid;
+use uuid::{self, Uuid};

use crate::database::DatabaseRecord;
use cantine_derive::FilterAndAggregation;

-#[derive(Deserialize, Serialize, Debug, PartialEq)]
+#[derive(Deserialize, Serialize, Debug, PartialEq, Clone)]
pub struct Recipe {
pub uuid: Uuid,

@@ -27,8 +27,8
fn get_id(&self) -> u64 {
self.recipe_id
}
- fn get_uuid(&self) -> &Uuid {
- &self.uuid
+ fn get_uuid(&self) -> uuid::Bytes {
+ *self.uuid.as_bytes()
}
}

@@ -59,7 +59,7
}
}

-#[derive(FilterAndAggregation, Serialize, Deserialize, Debug, Default, PartialEq)]
+#[derive(FilterAndAggregation, Serialize, Deserialize, Debug, Default, PartialEq, Clone)]
pub struct Features {
pub num_ingredients: u8,
pub instructions_length: u32,

Modified crates/cantine/src/bin/check.rs

@@ -12,7 +12,7
use serde_json;
use structopt::StructOpt;

-use cantine::database::BincodeDatabase;
+use cantine::database::{BincodeConfig, DatabaseReader};
use cantine::model::Recipe;

/// Checks data generated via `load` against a stream of recipes
@@ -43,7 +43,8
// Nothing to check at the tantivy index atm

println!("Loading database");
- let db = Arc::new(BincodeDatabase::open(db_path.as_path())?);
+ let database = DatabaseReader::open(db_path, BincodeConfig::<Recipe>::new())?;
+ let db = Arc::new(database);

let mut workers = Vec::new();
let (line_sender, line_receiver) = unbounded::<String>();
@@ -58,8 +59,8
let recipe: Recipe =
serde_json::from_str(line.as_ref()).expect("valid recipe json");

- let db_recipe = db.get_by_uuid(&recipe.uuid).unwrap().unwrap();
- db.get_by_id(recipe.recipe_id).unwrap().unwrap();
+ let db_recipe = db.find_by_uuid(&recipe.uuid).unwrap().unwrap();
+ db.find_by_id(recipe.recipe_id).unwrap().unwrap();

if recipe != db_recipe {
panic!(

Modified crates/cantine/src/bin/load.rs

@@ -14,7 +14,7

use tantivy::{self, directory::MmapDirectory, schema::SchemaBuilder, Index};

-use cantine::database::BincodeDatabase;
+use cantine::database::{BincodeConfig, DatabaseWriter};
use cantine::index::IndexFields;
use cantine::model::Recipe;

@@ -96,15 +96,14
}

let disk_writer = spawn(move || {
- let mut db =
- BincodeDatabase::create(db_path, options.database_size.get() * 1024 * 1024).unwrap();
+ let mut db = DatabaseWriter::new(db_path).unwrap();

let cur = Instant::now();
let mut num_recipes = 0;

for recipe in recipe_receiver {
num_recipes += 1;
- db.add(&recipe).unwrap();
+ db.append::<BincodeConfig<Recipe>>(&recipe).unwrap();

if num_recipes % options.commit_every.get() == 0 {
writer.write().unwrap().commit().unwrap();

Modified crates/cantine/src/bin/query.rs

@@ -8,9 +8,11
use tantivy::{Result, Searcher};

use cantine::{
- database::BincodeDatabase,
+ database::{BincodeConfig, DatabaseReader},
index::Cantine,
- model::{Recipe, SearchCursor, SearchQuery, SearchResult, Sort},
+ model::{
+ FeaturesAggregationResult, Recipe, RecipeId, SearchCursor, SearchQuery, SearchResult, Sort,
+ },
};

/// Queries data generated via `load`
@@ -25,13 +27,19
agg_threshold: Option<usize>,
}

-fn search(
- database: &BincodeDatabase<Recipe>,
+type ExecuteResult = (
+ usize,
+ Vec<RecipeId>,
+ Option<SearchCursor>,
+ Option<FeaturesAggregationResult>,
+);
+
+fn execute_search(
searcher: &Searcher,
cantine: &Cantine,
query: SearchQuery,
agg_threshold: Option<usize>,
-) -> Result<SearchResult> {
+) -> Result<ExecuteResult> {
let interpreted_query = cantine.interpret_query(&query)?;
let limit = query.num_items.unwrap_or(10) as usize;

@@ -53,28 +61,18
None
};

- let mut items = Vec::with_capacity(recipe_ids.len());
- for recipe_id in recipe_ids {
- let recipe: Recipe = database
- .get_by_id(recipe_id)
- .expect("db operational")
- .expect("item in the index always present in the db");
- items.push(recipe.into());
- }
-
- Ok(SearchResult {
- total_found,
- items,
- agg,
- after,
- })
+ Ok((total_found, recipe_ids, after, agg))
}

pub fn main() -> Result<()> {
let options = QueryOptions::from_args();

let (index, cantine) = Cantine::open(options.base_path.join("tantivy"))?;
- let database = BincodeDatabase::open(options.base_path.join("database")).unwrap();
+ let database = DatabaseReader::open(
+ options.base_path.join("database"),
+ BincodeConfig::<Recipe>::new(),
+ )
+ .unwrap();

let stdin = stdin();
let reader = BufReader::new(stdin.lock());
@@ -87,7 +85,24
let query = serde_json::from_str(line.as_str()).expect("valid SearchQuery json");

eprintln!("Executing query {:?}", &query);
- let result = search(&database, &searcher, &cantine, query, options.agg_threshold)?;
+ let (total_found, recipe_ids, after, agg) =
+ execute_search(&searcher, &cantine, query, options.agg_threshold)?;
+
+ let mut items = Vec::with_capacity(recipe_ids.len());
+ for recipe_id in recipe_ids {
+ let recipe: Recipe = database
+ .find_by_id(recipe_id)
+ .expect("db operational")
+ .expect("item in the index always present in the db");
+ items.push(recipe.into());
+ }
+
+ let result = SearchResult {
+ total_found,
+ items,
+ after,
+ agg,
+ };

println!("{}", serde_json::to_string(&result).unwrap());
}

Modified crates/cantine/src/database/mod.rs

@@ -1,4 +1,6
-mod bincodedb;
-mod mapped_file;
+mod config;
+mod readerwriter;
+mod structuredlog;

-pub use bincodedb::{BincodeDatabase, DatabaseRecord};
+pub use config::BincodeConfig;
+pub use readerwriter::{DatabaseReader, DatabaseRecord, DatabaseWriter};

Renamed crates/cantine/src/database/bincodedb.rs to crates/cantine/src/database/readerwriter.rs

@@ -1,164 +1,111
use std::{
collections::HashMap,
fs::{File, OpenOptions},
- io::{self, BufRead, BufReader, Cursor, Result, Write},
+ io::{self, BufWriter, Result, Seek, SeekFrom, Write},
marker::PhantomData,
- mem::size_of,
path::Path,
};

-use bincode::{deserialize, serialize};
-use byteorder::{NativeEndian, ReadBytesExt, WriteBytesExt};
-use serde::{de::DeserializeOwned, Serialize};
-use uuid::Uuid;
-use zerocopy::{AsBytes, FromBytes, LayoutVerified, U64};
+use byteorder::NativeEndian;
+use memmap::Mmap;
+use serde::Serialize;
+use uuid::{self, Uuid};
+use zerocopy::{AsBytes, FromBytes, U64};

-use super::mapped_file::MappedFile;
-
-pub struct BincodeDatabase<T> {
- offsets: StructuredLog<LogEntry>,
- data: MappedFile,
-
- uuid_index: HashMap<Uuid, usize>,
- id_index: HashMap<u64, usize>,
-
- _marker: PhantomData<T>,
-}
-
-const OFFSETS_FILE: &str = "offsets.bin";
-const DATA_FILE: &str = "data.bin";
-const DATA_HEADER_SIZE: usize = size_of::<u64>();
+use super::{
+ config::{Decoder, Encoder},
+ structuredlog::StructuredLog,
+};

pub trait DatabaseRecord {
fn get_id(&self) -> u64;
- fn get_uuid(&self) -> &Uuid;
+ fn get_uuid(&self) -> uuid::Bytes;
}

-impl<T> BincodeDatabase<T>
-where
- T: Serialize + DeserializeOwned + DatabaseRecord,
-{
- pub fn create<P: AsRef<Path>>(base_dir: P, initial_size: u64) -> Result<Self> {
- let offsets_path = base_dir.as_ref().join(OFFSETS_FILE);
- let data_path = base_dir.as_ref().join(DATA_FILE);
+pub struct DatabaseReader<'a, T, TDecoder: Decoder<'a, Item = T>> {
+ data: TypedMmap<'a, T, TDecoder>,
+ uuid_index: HashMap<Uuid, usize>,
+ id_index: HashMap<u64, usize>,
+}

- if offsets_path.exists() || data_path.exists() {
- Err(io::Error::new(
- io::ErrorKind::InvalidInput,
- "database files already exist",
- ))
+impl<'a, T, TDecoder: Decoder<'a, Item = T>> DatabaseReader<'a, T, TDecoder> {
+ pub fn open<P: AsRef<Path>>(base_dir: P, config: TDecoder) -> Result<Self> {
+ let log = StructuredLog::new(base_dir.as_ref().join(OFFSETS_FILE))?;
+ let num_items = log.len()?;
+
+ let mut id_index = HashMap::with_capacity(num_items);
+ let mut uuid_index = HashMap::with_capacity(num_items);
+
+ log.for_each_entry(|entry: &LogEntry| {
+ let offset = entry.offset.get() as usize;
+ uuid_index.insert(Uuid::from_bytes(entry.uuid), offset);
+ id_index.insert(entry.id.get(), offset);
+ })?;
+
+ Ok(Self {
+ id_index,
+ uuid_index,
+ data: TypedMmap::with_config(base_dir.as_ref().join(DATA_FILE), config)?,
+ })
+ }
+
+ pub fn find_by_id(&self, id: u64) -> Result<Option<TDecoder::Item>> {
+ if let Some(&offset) = self.id_index.get(&id) {
+ Ok(Some(self.data.get(offset)?))
} else {
- File::create(offsets_path)?;
-
- let mut data = File::create(data_path)?;
- data.set_len(initial_size)?;
-
- // First u64 is the append offset, in this case
- // we append'll right after the header
- data.write_u64::<NativeEndian>(DATA_HEADER_SIZE as u64)?;
-
- BincodeDatabase::open(base_dir)
+ Ok(None)
}
}

- pub fn open<P: AsRef<Path>>(base_dir: P) -> Result<Self> {
- let offsets = StructuredLog::new(base_dir.as_ref().join(OFFSETS_FILE))?;
-
- let num_entries = offsets.len()?;
- let mut id_index = HashMap::with_capacity(num_entries);
- let mut uuid_index = HashMap::with_capacity(num_entries);
-
- let mut max_offset = DATA_HEADER_SIZE;
- offsets.for_each_entry(|entry: &LogEntry| {
- max_offset = entry.offset.get() as usize;
- uuid_index.insert(Uuid::from_bytes(entry.uuid), max_offset);
- id_index.insert(entry.id.get(), max_offset);
- })?;
-
- let datafile = OpenOptions::new()
- .read(true)
- .append(true)
- .open(base_dir.as_ref().join(DATA_FILE))?;
- let mut data = MappedFile::open(datafile)?;
-
- if max_offset > data.len() {
- return Err(io::Error::new(
- io::ErrorKind::InvalidData,
- format!("index points at unreachable offset: {}", max_offset),
- ));
+ pub fn find_by_uuid(&self, uuid: &Uuid) -> Result<Option<TDecoder::Item>> {
+ if let Some(&offset) = self.uuid_index.get(&uuid) {
+ Ok(Some(self.data.get(offset)?))
+ } else {
+ Ok(None)
}
+ }
+}

- let append_offset = {
- let mut cursor = Cursor::new(&data as &[u8]);
- cursor.read_u64::<NativeEndian>()? as usize
- };
+pub struct DatabaseWriter<T> {
+ log: StructuredLog<LogEntry>,
+ writer: BufWriter<File>,
+ _marker: PhantomData<T>,
+}

- if append_offset < DATA_HEADER_SIZE {
- return Err(io::Error::new(
- io::ErrorKind::InvalidData,
- format!("Got weird append offset from data file: {}", append_offset),
- ));
- }
-
- data.set_append_offset(append_offset)?;
-
- Ok(BincodeDatabase {
- offsets,
- data,
- uuid_index,
- id_index,
+impl<T> DatabaseWriter<T>
+where
+ T: DatabaseRecord + Serialize,
+{
+ pub fn new<P: AsRef<Path>>(base_dir: P) -> Result<Self> {
+ Ok(Self {
+ writer: BufWriter::new(File::create(base_dir.as_ref().join(DATA_FILE))?),
+ log: StructuredLog::new(base_dir.as_ref().join(OFFSETS_FILE))?,
_marker: PhantomData,
})
}

- pub fn add(&mut self, obj: &T) -> Result<()> {
- let data = serialize(obj).map_err(|_| {
- io::Error::new(
- io::ErrorKind::InvalidData,
- "Failed to serialize data being added",
- )
- })?;
+ pub fn append<'a, TEncoder: Encoder<'a, Item = T>>(
+ &mut self,
+ item: &'a TEncoder::Item,
+ ) -> Result<()> {
+ if let Some(encoded) = TEncoder::to_bytes(item) {
+ let offset = self.writer.seek(SeekFrom::Current(0))?;
+ self.writer.write_all(&encoded)?;

- let read_offset = self.data.append(data.as_slice())?;
-
- let uuid = obj.get_uuid();
- let id = obj.get_id();
-
- let entry = LogEntry::new(uuid, id, read_offset);
- self.offsets.append(&entry)?;
-
- self.uuid_index.insert(*uuid, read_offset);
- self.id_index.insert(id, read_offset);
-
- let new_append_offset = U64::<NativeEndian>::new(self.data.offset() as u64);
- self.data[0..DATA_HEADER_SIZE].copy_from_slice(new_append_offset.as_bytes());
-
- Ok(())
- }
-
- fn deserialize_at(&self, offset: usize) -> Result<Option<T>> {
- Ok(Some(deserialize(&self.data[offset..]).map_err(|_| {
- io::Error::new(
- io::ErrorKind::InvalidData,
- "Failed to deserialize stored data",
- )
- })?))
- }
-
- pub fn get_by_id(&self, id: u64) -> Result<Option<T>> {
- match self.id_index.get(&id) {
- Some(&offset) => self.deserialize_at(offset),
- None => Ok(None),
- }
- }
-
- pub fn get_by_uuid(&self, uuid: &Uuid) -> Result<Option<T>> {
- match self.uuid_index.get(uuid) {
- Some(&offset) => self.deserialize_at(offset),
- None => Ok(None),
+ let entry = LogEntry::new(item.get_id(), item.get_uuid(), offset);
+ self.log.append(&entry)?;
+ Ok(())
+ } else {
+ Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "Failure encoding input",
+ ))
}
}
}
+
+const OFFSETS_FILE: &str = "offsets.bin";
+const DATA_FILE: &str = "data.bin";

#[derive(FromBytes, AsBytes)]
#[repr(C)]
@@ -169,90 +116,64
}

impl LogEntry {
- fn new(uuid: &Uuid, id: u64, offset: usize) -> Self {
+ fn new(id: u64, uuid: uuid::Bytes, offset: u64) -> Self {
Self {
+ uuid,
id: U64::new(id),
- uuid: *uuid.as_bytes(),
- offset: U64::new(offset as u64),
+ offset: U64::new(offset),
}
}
}

-struct StructuredLog<T> {
- file: File,
- _header: PhantomData<T>,
+struct TypedMmap<'a, T, TDecoder>
+where
+ T: 'a,
+ TDecoder: Decoder<'a, Item = T>,
+{
+ data: Mmap,
+ _file: File,
+ _config: TDecoder,
+ _marker: PhantomData<&'a T>,
}

-impl<T> StructuredLog<T>
+impl<'a, T: 'a, TDecoder> TypedMmap<'a, T, TDecoder>
where
- T: FromBytes + AsBytes,
+ TDecoder: Decoder<'a, Item = T>,
{
- fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
- if !path.as_ref().exists() {
- File::create(&path)?;
- }
-
+ pub fn with_config<P: AsRef<Path>>(path: P, _config: TDecoder) -> Result<Self> {
let file = OpenOptions::new()
.read(true)
- .append(true)
- .open(&path.as_ref())?;
-
- let entry_len = size_of::<T>();
-
- let file_size = file.metadata()?.len() as usize;
- if file_size % entry_len != 0 {
- return Err(io::Error::new(
- io::ErrorKind::InvalidData,
- format!(
- "Expected file to size to be a multiple of {}. Got {}",
- entry_len, file_size
- ),
- ));
- }
+ .write(true)
+ .open(path.as_ref())?;

Ok(Self {
- file,
- _header: PhantomData,
+ data: unsafe { Mmap::map(&file)? },
+ _file: file,
+ _config,
+ _marker: PhantomData,
})
}

- fn len(&self) -> Result<usize> {
- Ok(self.file.metadata()?.len() as usize)
- }
-
- fn for_each_entry<F>(&self, mut each_entry: F) -> std::io::Result<()>
- where
- F: FnMut(&T),
- {
- let entry_len = size_of::<T>();
- let mut log_reader = BufReader::with_capacity((8192 / entry_len) * entry_len, &self.file);
-
- loop {
- let buf = log_reader.fill_buf()?;
-
- if buf.is_empty() {
- break;
- }
-
- let mut bytes_consumed = 0;
- if let Some(slice) = LayoutVerified::new_slice(buf) {
- let entries: &[T] = slice.into_slice();
- for entry in entries {
- (each_entry)(entry);
- bytes_consumed += entry_len;
- }
- } else {
- return Err(io::Error::new(io::ErrorKind::InvalidData, "Log corrupted!"));
- }
-
- log_reader.consume(bytes_consumed);
+ pub fn get(&self, offset: usize) -> Result<TDecoder::Item> {
+ if offset > self.data.len() {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "Offset too large",
+ ));
}

- Ok(())
- }
-
- fn append(&mut self, item: &T) -> Result<()> {
- self.file.write_all(item.as_bytes())
+ let data = self.data[offset..].as_ptr();
+ let len = self.data.len() - offset;
+ if let Some(decoded) =
+ TDecoder::from_bytes(unsafe { std::slice::from_raw_parts(data, len) })
+ {
+ Ok(decoded)
+ } else {
+ Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!("Failure decoding bytes at offset {}", offset),
+ ))
+ }
}
}

@@ -262,119 +183,52
use super::*;
use tempfile;

+ use crate::database::BincodeConfig;
use serde::Deserialize;

- #[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Copy)]
- struct Item(u64, Uuid);
+ #[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
+ struct Named<'a>(u64, Uuid, &'a str);

- impl Item {
- fn new(id: u64) -> Self {
- Self(id, Uuid::new_v4())
- }
- }
-
- impl DatabaseRecord for Item {
+ impl<'a> DatabaseRecord for Named<'a> {
fn get_id(&self) -> u64 {
self.0
}
- fn get_uuid(&self) -> &Uuid {
- &self.1
+
+ fn get_uuid(&self) -> uuid::Bytes {
+ *self.1.as_bytes()
}
}

- fn open_empty() -> Result<BincodeDatabase<Item>> {
- let tmpdir = tempfile::TempDir::new().unwrap();
- BincodeDatabase::create(tmpdir, 10)
- }
-
#[test]
- fn can_open_empty_db() {
- open_empty().unwrap();
- }
+ fn usage() -> Result<()> {
+ let basedir = tempfile::tempdir()?;

- #[test]
- fn get_on_empty_works() -> Result<()> {
- assert_eq!(None, open_empty()?.get_by_uuid(&Uuid::new_v4())?);
- assert_eq!(None, open_empty()?.get_by_id(42)?);
- Ok(())
- }
+ let mut db_writer = DatabaseWriter::new(basedir.path())?;

- #[test]
- fn can_add_and_get() -> Result<()> {
- let mut db = open_empty()?;
+ let entries = vec![
+ Named(0, Uuid::new_v4(), "a"),
+ Named(1, Uuid::new_v4(), "b"),
+ Named(2, Uuid::new_v4(), "c"),
+ Named(3, Uuid::new_v4(), "d"),
+ ];

- let one = Item::new(1);
- let two = Item::new(2);
- let three = Item::new(3);
-
- db.add(&one)?;
- db.add(&two)?;
- db.add(&three)?;
-
- assert_eq!(Some(one), db.get_by_id(1)?);
- assert_eq!(Some(three), db.get_by_id(3)?);
- assert_eq!(Some(two), db.get_by_id(2)?);
-
- Ok(())
- }
-
- #[test]
- fn add_updates_both_indices_correctly() -> Result<()> {
- let mut db = open_empty()?;
-
- let item = Item::new(42);
- db.add(&item)?;
-
- assert_eq!(
- db.get_by_id(item.get_id())?,
- db.get_by_uuid(item.get_uuid())?
- );
- Ok(())
- }
-
- #[test]
- fn cannot_overwrite_database() -> Result<()> {
- let tmpdir = tempfile::TempDir::new()?;
-
- BincodeDatabase::<Item>::create(&tmpdir, 1)?;
- let overwrite_result = BincodeDatabase::<Item>::create(tmpdir, 1);
- assert!(overwrite_result.is_err());
-
- Ok(())
- }
-
- #[test]
- fn can_load_existing_database() -> Result<()> {
- let tmpdir = tempfile::TempDir::new()?;
-
- const DB_SIZE: u64 = 1_000;
-
- let one = Item::new(1);
- let two = Item::new(2);
- let three = Item::new(3);
-
- {
- let mut db = BincodeDatabase::create(&tmpdir, DB_SIZE)?;
-
- db.add(&one)?;
- db.add(&two)?;
+ for entry in entries.iter() {
+ db_writer.append::<BincodeConfig<Named>>(entry)?;
}

- {
- let mut db = BincodeDatabase::open(&tmpdir)?;
- db.add(&three)?;
+ // So it flushes
+ drop(db_writer);
+
+ let db_reader = DatabaseReader::open(basedir, BincodeConfig::<Named>::new())?;
+
+ for entry in entries.into_iter() {
+ let id = entry.get_id();
+ let uuid = Uuid::from_bytes(entry.get_uuid());
+ let entry = Some(entry);
+
+ assert_eq!(entry, db_reader.find_by_id(id)?);
+ assert_eq!(entry, db_reader.find_by_uuid(&uuid)?);
}
-
- let existing_db = BincodeDatabase::open(&tmpdir)?;
- assert_eq!(Some(one), existing_db.get_by_uuid(one.get_uuid())?);
- assert_eq!(Some(two), existing_db.get_by_uuid(two.get_uuid())?);
- assert_eq!(Some(three), existing_db.get_by_uuid(three.get_uuid())?);
-
- // Shouldn't have grown from DB_SIZE
- let data_file = OpenOptions::new()
- .read(true)
- .open(tmpdir.path().join(DATA_FILE))?;
- assert_eq!(DB_SIZE, data_file.metadata()?.len());

Ok(())
}

Renamed crates/cantine/src/database/mapped_file.rs to crates/cantine/src/database/structuredlog.rs

@@ -1,145 +1,124
use std::{
- fs::File,
- io::{Error, ErrorKind, Result},
- ops::{Deref, DerefMut},
+ fs::{File, OpenOptions},
+ io::{self, BufRead, BufReader, Result, Write},
+ marker::PhantomData,
+ mem::size_of,
+ path::Path,
};

-use memmap::{MmapMut, MmapOptions};
+use zerocopy::{AsBytes, FromBytes, LayoutVerified};

-pub(super) struct MappedFile {
+pub(crate) struct StructuredLog<T> {
file: File,
- mmap: MmapMut,
- offset: usize,
+ _header: PhantomData<T>,
}

-impl MappedFile {
- pub fn open(file: File) -> Result<MappedFile> {
- let offset = file.metadata()?.len() as usize;
- let mmap = unsafe { MmapOptions::new().map_mut(&file)? };
- Ok(MappedFile { file, mmap, offset })
- }
-
- pub fn set_append_offset(&mut self, offset: usize) -> Result<()> {
- if offset <= self.len() {
- self.offset = offset;
- Ok(())
- } else {
- Err(Error::new(
- ErrorKind::InvalidInput,
- "offset must be <= len()",
- ))
- }
- }
-
- pub fn offset(&self) -> usize {
- self.offset
- }
-
- pub fn len(&self) -> usize {
- self.mmap.len()
- }
-
- pub fn append(&mut self, data: &[u8]) -> Result<usize> {
- let read_from = self.offset;
- let final_size = read_from + data.len();
-
- if final_size > self.mmap.len() {
- self.file.set_len(final_size as u64)?;
- self.mmap = unsafe { MmapOptions::new().map_mut(&self.file)? };
+impl<T> StructuredLog<T>
+where
+ T: FromBytes + AsBytes,
+{
+ pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+ if !path.as_ref().exists() {
+ File::create(&path)?;
}

- self.mmap[read_from..final_size].copy_from_slice(data);
- self.offset = final_size;
- Ok(read_from)
+ let file = OpenOptions::new()
+ .read(true)
+ .append(true)
+ .open(&path.as_ref())?;
+
+ let entry_len = size_of::<T>();
+
+ let file_size = file.metadata()?.len() as usize;
+ if file_size % entry_len != 0 {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!(
+ "Expected file to size to be a multiple of {}. Got {}",
+ entry_len, file_size
+ ),
+ ));
+ }
+
+ Ok(Self {
+ file,
+ _header: PhantomData,
+ })
}
-}

-impl Deref for MappedFile {
- type Target = [u8];
-
- #[inline]
- fn deref(&self) -> &[u8] {
- &self.mmap
+ pub fn len(&self) -> Result<usize> {
+ Ok(self.file.metadata()?.len() as usize / size_of::<T>())
}
-}

-impl DerefMut for MappedFile {
- #[inline]
- fn deref_mut(&mut self) -> &mut [u8] {
- &mut self.mmap
+ pub fn for_each_entry<F>(&self, mut each_entry: F) -> std::io::Result<()>
+ where
+ F: FnMut(&T),
+ {
+ let entry_len = size_of::<T>();
+ let mut log_reader = BufReader::with_capacity((8192 / entry_len) * entry_len, &self.file);
+
+ loop {
+ let buf = log_reader.fill_buf()?;
+
+ if buf.is_empty() {
+ break;
+ }
+
+ let mut bytes_consumed = 0;
+ if let Some(slice) = LayoutVerified::new_slice(buf) {
+ let entries: &[T] = slice.into_slice();
+ for entry in entries {
+ (each_entry)(entry);
+ bytes_consumed += entry_len;
+ }
+ } else {
+ return Err(io::Error::new(io::ErrorKind::InvalidData, "Log corrupted!"));
+ }
+
+ log_reader.consume(bytes_consumed);
+ }
+
+ Ok(())
+ }
+
+ pub fn append(&mut self, item: &T) -> Result<()> {
+ self.file.write_all(item.as_bytes())
}
}

#[cfg(test)]
mod tests {
use super::*;
-
use tempfile;

- fn open_empty() -> Result<MappedFile> {
- let file = tempfile::tempfile()?;
- file.set_len(10)?;
- let db = MappedFile::open(file)?;
- Ok(db)
- }
+ use byteorder::NativeEndian;
+ use zerocopy::U64;

#[test]
- fn open_starts_at_end() -> Result<()> {
- let db = open_empty()?;
- assert_eq!(db.len(), db.offset);
- Ok(())
- }
+ fn usage() -> Result<()> {
+ let tmpdir = tempfile::tempdir()?;
+ let log_path = tmpdir.path().join("testlog");

- #[test]
- fn cannot_set_offset_beyond_len() -> Result<()> {
- let mut db = open_empty()?;
- assert!(db.set_append_offset(db.len() + 1).is_err());
- Ok(())
- }
+ {
+ let mut log = StructuredLog::new(&log_path)?;

- #[test]
- fn can_write_and_read() -> Result<()> {
- let mut db = open_empty()?;
- db.set_append_offset(0)?;
+ assert_eq!(0, log.len()?);

- let data = [1, 2, 3, 4, 5];
- let read_from = db.append(&data)?;
+ for i in 0..100 {
+ log.append(&U64::<NativeEndian>::new(i))?;
+ }
+ }

- assert_eq!(data, db[read_from..data.len()]);
- Ok(())
- }
+ let log = StructuredLog::new(&log_path)?;

- #[test]
- fn len_does_not_grow_if_not_needed() -> Result<()> {
- let mut db = open_empty()?;
- let initial_len = db.len();
- db.set_append_offset(0)?;
- db.append(&[1, 2, 3])?;
- assert_eq!(initial_len, db.len());
- Ok(())
- }
+ assert_eq!(100, log.len()?);

- #[test]
- fn len_grows_when_appending() -> Result<()> {
- let mut db = open_empty()?;
- let initial_len = db.len();
- db.append(&[1, 2, 3])?;
- assert_eq!(initial_len + 3, db.len());
- Ok(())
- }
+ let mut wanted: u64 = 0;
+ log.for_each_entry(|e: &U64<NativeEndian>| {
+ assert_eq!(wanted, e.get());
+ wanted += 1;
+ })?;

- #[test]
- fn len_grows_correctly_at_boundary() -> Result<()> {
- let mut db = open_empty()?;
- let initial_len = db.len();
- let data = [1u8, 2, 3, 4, 5];
-
- let write_from = initial_len - (data.len() - 2);
- db.set_append_offset(write_from)?;
- db.append(&data)?;
-
- assert_eq!(initial_len + 2, db.len());
- assert_eq!(data, db[write_from..]);
Ok(())
}
}
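
Since the log only ever stores fixed-size, plain-bytes entries, len() above is simply the file size divided by size_of::<T>(). A rough sketch of what that means for the LogEntry the reader/writer feeds it (field order is a guess, the declaration sits outside the shown hunks, and the 32-byte figure assumes zerocopy's U64 is its usual alignment-1 wrapper):

use std::mem::size_of;

use byteorder::NativeEndian;
use zerocopy::U64;

// Mirror of the entry layout; the real one also derives FromBytes/AsBytes.
#[repr(C)]
struct LogEntry {
    uuid: uuid::Bytes,         // [u8; 16]
    id: U64<NativeEndian>,     // 8 bytes, alignment 1, so no padding
    offset: U64<NativeEndian>, // 8 bytes
}

fn main() {
    // 16 + 8 + 8 = 32 bytes per entry; a log of these would report
    // file_size / 32 from StructuredLog::len().
    assert_eq!(32, size_of::<LogEntry>());
}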

Created crates/cantine/src/database/config.rs

@@ -1,0 +1,50
+use std::{borrow::Cow, marker::PhantomData};
+
+use bincode::{deserialize, serialize};
+use serde::{Deserialize, Serialize};
+
+pub trait Encoder<'a> {
+ type Item: 'a;
+ fn to_bytes(item: &'a Self::Item) -> Option<Cow<'a, [u8]>>;
+}
+
+pub trait Decoder<'a> {
+ type Item: 'a;
+ fn from_bytes(src: &'a [u8]) -> Option<Self::Item>;
+}
+
+pub struct BincodeConfig<T>(PhantomData<T>);
+
+impl<T> BincodeConfig<T> {
+ pub fn new() -> Self {
+ Self(PhantomData)
+ }
+}
+
+impl<T> Default for BincodeConfig<T> {
+ fn default() -> Self {
+ Self(PhantomData)
+ }
+}
+
+impl<'a, T: 'a> Encoder<'a> for BincodeConfig<T>
+where
+ T: Serialize,
+{
+ type Item = T;
+
+ fn to_bytes(item: &'a T) -> Option<Cow<[u8]>> {
+ serialize(item).map(Cow::Owned).ok()
+ }
+}
+
+impl<'a, T: 'a> Decoder<'a> for BincodeConfig<T>
+where
+ T: Deserialize<'a> + Clone,
+{
+ type Item = T;
+
+ fn from_bytes(src: &'a [u8]) -> Option<T> {
+ deserialize(src).ok()
+ }
+}
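
One thing worth calling out: the `'a` lifetime on Encoder/Decoder is what keeps borrowed items possible, which is the door the commit message decided not to walk through. A hedged sketch of what the traits already allow, written as if it lived next to readerwriter.rs (the traits are not publicly re-exported in this commit, and `Tagged` is a made-up type with a borrowed field):

use serde::{Deserialize, Serialize};

use super::config::{BincodeConfig, Decoder, Encoder};

// Hypothetical record whose &str borrows from the source bytes.
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
struct Tagged<'a>(u64, &'a str);

fn encode_decode(item: &Tagged) -> Option<()> {
    // Encoder::to_bytes hands back a Cow<[u8]> (owned, in bincode's case)...
    let bytes = BincodeConfig::<Tagged>::to_bytes(item)?;
    // ...and Decoder::from_bytes can point the &str straight into that
    // buffer, the same way DatabaseReader decodes against the mmapped file.
    let decoded: Tagged = BincodeConfig::<Tagged>::from_bytes(&bytes)?;
    assert_eq!(item, &decoded);
    Some(())
}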