NATS Logo by Example

Concurrent Message Processing in Messaging

By default, when a subscription is created, each message that is received it process sequentially. There can be multiple subscriptions setup in a [queue group][queue] in which case the NATS server will distribute messages to each member of the group.

However, even within a subscription, it may be desirable to handle messages concurrently. This example shows how this can be achieved in the clients that support it.

CLI Go Python Deno Node Rust C# Java Ruby Elixir C
Jump to the output or the recording
$ nbe run messaging/concurrent/rust
View the source code or learn how to run this example yourself

Code

use futures::stream::StreamExt;
use rand::Rng;
use std::{env, str::from_utf8, time::Duration};


#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {

Use the NATS_URL env variable if defined, otherwise fallback to the default.

    let nats_url = env::var("NATS_URL")
        .unwrap_or_else(|_| "nats://localhost:4222".to_string());


    let client = async_nats::connect(nats_url).await?;

Subscriber implements Rust iterator, so we can leverage combinators like take() to limit the messages intended to be consumed for this interaction.

    let subscription = client.subscribe("greet.*".to_string()).await?.take(50);

Publish set of messages, each with order identifier.

    for i in 0..50 {
        client
            .publish("greet.joe".to_string(), format!("hello {}", i).into())
            .await?;
    }

Iterate over messages concurrently. for_each_concurrent allows us to not wait for time-consuming operation and receive next message immediately. 25 is a limit for concurrent operations.

    subscription
        .for_each_concurrent(25, |message| async move {

Let’s simulate expensive operation.

            let num = rand::thread_rng().gen_range(0..500);
            tokio::time::sleep(Duration::from_millis(num)).await;

Print the result after sleep.

            println!(
                "received message: {:?}",
                from_utf8(&message.payload).unwrap()
            )
        })
        .await;


    Ok(())
}

Output

Network 3b5be259_default  Creating
Network 3b5be259_default  Created
Container 3b5be259-nats-1  Creating
Container 3b5be259-nats-1  Created
Container 3b5be259-nats-1  Starting
Container 3b5be259-nats-1  Started
received message: "hello 10"
received message: "hello 22"
received message: "hello 23"
received message: "hello 0"
received message: "hello 21"
received message: "hello 18"
received message: "hello 24"
received message: "hello 17"
received message: "hello 1"
received message: "hello 4"
received message: "hello 5"
received message: "hello 27"
received message: "hello 33"
received message: "hello 8"
received message: "hello 15"
received message: "hello 29"
received message: "hello 3"
received message: "hello 2"
received message: "hello 37"
received message: "hello 40"
received message: "hello 16"
received message: "hello 25"
received message: "hello 32"
received message: "hello 13"
received message: "hello 31"
received message: "hello 6"
received message: "hello 39"
received message: "hello 19"
received message: "hello 7"
received message: "hello 44"
received message: "hello 45"
received message: "hello 26"
received message: "hello 14"
received message: "hello 46"
received message: "hello 11"
received message: "hello 35"
received message: "hello 20"
received message: "hello 9"
received message: "hello 12"
received message: "hello 34"
received message: "hello 28"
received message: "hello 38"
received message: "hello 30"
received message: "hello 42"
received message: "hello 49"
received message: "hello 41"
received message: "hello 36"
received message: "hello 47"
received message: "hello 43"
received message: "hello 48"

Recording

Note, playback is half speed to make it a bit easier to follow.