ArrayFire in Threaded Applications
In this chapter, we will look at how to use ArrayFire in multi-threaded programs. We shall go over the details in the following order:
- Move an Array to thread
- Read Array from Multiple threads
- Write to Array from Multiple threads
- Write to single Array using Channel
Move an Array to thread
In this section, we create an Array on the main thread, move it to a child thread, modify it, and then print it from the child thread.
use arrayfire::*;
use std::thread;

set_device(0);
info();
let mut a = constant(1, dim4!(3, 3));

let handle = thread::spawn(move || {
    // Each thread must call set_device with the appropriate device id
    set_device(0);

    println!("\nFrom thread {:?}", thread::current().id());

    a += constant(2, dim4!(3, 3));
    print(&a);
});

// Join the child thread, since the main thread holds the ArrayFire context
handle.join().unwrap();
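The ownership transfer in this example is ordinary Rust `move` semantics. The following std-only sketch illustrates the same pattern under two assumptions: a `Vec<i32>` stands in for the ArrayFire Array so that it runs without a device, and `add_in_thread` is a hypothetical helper name. It also shows that the child thread can hand the modified value back to the caller through `join`, which the example above does not do.

```rust
use std::thread;

// Hypothetical helper: `Vec<i32>` is a stand-in for an ArrayFire `Array`.
// The vector is moved into the child thread, modified there, and handed
// back to the caller through the join handle.
fn add_in_thread(mut data: Vec<i32>, delta: i32) -> Vec<i32> {
    let handle = thread::spawn(move || {
        // From here on, `data` is owned by this closure.
        for v in data.iter_mut() {
            *v += delta;
        }
        data // the closure's return value becomes the result of `join`
    });

    handle.join().expect("child thread panicked")
}
```

Calling `add_in_thread(vec![1; 9], 2)` mirrors the 3x3 example: nine ones go in, and the caller receives the modified values once the thread has finished.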
Read Array from Multiple threads
Now, let's expand the earlier example to do a bunch of arithmetic operations in parallel on multiple threads using the same Array objects.
let ops: Vec<_> = vec![Op::Add, Op::Sub, Op::Div, Op::Mul, Op::Add, Op::Div];

// Set the active GPU/device on the main thread; subsequent
// Array objects are created on it
set_device(0);

// ArrayFire Arrays are internally maintained via atomic reference counting,
// so they need no Arc wrapping when moved to another thread.
// Just call clone on the object and share the resulting clone.
let a = constant(1.0f32, dim4!(3, 3));
let b = constant(2.0f32, dim4!(3, 3));

let threads: Vec<_> = ops
    .into_iter()
    .map(|op| {
        let x = a.clone();
        let y = b.clone();
        thread::spawn(move || {
            set_device(0); // Both objects were created on device 0 earlier
            match op {
                Op::Add => {
                    let _c = x + y;
                }
                Op::Sub => {
                    let _c = x - y;
                }
                Op::Div => {
                    let _c = x / y;
                }
                Op::Mul => {
                    let _c = x * y;
                }
            }
            sync(0);
            thread::sleep(std::time::Duration::new(1, 0));
        })
    })
    .collect();
for child in threads {
    let _ = child.join();
}
Given below is the definition of the enum Op used in the example; it is kept minimal for simplicity of illustration.
#[derive(Debug, Copy, Clone)]
enum Op {
    Add,
    Sub,
    Div,
    Mul,
}
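The threads in the example discard their results. As a rough illustration of the read-sharing pattern with results collected by the caller, here is a std-only sketch. It rests on assumptions: `Arc<Vec<f32>>` stands in for the cheaply cloned Array handles (ArrayFire Arrays need no `Arc`, as noted in the example), and `apply` and `run_ops` are hypothetical helper names.

```rust
use std::sync::Arc;
use std::thread;

#[derive(Debug, Copy, Clone)]
enum Op {
    Add,
    Sub,
    Div,
    Mul,
}

// Hypothetical helper: applies one operation to a pair of scalars.
fn apply(op: Op, x: f32, y: f32) -> f32 {
    match op {
        Op::Add => x + y,
        Op::Sub => x - y,
        Op::Div => x / y,
        Op::Mul => x * y,
    }
}

// Hypothetical helper: `Arc<Vec<f32>>` stands in for cloned Array handles.
// Every thread reads the same two inputs and returns its own result.
fn run_ops(ops: Vec<Op>, a: Vec<f32>, b: Vec<f32>) -> Vec<Vec<f32>> {
    let a = Arc::new(a);
    let b = Arc::new(b);

    let handles: Vec<_> = ops
        .into_iter()
        .map(|op| {
            // Cloning the Arc copies a pointer, not the underlying data.
            let x = Arc::clone(&a);
            let y = Arc::clone(&b);
            thread::spawn(move || {
                x.iter()
                    .zip(y.iter())
                    .map(|(&p, &q)| apply(op, p, q))
                    .collect::<Vec<f32>>()
            })
        })
        .collect();

    // Results come back in the same order as `ops`.
    handles
        .into_iter()
        .map(|h| h.join().expect("worker thread panicked"))
        .collect()
}
```

Returning the value from the closure and reading it from `join` keeps the inputs read-only, so no locking is involved.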
Write to Array from Multiple threads
Let us further expand the earlier example by accumulating the results of the arithmetic operations into a single Array object.
The code differs from the earlier section in a couple of places:
- In the main thread, we wrap the accumulating Array in a read-write lock (std::sync::RwLock), which is in turn wrapped in an atomically reference-counted pointer (std::sync::Arc).
- In the child threads, we use the guard returned by RwLock's write method to access the accumulator Array.
use std::sync::{Arc, RwLock};

let ops: Vec<_> = vec![Op::Add, Op::Sub, Op::Div, Op::Mul, Op::Add, Op::Div];

// Set the active GPU/device on the main thread; subsequent
// Array objects are created on it
set_device(0);

let c = constant(0.0f32, dim4!(3, 3));
let a = constant(1.0f32, dim4!(3, 3));
let b = constant(2.0f32, dim4!(3, 3));

// Move ownership into the RwLock and wrap it in an Arc, since the same
// object is to be modified from several threads
let c_lock = Arc::new(RwLock::new(c));

// a and b are internally reference counted by ArrayFire. Unless they are
// known to need modification, you can simply clone the objects and pass
// the clones to the threads
let threads: Vec<_> = ops
    .into_iter()
    .map(|op| {
        let x = a.clone();
        let y = b.clone();

        let wlock = c_lock.clone();
        thread::spawn(move || {
            // Both objects were created on device 0 in the main thread.
            // Every thread needs to set the device it is going to work on.
            // At the time of writing, all Array objects used in a thread
            // must have been created on the same device.
            set_device(0);
            if let Ok(mut c_guard) = wlock.write() {
                match op {
                    Op::Add => {
                        *c_guard += x + y;
                    }
                    Op::Sub => {
                        *c_guard += x - y;
                    }
                    Op::Div => {
                        *c_guard += x / y;
                    }
                    Op::Mul => {
                        *c_guard += x * y;
                    }
                }
            }
        })
    })
    .collect();

for child in threads {
    let _ = child.join();
}

//let read_guard = c_lock.read().unwrap();
//af_print!("C after threads joined", *read_guard);
//C after threads joined
//[3 3 1 1]
//    8.0000     8.0000     8.0000
//    8.0000     8.0000     8.0000
//    8.0000     8.0000     8.0000
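One detail of the example is worth noting: `if let Ok(..) = wlock.write()` silently skips the update when the lock is poisoned, that is, when an earlier writer panicked while holding it. The std-only sketch below shows one possible alternative, recovering the guard from the poison error. It assumes a plain `f32` as a stand-in for the accumulator Array, and `accumulate` is a hypothetical helper name; whether recovering from poisoning is appropriate depends on the application.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

// Hypothetical helper: an `f32` stands in for the accumulator Array.
// Each thread adds one term to the shared total under a write lock.
fn accumulate(terms: Vec<f32>) -> f32 {
    let total = Arc::new(RwLock::new(0.0f32));

    let handles: Vec<_> = terms
        .into_iter()
        .map(|term| {
            let total = Arc::clone(&total);
            thread::spawn(move || {
                // Recover the guard even if another writer panicked,
                // instead of silently skipping this update.
                let mut guard = total
                    .write()
                    .unwrap_or_else(|poisoned| poisoned.into_inner());
                *guard += term;
            })
        })
        .collect();

    for handle in handles {
        handle.join().expect("worker thread panicked");
    }

    // Read the final value once all writers have finished.
    let result = *total
        .read()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    result
}
```

With the six per-element results from the example (3, -1, 0.5, 2, 3, 0.5), `accumulate` yields 8, matching the printed output above.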
Write to single Array using Channel
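In this section, we modify the example to use a channel instead of shared data: each child thread sends its result to the main thread, which accumulates the results into a single Array.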
use std::sync::mpsc;

let ops: Vec<_> = vec![Op::Add, Op::Sub, Op::Div, Op::Mul, Op::Add, Op::Div];
let ops_len: usize = ops.len();

// Set the active GPU/device on the main thread; subsequent
// Array objects are created on it
set_device(0);

let mut c = constant(0.0f32, dim4!(3, 3));
let a = constant(1.0f32, dim4!(3, 3));
let b = constant(2.0f32, dim4!(3, 3));

let (tx, rx) = mpsc::channel();

let threads: Vec<_> = ops
    .into_iter()
    .map(|op| {
        // a and b are internally reference counted by ArrayFire. Unless they
        // are known to need modification, you can simply clone the objects
        // and pass the clones to the threads
        let x = a.clone();
        let y = b.clone();

        let tx_clone = tx.clone();

        thread::spawn(move || {
            // Both objects were created on device 0 in the main thread.
            // Every thread needs to set the device it is going to work on.
            // At the time of writing, all Array objects used in a thread
            // must have been created on the same device.
            set_device(0);

            let c = match op {
                Op::Add => x + y,
                Op::Sub => x - y,
                Op::Div => x / y,
                Op::Mul => x * y,
            };
            tx_clone.send(c).unwrap();
        })
    })
    .collect();

for _i in 0..ops_len {
    c += rx.recv().unwrap();
}

// Join the child threads, since the main thread holds the ArrayFire context
for child in threads {
    let _ = child.join();
}

//af_print!("C after accumulating results", &c);
//[3 3 1 1]
//    8.0000     8.0000     8.0000
//    8.0000     8.0000     8.0000
//    8.0000     8.0000     8.0000
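The example counts received messages with `ops_len`. An alternative, sketched below with the standard library only, is to drop the original sender and iterate over the receiver until every worker has finished sending. The sketch assumes a plain `f32` as a stand-in for each per-thread result Array, and `sum_over_channel` is a hypothetical helper name.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: an `f32` stands in for each thread's result Array.
// Workers send their values over a channel; the caller sums them.
fn sum_over_channel(terms: Vec<f32>) -> f32 {
    let (tx, rx) = mpsc::channel();

    let handles: Vec<_> = terms
        .into_iter()
        .map(|term| {
            let tx = tx.clone();
            thread::spawn(move || {
                tx.send(term).expect("receiver was dropped");
            })
        })
        .collect();

    // Drop the original sender: the receiver's iterator ends only after
    // every sender, including this one, has been dropped.
    drop(tx);

    // No message count is needed; iteration stops when all workers are done.
    let total: f32 = rx.iter().sum();

    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    total
}
```

Forgetting the `drop(tx)` call would leave the iterator waiting forever, which is the main trade-off against counting messages explicitly.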