A high-performance multi-producer multi-consumer (MPMC) queue implementation based on Tokio.
- Asynchronous implementation based on Tokio
- Support for multi-producer multi-consumer pattern
- Message processing using consumer pool
- Simple and intuitive API
- Complete error handling
- Queue capacity control
Add the following dependency to your Cargo.toml:
[dependencies]
tokio-mpmc = "0.2"
use tokio_mpmc::Queue;
#[tokio::main]
async fn main() {
// Create a queue with capacity of 100
let queue = Queue::new(100);
// Send a message
if let Err(e) = queue.send("Hello").await {
eprintln!("Send failed: {}", e);
}
// Receive a message
match queue.receive().await {
Ok(Some(msg)) => println!("Received message: {}", msg),
Ok(None) => println!("Queue is empty"),
Err(e) => eprintln!("Receive failed: {}", e),
}
// Close the queue
drop(queue);
}
use tokio_mpmc::channel;
#[tokio::main]
async fn main() {
// Create a channel with capacity of 100
let (tx, rx) = channel(100);
// Send a message
if let Err(e) = tx.send("Hello").await {
eprintln!("Send failed: {}", e);
}
// Receive a message
match rx.recv().await {
Ok(Some(msg)) => println!("Received message: {}", msg),
Ok(None) => println!("Channel is closed"),
Err(e) => eprintln!("Receive failed: {}", e),
}
// Close the channel
drop(tx);
}
cargo criterion --message-format=json | criterion-table > BENCHMARKS.md
non-io | 65.96 us (✅ 1.00x) | 166.24 us (❌ 2.52x slower) | 780.75 us (❌ 11.84x slower) |
io | 48.12 ms (✅ 1.00x) | 50.64 ms (✅ 1.05x slower) | 202.26 ms (❌ 4.20x slower) |
Note: non-io means no IO operation, io means IO operation.
See benchmark code
This project is licensed under the Apache-2.0 License. See the LICENSE file for details.