Rust’s Monitor Object: Simple Concurrency Control for Easy Parallelism

Photo by RDNE Stock project: https://www.pexels.com/photo/black-mobile-phone-on-black-and-white-printer-paper-7948071/

Introduction

Sometimes in a multi-threaded program, you need to protect a resource from concurrent access, that is, access by multiple threads at the same time. One way of doing this is to implement a Monitor Object.

To achieve this, we do the following:

  1. We identify the resource we need to protect from being accessed by multiple threads
  2. Next we create a Monitor object in which we encapsulate the shared resource, and we provide methods for accessing and modifying it.
  3. We can use Rust’s Mutex<T> struct to protect our resource, ensuring that only one thread at a time can access it.
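  4. We must also implement methods that lock the mutex before accessing the shared resource. In Rust there is no explicit unlock call: the lock is released automatically when the guard returned by lock() goes out of scope.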
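
To illustrate steps 3 and 4 in isolation, here is a minimal sketch that is separate from the stock example that follows. The helper name count_with_threads and the shared counter are assumptions made up for this illustration; only Arc, Mutex and thread come from the standard library.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical helper: increments a shared counter from several threads
// and returns the final value.
fn count_with_threads(workers: usize, increments: usize) -> usize {
    let counter = Arc::new(Mutex::new(0usize));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..increments {
                    // lock() returns a guard; the mutex stays locked while it lives.
                    let mut guard = counter.lock().unwrap();
                    *guard += 1;
                    // The guard is dropped at the end of this iteration,
                    // which unlocks the mutex automatically.
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // Bind the value first so the temporary guard is released before returning.
    let total = *counter.lock().unwrap();
    total
}
```

Calling count_with_threads(4, 1000) should yield 4000, because every increment happens while the mutex is held.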

Implementation in Rust

In this example we will implement a simple program to keep track of stock prices, or in our case the price of exactly one stock.

The Stock struct

This is how we define the Stock struct:

use std::sync::{Arc,Mutex,Condvar};
use std::thread;

#[derive(Debug,Clone)]
struct Stock {
    name: String,
    price: f64,
}

We first import the necessary types from the standard library, then we define our Stock struct. In our example a stock just has a name and a price. Deriving the Debug and Clone traits allows the Stock to be printed with {:?} and cloned.

Next we implement the methods of the Stock struct:

impl Stock {
    fn new(name: &str, price: f64) -> Self {
        Stock {
            name: name.to_string(),
            price,
        }
    }
    fn update_price(&mut self, new_price: f64) {
        self.price = new_price;
    }

    fn get_price(&self) -> f64 {
        self.price
    }
    fn get_name(&self) -> &str {
        &self.name
    }   
}

This code is no more than a constructor, the new() method, and a few getters and setters.

The Monitor struct

The Monitor struct looks like this:

struct Monitor {
    value: Mutex<Stock>,
    stock_signal: Condvar,
}

In this struct, the Mutex ensures only one thread can access the stock at any given time. The Condvar allows other threads to wait until a certain condition occurs.

Let’s implement the monitor object:

impl Monitor {
    fn new(initial_value:Stock)->Self {
        Monitor {
            // initial_value is already owned here, so no clone is needed.
            value: Mutex::new(initial_value),
            stock_signal: Condvar::new(),
        }
    }

    fn update_price(&self, new_price: f64) {
        let mut stock = self.value.lock().unwrap();
        println!("Updating price from {} to {} for stock {}", stock.get_price(), new_price,stock.get_name());
        stock.update_price(new_price);
        self.stock_signal.notify_one();
    }
    
    fn wait_for_release(&self) {
        let limit=115.0;
        let mut stock = self.value.lock().unwrap();
        while stock.get_price() < limit {
            stock = self.stock_signal.wait(stock).unwrap();
        }
        println!("Price is now above {}",limit);
    }
    
}

A few points:

  1. The new() function serves as a constructor and creates a new Monitor instance with the given Stock.
  2. The update_price() method locks the Stock object, updates its price, and signals one waiting thread.
  3. The wait_for_release() method locks the Stock and waits until the price reaches a certain level.

The wait_for_release() method

In the wait_for_release() method, we first lock the Stock object. The method then enters a loop in which it checks whether the price of the Stock is less than a certain limit. If the price is less than the limit, the method calls self.stock_signal.wait(stock). This call atomically releases the mutex and blocks the current thread until another thread calls notify_one() or notify_all() on the same Condvar.

The consequence of this is that if the price of the Stock is initially less than the limit, this method will block the current thread until the price increases to the limit or above. Because the mutex is released while waiting, other threads can update the price of the Stock while the current thread is blocked. Once a notification arrives, wait() re-acquires the lock and returns; the loop then checks the price again and exits only when the price has reached the limit. Re-checking in a loop also guards against spurious wakeups.

Using a Condvar in this way, we can effectively manage access to the Stock. By using the wait_for_release() method, the main thread waits for the price of the Stock to reach a certain limit before proceeding. This is useful in scenarios where the order of operations matters, for example when one operation depends on the result of another. Example scenarios would be things like managing stocks, or a warehouse ledger system.
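
As a hedged alternative to the hand-written loop, the standard library also offers Condvar::wait_while, which performs the same check-and-wait cycle internally. The sketch below uses a simplified, hypothetical PriceMonitor that guards a bare f64 rather than a Stock; the names PriceMonitor, wait_until_at_least and demo_wait_while are assumptions for illustration and not part of the example above.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Hypothetical, simplified monitor that guards a bare price instead of a Stock.
struct PriceMonitor {
    price: Mutex<f64>,
    changed: Condvar,
}

impl PriceMonitor {
    fn new(initial: f64) -> Self {
        PriceMonitor {
            price: Mutex::new(initial),
            changed: Condvar::new(),
        }
    }

    fn update_price(&self, new_price: f64) {
        let mut price = self.price.lock().unwrap();
        *price = new_price;
        // Wake every waiter; each one re-checks its own condition.
        self.changed.notify_all();
    }

    // Blocks until the price is at least `limit`, then returns the observed price.
    fn wait_until_at_least(&self, limit: f64) -> f64 {
        let guard = self.price.lock().unwrap();
        // wait_while keeps waiting for as long as the closure returns true.
        let guard = self
            .changed
            .wait_while(guard, |price| *price < limit)
            .unwrap();
        *guard
    }
}

// Hypothetical demo: one updater thread raises the price in two steps.
fn demo_wait_while() -> f64 {
    let monitor = Arc::new(PriceMonitor::new(100.0));
    let updater = {
        let monitor = Arc::clone(&monitor);
        thread::spawn(move || {
            monitor.update_price(110.0);
            monitor.update_price(120.0);
        })
    };
    let seen = monitor.wait_until_at_least(115.0);
    updater.join().unwrap();
    seen
}
```

Because the updater's last write is 120.0 and that is the only value at or above the limit, the waiting thread always ends up observing 120.0, regardless of how the two threads interleave.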

Testing time

Let’s put our monitor object to good use:


fn main() {
    let monitor=Arc::new(Monitor::new(Stock::new("MSFT", 100.0)));
    let threads:Vec<_> = (0..10).map(|counter| {
        let monitor=monitor.clone();
        thread::spawn(move || {
            monitor.update_price(110.0 + 2.0*(counter as f64));
        })
    }).collect();

    monitor.wait_for_release();
    for thread in threads {
        thread.join().unwrap();
    }

    
    let final_value = monitor.value.lock().unwrap();
    println!("Stock is now for {:?}",final_value);

    
}

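In the main() function, we create a monitor object with some initial Stock values. After that we spawn ten threads, each of which attempts to update the price of the Stock. While those threads run, the main thread calls wait_for_release() and blocks until it observes a price of at least 115; only then does it join the threads. Because the threads run in no fixed order, the final price differs between runs, and if the last update happens to be below 115 before the main thread has seen a higher price, the wait may never finish.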

Finally we print the final stock value, including its name.
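
Since the updates arrive in no guaranteed order, one possible way to keep a waiting thread from blocking indefinitely is to wait with a timeout. The following is only a sketch under that assumption: wait_for_price is a hypothetical free function, not part of the Monitor above, and it works on a bare f64 to stay short. It relies on Condvar::wait_timeout_while from the standard library.

```rust
use std::sync::{Condvar, Mutex};
use std::time::Duration;

// Hypothetical helper: waits until the price reaches `limit` or `timeout` elapses.
// Returns Some(price) when the limit was reached and None on timeout.
fn wait_for_price(
    price: &Mutex<f64>,
    changed: &Condvar,
    limit: f64,
    timeout: Duration,
) -> Option<f64> {
    let guard = price.lock().unwrap();
    // Waits while the closure returns true, but for at most `timeout` in total.
    let (guard, result) = changed
        .wait_timeout_while(guard, timeout, |p| *p < limit)
        .unwrap();
    if result.timed_out() {
        None
    } else {
        Some(*guard)
    }
}
```

A caller can then decide what a timeout means, for example logging it and retrying, instead of hanging.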

Conclusion

Finding and writing a good monitor object example in Rust was not easy. However, you can see that with the types provided by the standard library, it is actually pretty straightforward to implement this pattern. In particular, using a Condvar to signal waiting threads avoids busy-waiting, which helps both performance and stability.
