Easy Mastery: A Deep Dive into the Active Object Pattern in Rust’s Seamless Concurrency Model

Photo by Pixabay: https://www.pexels.com/photo/gray-scale-photo-of-gears-159298/

Introduction

Sometimes, it’s handy to separate when a method is called from when it actually runs. That’s where the Active Object design pattern comes in. It lets us build more flexible and concurrent systems. One key feature is that each method call is wrapped up in an object and placed in a queue. The Active Object then processes this queue in a separate thread.

Now, let’s simplify this with a practical example in Rust.

Implementation in Rust

Let’s create a basic logger. Logging can get complex, especially when dealing with external databases or web services. In such cases, it’s efficient to delegate the writing process to a separate thread.

The Basics

Before we dive into the details of the pattern, let’s set up the basics:

  • We’ll use different log levels, represented by an enum.
  • The LogMessage struct holds the log level and the log message.
use std::sync::mpsc;
use std::thread;

#[derive(Debug, Clone)]
enum LogLevel {
    Info,
    Warn,
    Error,
}

#[derive(Clone)]
struct LogMessage {
    level: LogLevel,
    message: String,
}

impl LogMessage {
    fn new(level: LogLevel, message: &str) -> LogMessage {
        LogMessage {
            level,
            message: message.to_string(),
        }
    }
}

The ActiveLogger struct

Now, onto the heart of our pattern. First, let’s implement the QueueMessage enum:

enum QueueMessage {
    Run((Box<dyn FnOnce(LogMessage) + Send>, LogMessage)),
    Terminate,
}

Here we see one of the most powerful features of Rust, in my opinion: enums.

A short explanation:

  • QueueMessage::Run takes a function and a message to process.
  • QueueMessage::Terminate signals the logger to stop processing messages.

Next, we create the ActiveLogger struct:

struct ActiveLogger {
    sender: mpsc::Sender<QueueMessage>,
}

impl ActiveLogger {
    fn new() -> ActiveLogger {
        let (sender, receiver) = mpsc::channel();
        thread::spawn(move || {
            while let Ok(message) = receiver.recv() {
                match message {
                    QueueMessage::Run((f, m)) => f(m.clone()),
                    QueueMessage::Terminate => break,
                }
            }
        });
        ActiveLogger { sender }
    }
    fn run<F>(&self, f: F, message: LogMessage)
    where
        F: FnOnce(LogMessage) + Send + 'static,
    {
        self.sender
            .send(QueueMessage::Run((Box::new(f), message)))
            .unwrap();
    }
    fn terminate(&self) {
        println!("Terminating...");
        self.sender.send(QueueMessage::Terminate).unwrap();
    }
}

Some notes:

  • It has an mpsc::Sender to send messages to the logger.
  • In the constructor (new() method), we spawn a thread to receive messages.
  • The run() method sends messages to the logger, including the processing function and the message.
  • The terminate() method sends a termination message to stop the logger.

Testing time

Time to test this setup:

fn main() {
    let logger = ActiveLogger::new();
    let message = LogMessage::new(LogLevel::Info, "Hello, world!");
    logger.run(
        |mes| {
            println!("{:?}:{}", mes.level, mes.message);
        },
        message,
    );
    logger.terminate();
}

Some notes:

  1. When we construct the logger, we start the thread.
  2. The logger.run() we supply a simple function with a LogMessage parameter, and the message to process.
  3. Lastly we terminate the logger.

Conclusion

Rust’s ‘fearless concurrency’ makes implementing the Active Object pattern straightforward. This pattern proves useful in various scenarios, such as message processing, printer spooling, or chat applications. It excels in situations where separating method execution from invocation and handling tasks concurrently brings benefits.

Leave a Reply

Your email address will not be published. Required fields are marked *