Wednesday, Dec 17th, 2014
dso's talk about multithreading in the Rust programming language
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate getopts; | |
extern crate redis; | |
extern crate time; | |
use redis::RedisResult; | |
use redis::Value as RV; | |
use redis::Commands; | |
use std::io::BufferedReader; | |
use std::char::{is_digit}; | |
use std::collections::HashMap; | |
use std::io::File; | |
use getopts::{optopt,optflag,getopts,OptGroup}; | |
use std::os; | |
use std::num; | |
use std::iter::AdditiveIterator; | |
use std::fmt; | |
use std::result; | |
use std::sync::{RWLock, Arc}; | |
use std::time::duration::Duration; | |
use time::{now, strftime}; | |
use std::io::timer::sleep; | |
// Shared, RW-lock-protected work queue of parsed objects, reference-counted
// across the producer task and the redis consumer tasks.
type ArcVecSomeObj<'a> = Arc<RWLock<Vec<SomeObj<'a>>>>;
// Shared boolean flag; used for both the "done" and the "keep running" signals.
type ArcBool = Arc<RWLock<bool>>;
// Generalized (real world'ish) example of how to read a file, parse the input, | |
// and export the data to redis in a multithreaded manner. | |
// Program ran on a Corei7, 32GB RAM and completed the import of 33M records | |
// in about 90 Minutes (roughly 3 minutes per million records) | |
// The current configuration of the program will Spawn 1000 Redis Export tasks, | |
// which are responsible for taking the SomeObj from a shared (Atomically | |
// Referenced, RW Lock Protected Vector) and inserting it into a Redis | |
// key space. The SomeObj are derived from a Key-Value line in a file, and | |
// once this line is parsed, it is inserted into the shared vector.
// When the file reading is complete, the reader (Producer/main task) will | |
// signal all the consumer (Redis inserting) tasks, so that once the shared | |
// vector is empty, they know to quit (instead of waiting). The main task | |
// will receive a signal in the form of a u32 value via the channel and quit
// once all the redis consumers are done. | |
// The only other detail that might be of interest is the way the redis threads | |
// are monitored for quitting. I could not figure out a clean way to "Join" a
// task once it was complete, so I simply used a HashMap, mapping the task-
// id to its receive channel. When the file reading is complete, the main task | |
// will poll over these tasks to determine whether or not it has completed. In | |
// this case, I collect the keys into a vector, and then walk over the HashMap, | |
// rather than iterating over the HashMap directly, because when I iterated | |
// over the HashMap (e.g. channel_map.iter()) this results in a compiler error | |
// at "channel_map.remove(id);". I think it is a bug in the Borrow checker, | |
// because I get an error that looks like this: | |
/* | |
main.rs:492:17: 492:28 error: cannot borrow `channel_map` as mutable because it | |
is also borrowed as immutable | |
main.rs:492 channel_map.remove(id); | |
main.rs:473:31: 440:42 note: previous borrow of `channel_map` occurs here; | |
the immutable borrow prevents subsequent moves or | |
mutable borrows of `channel_map` | |
until the borrow ends | |
main.rs:473 for (c_id, rx) in channel_map.iter() { | |
main.rs:473:6: 463:6 note: previous borrow ends here | |
main.rs:466 while cnt > 0 { | |
main.rs:500 } | |
with the following code at 473 instead of whats currently there: | |
for (c_id, rx) in channel_map.iter() { | |
let rx = channel_map.get(c_id).unwrap(); | |
let res_recv = rx.try_recv(); | |
match res_recv { | |
Ok(_) => { | |
del_ids.push (c_id); | |
println!("redis thread {} has completed import", c_id); | |
}, | |
Err (_) => (), | |
} | |
} | |
*/ | |
// This may be a bit over the top, | |
// but I hope this code is helpful to others – dso | |
// One parsed record from a key-value input line.
// NOTE(review): the lifetime parameter 'a is declared but never used -- all
// four fields are owned Strings; presumably a leftover from an earlier
// borrowed design. It is kept because every other signature names SomeObj<'a>.
struct SomeObj<'a> {
    some_field1 : String, // also used as the redis key suffix
    some_field2 : String,
    some_field3 : String,
    some_field4 : String, // always the placeholder "Blah" (see SomeObj::new)
}
impl <'a> SomeObj<'a> { | |
pub fn new<'a>(some_field1 : &str, some_field2 : &str, inp_some_field3 : &str) -> SomeObj<'a> { | |
let somefield4 = "Blah" | |
SomeObj{some_field1:some_field1.to_string(), | |
some_field2:some_field2.to_string(), | |
some_field4:some_field4.to_string(), | |
some_field3:inp_some_field3.to_string() | |
} | |
} | |
} | |
impl <'a> Clone for SomeObj<'a> { | |
fn clone (&self) -> SomeObj<'a> { | |
SomeObj{some_field1:self.some_field2.clone(), | |
some_field2:self.some_field2.clone(), | |
some_field4:self.some_field4.clone(), | |
some_field3:self.some_field3.clone() | |
} | |
} | |
} | |
impl <'a> fmt::Show for SomeObj<'a> { | |
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | |
let x = format!("field1: 0x{:08x} field2: 0x{:08x} field3: {} field4: {}", | |
self.some_field1, self.some_field2, self.some_field3, self.some_field4); | |
write!(f, "{}", x) | |
} | |
} | |
fn get_time () -> String { | |
let fmt = "%Y:%m:%d:%H:%M:%S"; | |
match strftime(fmt, &now()) { | |
Ok(s) => s.clone(), | |
Err(_) => "ERROR_TIME".to_string(), | |
} | |
} | |
fn is_service_done (shared_done : &ArcBool) -> bool { | |
match shared_done.try_read() { | |
Some (v) => v.clone(), | |
None => false, | |
} | |
} | |
fn set_is_service_done (shared_done : &ArcBool, new_value : bool){ | |
let mut set_service = false; | |
while !set_service { | |
match shared_done.try_write () { | |
Some (mut v) => { | |
*v = new_value; | |
set_service = true; | |
}, | |
None => (), | |
} | |
} | |
} | |
fn is_service_active (shared_keep_running : &ArcBool) -> bool { | |
match shared_keep_running.try_read() { | |
Some (v) => v.clone(), | |
None => true, | |
} | |
} | |
fn execute_redis_insert_task ( | |
namespace : String, | |
host : String, | |
shared_vec : ArcVecSomeObj<'static>, | |
shared_keep_running : ArcBool, | |
shared_done : ArcBool) { | |
let uri = format!("redis://{}/", host); | |
let client : Result<_, _> = redis::Client::open(uri.as_slice()); | |
let rcon : Result<_, _> = client.unwrap().get_connection(); | |
let mut keep_running = is_service_active (&shared_keep_running) && | |
rcon.is_ok(); | |
if !keep_running { | |
return | |
} | |
let con = rcon.unwrap(); | |
let mut cnt : uint = 0; | |
while keep_running { | |
let mut opt_obj : Option <SomeObj>= None; | |
{ | |
opt_obj = match shared_vec.try_write() { | |
Some (ref mut vec) => { | |
if vec.len() == 0 && is_service_done(&shared_done) { | |
keep_running = false; | |
None | |
} else { | |
cnt += 1; | |
if cnt % 10000 == 0 { | |
println!("Inserted {} fields into redis.", cnt); | |
} | |
vec.pop() | |
} | |
}, | |
None => None | |
}; | |
} | |
if opt_some_obj.is_some() { | |
let obj = opt_some_obj.unwrap(); | |
do_redis_insert (&con, &obj, &namespace); | |
} else { | |
let time_sleep = Duration::seconds(1); | |
sleep(time_sleep); | |
} | |
let keep_running = keep_running && is_service_active (&shared_keep_running); | |
} | |
} | |
fn do_redis_insert<'a>(con : &redis::Connection, | |
obj : &SomeObj<'a>, | |
namespace : &String ) -> redis::RedisResult<()> | |
{ | |
let field1_key = "field1"; | |
let field2_key = "field2"; | |
let field3_key = "field3"; | |
let k = format!("{}-obj_data-{}", namespace, some_obj.some_field1); | |
let key = k.as_slice(); | |
//let mut sa_str :String = format!("0x{:08x}",some_obj.some_field1); | |
let _ : RV = try!(redis::pipe().cmd("HMSET").arg(key) | |
.arg(field1_key).arg(some_obj.some_field1.clone()) | |
.arg(field2_key).arg(some_obj.some_field2.clone()) | |
.arg(field3_key).arg(some_obj.some_field4.clone()).query(con)); | |
Ok(()) | |
} | |
fn do_redis_code<'a>(host : &String, | |
vec_objs : &Vec<SomeObj<'a>>, | |
namespace : &String ) -> redis::RedisResult<()> | |
{ | |
// general connection handling | |
let uri = format!("redis://{}/", host); | |
let client = try!(redis::Client::open(uri.as_slice())); | |
let con = try!(client.get_connection()); | |
let mut cnt : uint = 0; | |
for obj in vec_objs.iter() { | |
let k = format!("{}-obj_data-{}", namespace, some_obj.some_field1); | |
let mut sa_str :String = format!("0x{:08x}",some_obj.some_field1); | |
let _ : RedisResult<RV> = con.hset(k.as_slice(), | |
"some_field1", sa_str.as_slice()); | |
sa_str = format!("0x{:08x}",some_obj.some_field2); | |
let _ : RedisResult<RV> = con.hset(k.as_slice(), | |
"some_field2", sa_str.as_slice()); | |
let _ : RedisResult<RV> = con.hset(k.as_slice(), | |
"some_field3", some_obj.some_field3.as_slice()); | |
let _ : RedisResult<RV> = con.hset(k.as_slice(), | |
"field4_key", some_obj.some_field4.as_slice()); | |
cnt+=1; | |
if cnt % 100000 == 0 { | |
println!("Loaded {} obj fields.", cnt); | |
} | |
} | |
Ok(()) | |
} | |
fn get_keyed_line (line : &String ) -> HashMap<String, String> { | |
let mut hm = HashMap::new(); | |
let keys_values: Vec<&str> = line.as_slice().trim().split_str("||").collect(); | |
for kv_pair in keys_values.iter() { | |
//println!("kv_pair: {}", kv_pair); | |
let kv_string = kv_pair.to_string(); | |
let key_value : Vec<&str> = kv_string.as_slice().split_str("::").collect(); | |
let k = key_value[0].to_string(); | |
let v = key_value[1].to_string(); | |
hm.insert (k, v); | |
} | |
hm | |
} | |
fn read_objs_file<'a>(inputfile : &String, vec_objs : &mut Vec<SomeObj<'a>> ) { | |
let mut file = BufferedReader::new(File::open(&Path::new(inputfile.as_slice()))); | |
let mut cnt : uint = 0; | |
for line in file.lines() { | |
let hm = match line { | |
Ok (ref l) => get_keyed_line (l), | |
Err(_) => HashMap::new(), | |
}; | |
if hm.len() == 0 { | |
break; | |
} | |
let obj = SomeObj::new ( | |
hm.get("some_field1").unwrap().as_slice(), | |
hm.get("some_field2").unwrap().as_slice(), | |
hm.get("some_field3").unwrap().as_slice()); | |
//println!("{}", obj); | |
vec_objs.push(obj); | |
cnt+=1; | |
if cnt % 100000 == 0 { | |
println!("Loaded {} obj fields.", cnt); | |
} | |
} | |
} | |
fn parse_line<'a>( line : &String ) -> Option<SomeObj<'a>>{ | |
let hm = get_keyed_line (line); | |
if hm.len() == 0 { | |
return None | |
} | |
Some(SomeObj::new ( | |
hm.get("some_field1").unwrap().as_slice(), | |
hm.get("some_field2").unwrap().as_slice(), | |
hm.get("some_field3").unwrap().as_slice())) | |
} | |
fn queue_obj <'a> (obj : &SomeObj<'a>, shared_vec : ArcVecSomeObj<'static> ) { | |
let mut inserted_obj = false; | |
while !inserted_obj { | |
match shared_vec.try_write (){ | |
Some (ref mut vec) => { | |
vec.push (some_obj.clone()); | |
inserted_obj = true; | |
}, | |
None => (), | |
} | |
} | |
} | |
fn read_objs_file_mt<'a>(inputfile : &String, shared_vec : ArcVecSomeObj<'static> ) { | |
let mut file = BufferedReader::new(File::open(&Path::new(inputfile.as_slice()))); | |
let mut cnt : uint = 0; | |
for line in file.lines() { | |
let ls = match line { | |
Ok (ref l) => l.clone(), | |
Err(_) => String::new(), | |
}; | |
if ls.len() == 0 { | |
continue; | |
} | |
let t_shared_obj = shared_vec.clone(); | |
spawn (proc () { | |
//println!("{}", obj); | |
let t_shared_obj = t_shared_obj; | |
let opt_obj = parse_line (&ls); | |
match opt_obj { | |
Some (obj) => {queue_obj(&obj, t_shared_obj );}, | |
None => () | |
}; | |
}); | |
cnt+=1; | |
if cnt % 100000 == 0 { | |
if cnt % 100000 == 0 { | |
println!("Inserted {} fields into redis: {}.", cnt, get_time()); | |
} | |
let time_sleep = Duration::seconds(2); | |
sleep(time_sleep); | |
} | |
} | |
} | |
fn print_usage(program: &str, _opts: &[OptGroup]) { | |
println!("Usage: {} [options]", program); | |
println!("-f\t\tfile containing identified objs"); | |
println!("-H\t\tredis host"); | |
println!("-h –help\tUsage"); | |
} | |
fn main() { | |
let args: Vec<String> = os::args(); | |
let program = args[0].clone(); | |
let NUM_REDIS_TASKS = 1000; | |
//let mut obj_vec = Vec::new(); | |
let mut shared_vec :ArcVecSomeObj<'static> = Arc::new(RWLock::new(Vec::new())); | |
let mut shared_done = Arc::new(RWLock::new(false)); | |
let mut shared_keep_running = Arc::new(RWLock::new(true)); | |
let opts = &[ | |
optopt("f", "", "input some_field3", "NAME"), | |
optopt("H", "", "redi host", "NAME"), | |
optopt("n", "", "name space for the data", "NAME"), | |
optflag("h", "help", "print this help menu") | |
]; | |
let matches = match getopts(args.tail(), opts) { | |
Ok(m) => { m } | |
Err(f) => { panic!(f.to_string()) } | |
}; | |
if matches.opt_present("h") { | |
print_usage(program.as_slice(), opts); | |
return; | |
} | |
let inputfile = matches.opt_str("f").unwrap(); | |
let namespace = matches.opt_str("n").unwrap(); | |
let redishost = matches.opt_str("H").unwrap(); | |
println!("Hello, world!"); | |
println!("Reading objs from: {}", inputfile); | |
println!("Sending objs to redis host: {}", namespace); | |
println!("Sending objs to redis host: {}", redishost); | |
println!("Starting the redis consumer pool now: {}", get_time()); | |
// clone this proc values so that the clones can be moved | |
// into the spawned task | |
let mut cnt : u32 = 0; | |
let mut channel_map = HashMap::new(); | |
while cnt < NUM_REDIS_TASKS { | |
let (tx, rx) : (Sender<u32>, Receiver<u32> )= channel(); | |
let t_shared_vec = shared_vec.clone(); | |
let t_shared_done = shared_done.clone(); | |
let t_shared_keep_running = shared_keep_running.clone(); | |
let t_namespace = namespace.clone(); | |
let t_host = redishost.clone(); | |
channel_map.insert (cnt, rx); | |
cnt+=1; | |
spawn (proc () { | |
let t_shared_vec = t_shared_vec; | |
let t_shared_done = t_shared_done; | |
let t_shared_keep_running = t_shared_keep_running; | |
let t_namespace = t_namespace; | |
let t_host = t_host; | |
let child_tx = tx.clone(); | |
execute_redis_insert_task (t_namespace, t_host, t_shared_vec, | |
t_shared_keep_running, t_shared_done ); | |
child_tx.send (0); | |
}); | |
} | |
// Old slow way | |
// 1) Read in all data from a file into a vector | |
// 2) Write to redis field by field and return | |
// (ran for 12+ hours and did not finish) | |
//read_objs_file(&inputfile, &mut obj_vec); | |
//do_redis_code (&redishost, &obj_vec, &namespace); | |
// return | |
// New way | |
// 1) Read line, and create a spurious task to parse the line, and | |
// insert it the result into the shared vector | |
// 2) The 1000+ consumer tasks will submit the data in the back ground | |
// as the parsing takes place | |
println!("Consuming the file now: {}", get_time()); | |
read_objs_file_mt(&inputfile, shared_vec.clone()); | |
println!("Completed consuming the file now: {}", get_time()); | |
set_is_service_done (&shared_done, true); | |
println!("Waiting on the insertion task to finish"); | |
while cnt > 0 { | |
let mut del_ids = Vec::new(); | |
let mut keys = Vec::new(); | |
{ | |
for k in channel_map.keys() { | |
keys.push(*k); | |
} | |
for c_id in keys.iter() { | |
let rx = channel_map.get(c_id).unwrap(); | |
let res_recv = rx.try_recv(); | |
match res_recv { | |
Ok(_) => { | |
del_ids.push (c_id); | |
println!("redis thread {} has completed import", c_id); | |
}, | |
Err (_) => (), | |
} | |
} | |
} | |
if del_ids.len() == 0 { | |
let time_sleep = Duration::seconds(1); | |
sleep(time_sleep); | |
} else { | |
for c_id in del_ids.iter() { | |
let id = c_id.clone(); | |
channel_map.remove(id); | |
} | |
} | |
{ | |
cnt = channel_map.len() as u32; | |
} | |
} | |
println!("Completed All processing: {}", get_time()); | |
} |