Read options solely from `std::env` by Caio, 4 years ago
StructOpt + clap are great and all, but in production I already drive
everything with environment variables, and I'm more comfortable with
the language itself, so spending extra compilation time for the sake
of a cute CLI interface isn't worth it anymore.

My 2013 CPU is thankful. Now my biggest gripe WRT the build is that
`tantivy` pulls in `failure`, which implies a build dependency on GCC
for the sake of stack traces... So xbuild for ARM is a bit annoying.
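
The whole option-parsing story now boils down to one tiny helper plus
`env::args().nth(1)` for the output directory; here's a condensed
sketch (with a toy `main`, just for illustration) of what `load.rs`
below does:

use std::{env, str::FromStr};

// Read a usize from the environment, falling back to a default
// when the variable is unset; a malformed value just panics,
// which is fine for a one-shot loader.
fn get_usize_from_env_or(key: &str, default: usize) -> usize {
    env::var(key)
        .ok()
        .map(|v| usize::from_str(&v).expect("valid usize"))
        .unwrap_or(default)
}

fn main() {
    // e.g. NUM_PRODUCERS=8 ./load /some/dir
    let num_producers = get_usize_from_env_or("NUM_PRODUCERS", 4);
    println!("num_producers={}", num_producers);
}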

Blob cantine/src/bin/load.rs

use std::{
    env,
    io::{self, BufRead},
    path::Path,
    str::FromStr,
    sync::{mpsc::channel, Arc, RwLock},
    thread::spawn,
    time::Instant,
};

use crossbeam_channel::unbounded;
use serde_json;

use tantivy::{self, directory::MmapDirectory, schema::SchemaBuilder, Index, Result};

use cantine::database::DatabaseWriter;
use cantine::index::RecipeIndex;
use cantine::model::Recipe;

/// Loads recipes as JSON into cantine's database and index
#[derive(Debug)]
pub struct LoadOptions {
    /// Size for tantivy's writer buffer in MBs
    buffer_size: usize,
    /// How many recipes to ingest before committing
    commit_every: usize,
    /// Number of worker threads to start
    num_producers: usize,
    /// Path to a non-existing directory
    output_dir: String,
}

fn load(options: LoadOptions) -> Result<()> {
    log::info!("Started with {:?}", &options);

    let base_path = Path::new(options.output_dir.as_str());
    let db_path = base_path.join("database");
    let index_path = base_path.join("tantivy");

    std::fs::create_dir_all(&db_path)?;
    std::fs::create_dir(&index_path)?;

    let mut builder = SchemaBuilder::new();

    let fields = RecipeIndex::from(&mut builder);

    let index = Index::open_or_create(MmapDirectory::open(&index_path)?, builder.build())?;

    // An SPMC channel to parallelize decoding and index preparation
    let (line_sender, line_receiver) = unbounded::<String>();
    // An MPSC channel to control index commits and db writes
    let (recipe_sender, recipe_receiver) = channel();

    let buffer_size = options.buffer_size;
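    // index.writer() takes its memory budget in bytes; BUFFER_SIZE
    // is expressed in megabytes, hence the conversion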
    let writer = Arc::new(RwLock::new(index.writer(buffer_size * 1_000_000)?));

    let num_producers = options.num_producers;
    let mut workers = Vec::with_capacity(num_producers);
    for _ in 0..num_producers {
        let receiver = line_receiver.clone();
        let writer = writer.clone();
        let recipe_sender = recipe_sender.clone();

        let fields = fields.clone();
        workers.push(spawn(move || {
            for line in receiver {
                let recipe: Recipe =
                    serde_json::from_str(line.as_ref()).expect("valid recipe json");

                writer
                    .read()
                    .unwrap()
                    .add_document(fields.make_document(&recipe));

                recipe_sender.send(recipe).expect("send always works");
            }
        }))
    }

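    // A single consumer owns the database and is the only thread
    // committing the index, so commits never race with each other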
    let disk_writer = spawn(move || -> Result<()> {
        let mut db = DatabaseWriter::new(db_path)?;

        let cur = Instant::now();
        let mut num_recipes = 0;

        for recipe in recipe_receiver {
            num_recipes += 1;
            db.append(&recipe)?;

            if num_recipes % options.commit_every == 0 {
                writer.write()?.commit()?;

                log::info!(
                    "DiskWriter: {} Documents so far (@ {} secs).",
                    num_recipes,
                    cur.elapsed().as_secs()
                );
            }
        }

        writer.write()?.commit()?;

        log::info!(
            "DiskWriter: Wrote {} documents in {} seconds",
            num_recipes,
            cur.elapsed().as_secs()
        );

        Ok(())
    });

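    // The main thread is the single producer, fanning stdin lines
    // out to the workers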
    for line in io::stdin().lock().lines() {
        line_sender.send(line?).unwrap();
    }

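    // Dropping the last line sender disconnects the channel, ending
    // each worker's `for line in receiver` loop once it drains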
    drop(line_sender);

    for worker in workers.into_iter() {
        worker.join().unwrap();
    }

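    // Workers dropped their recipe_sender clones on exit; dropping
    // the original lets the disk writer's loop terminate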
    drop(recipe_sender);

    disk_writer.join().unwrap()?;

    log::info!("Done!");

    Ok(())
}

const BUFFER_SIZE: &str = "BUFFER_SIZE";
const COMMIT_EVERY: &str = "COMMIT_EVERY";
const NUM_PRODUCERS: &str = "NUM_PRODUCERS";

fn get_usize_from_env_or(key: &str, default: usize) -> usize {
    env::var(key)
        .ok()
        .map(|v| usize::from_str(&v).expect("valid usize"))
        .unwrap_or(default)
}

fn main() -> Result<()> {
    let output_dir = env::args()
        .nth(1)
        .expect("First parameter must be the output directory");

    let buffer_size = get_usize_from_env_or(BUFFER_SIZE, 1000);

    let commit_every = get_usize_from_env_or(COMMIT_EVERY, 300_000);

    let num_producers = get_usize_from_env_or(NUM_PRODUCERS, 4);

    let options = LoadOptions {
        output_dir,
        buffer_size,
        commit_every,
        num_producers,
    };

    load(options)
}
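
Usage, for the record: the only positional argument is the output
directory and the recipes come in via stdin, one JSON object per
line, so a full load looks something like
`COMMIT_EVERY=100000 NUM_PRODUCERS=8 load /tmp/cantine < recipes.jsonl`
(file name illustrative), plus whatever `log` backend you fancy if
you want to see the progress messages.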