Introduction
Sometimes, it’s handy to separate when a method is called from when it actually runs. That’s where the Active Object design pattern comes in. It lets us build more flexible and concurrent systems. One key feature is that each method call is wrapped up in an object and placed in a queue. The Active Object then processes this queue in a separate thread.
Now, let’s simplify this with a practical example in Rust.
Implementation in Rust
Let’s create a basic logger. Logging can get complex, especially when dealing with external databases or web services. In such cases, it’s efficient to delegate the writing process to a separate thread.
The Basics
Before we dive into the details of the pattern, let’s set up the basics:
- We’ll use different log levels, represented by an enum.
- The LogMessage struct holds the log level and the log message.
use std::sync::mpsc;
use std::thread;

#[derive(Debug, Clone)]
enum LogLevel {
    Info,
    Warn,
    Error,
}

#[derive(Clone)]
struct LogMessage {
    level: LogLevel,
    message: String,
}

impl LogMessage {
    fn new(level: LogLevel, message: &str) -> LogMessage {
        LogMessage {
            level,
            message: message.to_string(),
        }
    }
}
The ActiveLogger struct
Now, onto the heart of our pattern. First, let’s implement the QueueMessage enum:
enum QueueMessage {
    Run((Box<dyn FnOnce(LogMessage) + Send>, LogMessage)),
    Terminate,
}
Here we see one of the most powerful features of Rust, in my opinion: enums.
A short explanation:
- QueueMessage::Run takes a function and a message to process.
- QueueMessage::Terminate signals the logger to stop processing messages.
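To make the two variants concrete, here is a minimal sketch of how a queued call can be unpacked and executed with a match. It is a simplified stand-in, not the logger itself: the payload is a plain String instead of a LogMessage, and the dispatch and drain_demo functions are hypothetical helpers introduced only for this illustration.

```rust
use std::sync::{Arc, Mutex};

// Simplified stand-in for the article's enum: the payload is a String
// (an assumption made so the sketch compiles on its own).
enum QueueMessage {
    Run((Box<dyn FnOnce(String) + Send>, String)),
    Terminate,
}

// Hypothetical helper: executes one queued message and returns false
// when processing should stop.
fn dispatch(message: QueueMessage) -> bool {
    match message {
        QueueMessage::Run((f, payload)) => {
            f(payload); // the boxed closure consumes its payload
            true
        }
        QueueMessage::Terminate => false,
    }
}

// Hypothetical helper: builds a small queue by hand, drains it on the
// current thread, and returns the payloads that were actually processed.
fn drain_demo() -> Vec<String> {
    let seen: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
    let first_sink = Arc::clone(&seen);
    let late_sink = Arc::clone(&seen);

    let queue = vec![
        QueueMessage::Run((
            Box::new(move |text: String| first_sink.lock().unwrap().push(text)),
            String::from("first"),
        )),
        QueueMessage::Terminate,
        // Queued after Terminate, so it is never executed.
        QueueMessage::Run((
            Box::new(move |text: String| late_sink.lock().unwrap().push(text)),
            String::from("late"),
        )),
    ];

    for message in queue {
        if !dispatch(message) {
            break;
        }
    }

    let result = seen.lock().unwrap().clone();
    result
}

fn main() {
    println!("{:?}", drain_demo()); // prints ["first"]
}
```

Anything queued after Terminate is simply dropped, which is why only the first payload shows up in the result.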
Next, we create the ActiveLogger struct:
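struct ActiveLogger {
    sender: mpsc::Sender<QueueMessage>,
    // Handle to the worker thread, joined in terminate().
    worker: thread::JoinHandle<()>,
}

impl ActiveLogger {
    fn new() -> ActiveLogger {
        let (sender, receiver) = mpsc::channel();
        // The worker drains the queue until it receives Terminate
        // or the channel is closed.
        let worker = thread::spawn(move || {
            while let Ok(message) = receiver.recv() {
                match message {
                    // The message is already owned here, so no clone is needed.
                    QueueMessage::Run((f, m)) => f(m),
                    QueueMessage::Terminate => break,
                }
            }
        });
        ActiveLogger { sender, worker }
    }

    fn run<F>(&self, f: F, message: LogMessage)
    where
        F: FnOnce(LogMessage) + Send + 'static,
    {
        self.sender
            .send(QueueMessage::Run((Box::new(f), message)))
            .unwrap();
    }

    // Takes self by value, so nothing can be queued after termination.
    fn terminate(self) {
        println!("Terminating...");
        self.sender.send(QueueMessage::Terminate).unwrap();
        // Wait for the worker; otherwise main could exit before
        // the queued messages have been processed.
        self.worker.join().unwrap();
    }
}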
Some notes:
- The struct holds an mpsc::Sender to send messages to the logger.
- In the constructor (the new() method), we spawn a thread to receive messages.
- The run() method sends messages to the logger, including the processing function and the message.
- The terminate() method sends a termination message to stop the logger.
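One detail of the while let Ok(...) loop is worth spelling out: recv() returns an Err once every Sender has been dropped and the queue is empty, so the worker loop also ends when the channel is closed, even without an explicit Terminate message. The sketch below illustrates this with plain u32 payloads; count_until_disconnect is a hypothetical helper, not part of the logger.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: sends `n` messages, drops the sender, and returns
// how many messages the worker processed before its loop ended.
fn count_until_disconnect(n: u32) -> u32 {
    let (sender, receiver) = mpsc::channel::<u32>();

    let worker = thread::spawn(move || {
        let mut processed = 0;
        // recv() yields Err once all senders are gone and the queue is empty.
        while let Ok(_value) = receiver.recv() {
            processed += 1;
        }
        processed
    });

    for i in 0..n {
        sender.send(i).unwrap();
    }
    // Dropping the only sender closes the channel.
    drop(sender);

    // join() returns the closure's result, here the processed count.
    worker.join().unwrap()
}

fn main() {
    println!("processed {} messages", count_until_disconnect(3));
}
```

Messages that are already in the queue when the sender is dropped are still delivered; only after the queue is empty does recv() report the disconnect.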
Testing time
Time to test this setup:
fn main() {
    let logger = ActiveLogger::new();
    let message = LogMessage::new(LogLevel::Info, "Hello, world!");
    logger.run(
        |mes| {
            println!("{:?}:{}", mes.level, mes.message);
        },
        message,
    );
    logger.terminate();
}
Some notes:
- When we construct the logger, we start the thread.
- We pass logger.run() a simple function with a LogMessage parameter, along with the message to process.
- Lastly, we terminate the logger.
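Because the queue is a channel with a single consumer, calls submitted from one thread are executed in the order they were queued. The following sketch shows this under a few assumptions: the jobs are closures without arguments (the Job alias), the "Info:" prefix only mimics the output format used above, and run_in_order is a hypothetical helper that waits for the worker before returning.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

// Assumed job type for this sketch: a boxed closure without arguments.
type Job = Box<dyn FnOnce() + Send>;

// Hypothetical helper: queues one job per label and returns the order
// in which the worker thread executed them.
fn run_in_order(labels: &[&str]) -> Vec<String> {
    let (sender, receiver) = mpsc::channel::<Job>();
    let log: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));

    // Single consumer: jobs run one at a time, in arrival order.
    let worker = thread::spawn(move || {
        while let Ok(job) = receiver.recv() {
            job();
        }
    });

    for label in labels {
        let log = Arc::clone(&log);
        let text = format!("Info:{}", label);
        // send() only enqueues the job and returns immediately.
        sender
            .send(Box::new(move || log.lock().unwrap().push(text)))
            .unwrap();
    }

    // Close the channel so the worker loop ends, then wait for it.
    drop(sender);
    worker.join().unwrap();

    let result = log.lock().unwrap().clone();
    result
}

fn main() {
    for line in run_in_order(&["first", "second", "third"]) {
        println!("{}", line);
    }
}
```

The caller never blocks on the work itself, only on the final join, which is the separation of invocation from execution that the pattern is about.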
Conclusion
Rust’s ‘fearless concurrency’ makes implementing the Active Object pattern straightforward. This pattern proves useful in various scenarios, such as message processing, printer spooling, or chat applications. It excels in situations where separating method execution from invocation and handling tasks concurrently brings benefits.