ArrayFire in Threaded Applications

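In this chapter, we will look at how to use ArrayFire in multi-threaded programs. We shall go over the details in the following order: moving an Array to a thread, reading an Array from multiple threads, writing to an Array from multiple threads, and writing to a single Array using a channel.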

Move an Array to a thread

In this section, we are going to create an Array on the main thread, move it to a child thread, modify it, and then print it from the child thread.

        use arrayfire::{constant, dim4, info, print, set_device};
        use std::thread;

        set_device(0);
        info();
        let mut a = constant(1, dim4!(3, 3));

        let handle = thread::spawn(move || {
            //set_device to appropriate device id is required in each thread
            set_device(0);

            println!("\nFrom thread {:?}", thread::current().id());

            a += constant(2, dim4!(3, 3));
            print(&a);
        });

        //Need to join other threads as main thread holds arrayfire context
        handle.join().unwrap();
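Apart from the ArrayFire calls, the ownership transfer above is ordinary Rust: the `move` closure takes `a` by value, so the main thread can no longer use it, and `join` waits for the child to finish. The sketch below shows the same pattern with the standard library only. It assumes a `Vec<f32>` as a hypothetical stand-in for the 3x3 Array (so no device or `set_device` call is involved), and the helper `add_in_thread` is illustrative, not part of ArrayFire. Returning the value from the closure is one way to hand it back to the main thread after `join`.

```rust
use std::thread;

/// Hypothetical helper: moves `a` into a child thread, adds `delta` to every
/// element there, and hands the modified buffer back through `join`.
fn add_in_thread(mut a: Vec<f32>, delta: f32) -> Vec<f32> {
    // `move` transfers ownership of `a` (and the Copy value `delta`) into the thread
    let handle = thread::spawn(move || {
        println!("From thread {:?}", thread::current().id());
        // Element-wise add, analogous to `a += constant(2, dim4!(3, 3))`
        for v in a.iter_mut() {
            *v += delta;
        }
        // Returning `a` passes ownership back through the JoinHandle
        a
    });
    // join() blocks until the child finishes and yields the closure's return value
    handle.join().unwrap()
}

fn main() {
    // Stand-in for a 3x3 Array filled with 1.0 (assumption, not an ArrayFire type)
    let a = vec![1.0f32; 9];
    let a = add_in_thread(a, 2.0);
    println!("{:?}", a);
}
```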

Read an Array from multiple threads

Now, let's extend the earlier example to perform several arithmetic operations in parallel on multiple threads, with every thread using the same Array objects.

        use arrayfire::{constant, dim4, set_device, sync};
        use std::thread;

        let ops: Vec<_> = vec![Op::Add, Op::Sub, Op::Div, Op::Mul, Op::Add, Op::Div];

        // Set active GPU/device on main thread on which
        // subsequent Array objects are created
        set_device(0);

        // ArrayFire Arrays are internally managed via atomic reference counting,
        // so they need no Arc wrapping when moved to another thread.
        // Just call clone on the object and move the resulting clone into the thread.
        let a = constant(1.0f32, dim4!(3, 3));
        let b = constant(2.0f32, dim4!(3, 3));

        let threads: Vec<_> = ops
            .into_iter()
            .map(|op| {
                let x = a.clone();
                let y = b.clone();
                thread::spawn(move || {
                    set_device(0); // Both objects were created on device 0 earlier
                    match op {
                        Op::Add => {
                            let _c = x + y;
                        }
                        Op::Sub => {
                            let _c = x - y;
                        }
                        Op::Div => {
                            let _c = x / y;
                        }
                        Op::Mul => {
                            let _c = x * y;
                        }
                    }
                    sync(0);
                    thread::sleep(std::time::Duration::new(1, 0));
                })
            })
            .collect();
        for child in threads {
            let _ = child.join();
        }

For simplicity of illustration, the examples in this chapter use the following Op enum:

    #[derive(Debug, Copy, Clone)]
    enum Op {
        Add,
        Sub,
        Div,
        Mul,
    }
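The comments in the example above rely on Array being reference counted internally, so that `clone` shares the underlying data instead of copying it. As a rough standard-library analogy (not ArrayFire's actual implementation), the hypothetical `SharedBuf` type below wraps an `Arc<Vec<f32>>`: cloning it only increments an atomic counter, and each clone can be moved into a thread that reads the shared data. The `Op` enum mirrors the one above; `SharedBuf` and `run_ops` are illustrative names only.

```rust
use std::sync::Arc;
use std::thread;

#[derive(Debug, Copy, Clone)]
enum Op {
    Add,
    Sub,
    Div,
    Mul,
}

/// Hypothetical handle type: all clones share one buffer via an atomic refcount.
#[derive(Clone)]
struct SharedBuf(Arc<Vec<f32>>);

/// Applies each op element-wise to clones of `a` and `b`, one thread per op,
/// and returns the results in the same order as `ops`.
fn run_ops(ops: Vec<Op>, a: &SharedBuf, b: &SharedBuf) -> Vec<Vec<f32>> {
    let threads: Vec<_> = ops
        .into_iter()
        .map(|op| {
            // Cheap clones: only the reference count changes, no data is copied
            let x = a.clone();
            let y = b.clone();
            thread::spawn(move || {
                // Read-only access to the shared buffers from this thread
                x.0.iter()
                    .zip(y.0.iter())
                    .map(|(&p, &q)| match op {
                        Op::Add => p + q,
                        Op::Sub => p - q,
                        Op::Div => p / q,
                        Op::Mul => p * q,
                    })
                    .collect::<Vec<f32>>()
            })
        })
        .collect();
    // Join in spawn order so results line up with `ops`
    threads.into_iter().map(|t| t.join().unwrap()).collect()
}

fn main() {
    let a = SharedBuf(Arc::new(vec![1.0f32; 9]));
    let b = SharedBuf(Arc::new(vec![2.0f32; 9]));
    let ops = vec![Op::Add, Op::Sub, Op::Div, Op::Mul, Op::Add, Op::Div];
    for (i, r) in run_ops(ops, &a, &b).iter().enumerate() {
        println!("result {}: {:?}", i, r);
    }
}
```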

Write to an Array from multiple threads

Let us further expand the earlier example by accumulating the results of the arithmetic operations into a single Array object.

The code differs from the earlier section in a couple of places:

  • In the main thread, we wrap the accumulating Array in a read-write lock (std::sync::RwLock), which is in turn wrapped in an atomically reference-counted pointer (std::sync::Arc) so that every thread can hold a handle to it.
  • In the child threads, we use the guard returned by RwLock's write method to get exclusive, mutable access to the accumulator Array.

        use arrayfire::{constant, dim4, set_device};
        use std::sync::{Arc, RwLock};
        use std::thread;

        let ops: Vec<_> = vec![Op::Add, Op::Sub, Op::Div, Op::Mul, Op::Add, Op::Div];

        // Set active GPU/device on main thread on which
        // subsequent Array objects are created
        set_device(0);

        let c = constant(0.0f32, dim4!(3, 3));
        let a = constant(1.0f32, dim4!(3, 3));
        let b = constant(2.0f32, dim4!(3, 3));

        // Move ownership to RwLock and wrap in Arc since same object is to be modified
        let c_lock = Arc::new(RwLock::new(c));

        // a and b are internally reference counted by ArrayFire. Unless you know
        // in advance that they will be modified, you can simply clone the
        // objects and pass the clones to the threads.

        let threads: Vec<_> = ops
            .into_iter()
            .map(|op| {
                let x = a.clone();
                let y = b.clone();

                let wlock = c_lock.clone();
                thread::spawn(move || {
                    // Both objects were created on device 0 in the main thread.
                    // Every thread needs to set the device it is going to work on.
                    // Note that, as of this writing, all Array objects must have
                    // been created on the same device.
                    set_device(0);
                    if let Ok(mut c_guard) = wlock.write() {
                        match op {
                            Op::Add => {
                                *c_guard += x + y;
                            }
                            Op::Sub => {
                                *c_guard += x - y;
                            }
                            Op::Div => {
                                *c_guard += x / y;
                            }
                            Op::Mul => {
                                *c_guard += x * y;
                            }
                        }
                    }
                })
            })
            .collect();

        for child in threads {
            let _ = child.join();
        }

        //let read_guard = c_lock.read().unwrap();
        //af_print!("C after threads joined", *read_guard);
        //C after threads joined
        //[3 3 1 1]
        //    8.0000     8.0000     8.0000
        //    8.0000     8.0000     8.0000
        //    8.0000     8.0000     8.0000
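Every element of C ends up as 8 because, with a = 1 and b = 2, the six operations contribute 3 + (-1) + 0.5 + 2 + 3 + 0.5 = 8. Each partial sum here is exactly representable in f32, so the order in which threads acquire the write lock does not change the result. The standard-library sketch below reproduces that arithmetic with the same Arc<RwLock<_>> pattern, assuming a plain f32 as a hypothetical stand-in for the 3x3 Array; `accumulate_with_lock` is an illustrative name, not an ArrayFire function.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

#[derive(Debug, Copy, Clone)]
enum Op {
    Add,
    Sub,
    Div,
    Mul,
}

/// Adds `x op y` for every op into one shared f32 guarded by Arc<RwLock<_>>.
fn accumulate_with_lock(ops: Vec<Op>, x: f32, y: f32) -> f32 {
    // RwLock serializes the writes; Arc lets every thread own a handle
    let c_lock = Arc::new(RwLock::new(0.0f32));

    let threads: Vec<_> = ops
        .into_iter()
        .map(|op| {
            let wlock = Arc::clone(&c_lock);
            thread::spawn(move || {
                // write() blocks until this thread has exclusive access
                let mut c_guard = wlock.write().unwrap();
                *c_guard += match op {
                    Op::Add => x + y,
                    Op::Sub => x - y,
                    Op::Div => x / y,
                    Op::Mul => x * y,
                };
            })
        })
        .collect();

    for child in threads {
        child.join().unwrap();
    }

    // All writers are done; copy the value out under a read lock
    let result = *c_lock.read().unwrap();
    result
}

fn main() {
    let ops = vec![Op::Add, Op::Sub, Op::Div, Op::Mul, Op::Add, Op::Div];
    // sum: 3 - 1 + 0.5 + 2 + 3 + 0.5 = 8
    println!("C after threads joined: {}", accumulate_with_lock(ops, 1.0, 2.0));
}
```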

Write to a single Array using a channel

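In this section, we modify the example to use a channel (std::sync::mpsc) instead of shared data: each child thread sends its result to the main thread, which alone accumulates the results into c.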

        use arrayfire::{constant, dim4, set_device};
        use std::sync::mpsc;
        use std::thread;

        let ops: Vec<_> = vec![Op::Add, Op::Sub, Op::Div, Op::Mul, Op::Add, Op::Div];
        let ops_len: usize = ops.len();

        // Set active GPU/device on main thread on which
        // subsequent Array objects are created
        set_device(0);

        let mut c = constant(0.0f32, dim4!(3, 3));
        let a = constant(1.0f32, dim4!(3, 3));
        let b = constant(2.0f32, dim4!(3, 3));

        let (tx, rx) = mpsc::channel();

        let threads: Vec<_> = ops
            .into_iter()
            .map(|op| {
                // a and b are internally reference counted by ArrayFire. Unless you know
                // in advance that they will be modified, you can simply clone the
                // objects and pass the clones to the threads.
                let x = a.clone();
                let y = b.clone();

                let tx_clone = tx.clone();

                thread::spawn(move || {
                    // Both objects were created on device 0 in the main thread.
                    // Every thread needs to set the device it is going to work on.
                    // Note that, as of this writing, all Array objects must have
                    // been created on the same device.
                    set_device(0);

                    let c = match op {
                        Op::Add => x + y,
                        Op::Sub => x - y,
                        Op::Div => x / y,
                        Op::Mul => x * y,
                    };
                    tx_clone.send(c).unwrap();
                })
            })
            .collect();

        for _i in 0..ops_len {
            c += rx.recv().unwrap();
        }

        //Need to join other threads as main thread holds arrayfire context
        for child in threads {
            let _ = child.join();
        }

        //af_print!("C after accumulating results", &c);
        //[3 3 1 1]
        //    8.0000     8.0000     8.0000
        //    8.0000     8.0000     8.0000
        //    8.0000     8.0000     8.0000
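The channel version replaces the lock with message passing: each child thread owns a Sender clone and sends its partial result, and only the receiving thread ever mutates the accumulator. Below is a standard-library sketch of the same flow, again assuming a plain f32 as a hypothetical stand-in for the Array; `accumulate_with_channel` is an illustrative name. One design variation is shown: dropping the original Sender after spawning lets the receive loop end on its own once every clone has been dropped, instead of counting `ops_len` receives as the example above does.

```rust
use std::sync::mpsc;
use std::thread;

#[derive(Debug, Copy, Clone)]
enum Op {
    Add,
    Sub,
    Div,
    Mul,
}

/// Each thread sends `x op y` over a channel; the calling thread sums the results.
fn accumulate_with_channel(ops: Vec<Op>, x: f32, y: f32) -> f32 {
    let (tx, rx) = mpsc::channel::<f32>();

    let threads: Vec<_> = ops
        .into_iter()
        .map(|op| {
            // Every producer thread gets its own Sender handle
            let tx_clone = tx.clone();
            thread::spawn(move || {
                let c = match op {
                    Op::Add => x + y,
                    Op::Sub => x - y,
                    Op::Div => x / y,
                    Op::Mul => x * y,
                };
                tx_clone.send(c).unwrap();
            })
        })
        .collect();

    // Drop the original Sender so iteration over `rx` stops after the last clone is gone
    drop(tx);

    // Only this thread mutates the accumulator, so no lock is needed
    let mut c = 0.0f32;
    for partial in rx {
        c += partial;
    }

    for child in threads {
        child.join().unwrap();
    }
    c
}

fn main() {
    let ops = vec![Op::Add, Op::Sub, Op::Div, Op::Mul, Op::Add, Op::Div];
    println!("C after accumulating results: {}", accumulate_with_channel(ops, 1.0, 2.0));
}
```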