NATS Logo by Example

Core Publish-Subcribe in Messaging

This example demonstrates the core NATS publish-subscribe behavior. This is the fundamental pattern that all other NATS patterns and higher-level APIs build upon. There are a few takeaways from this example:

  • Delivery is an at-most-once. For MQTT users, this is referred to as Quality of Service (QoS) 0.
  • There are two circumstances when a published message won’t be delivered to a subscriber:
    • The subscriber does not have an active connection to the server (i.e. the client is temporarily offline for some reason)
    • There is a network interruption where the message is ultimately dropped
  • Messages are published to subjects which can be one or more concrete tokens, e.g. greet.bob. Subscribers can utilize wildcards to show interest on a set of matching subjects.
CLI Go Python Deno Node Rust C# Java Ruby Elixir C
Jump to the output or the recording
$ nbe run messaging/pub-sub/python
View the source code or learn how to run this example yourself

Code

import os
import asyncio


import nats
from nats.errors import TimeoutError

Get the list of servers.

servers = os.environ.get("NATS_URL", "nats://localhost:4222").split(",")


async def main():

Create the connection to NATS which takes a list of servers.

    nc = await nats.connect(servers=servers)

Messages are published to subjects. Although there are no subscribers, this will be published successfully.

    await nc.publish("greet.joe", b"hello")

Let’s create a subscription on the greet.* wildcard.

    sub = await nc.subscribe("greet.*")

For a synchronous subscription, we need to fetch the next message. However.. since the publish occured before the subscription was established, this is going to timeout.

    try:
        msg = await sub.next_msg(timeout=0.1)
    except TimeoutError:
        pass

Publish a couple messages.

    await nc.publish("greet.joe", b"hello")
    await nc.publish("greet.pam", b"hello")

Since the subscription is established, the published messages will immediately be broadcasted to all subscriptions. They will land in # their buffer for subsequent NextMsg calls.

    msg = await sub.next_msg(timeout=0.1)
    print(f"{msg.data} on subject {msg.subject}")


    msg = await sub.next_msg(timeout=0.1)
    print(f"{msg.data} on subject {msg.subject}")

One more for good measures..

    await nc.publish("greet.bob", b"hello")


    msg = await sub.next_msg(timeout=0.1)
    print(f"{msg.data} on subject {msg.subject}")

Drain the subscription and connection. In contrast to unsubscribe, drain will process any queued messages before removing interest.

    await sub.unsubscribe()
    await nc.drain()




if __name__ == '__main__':
    asyncio.run(main())

Output

Network 4cc20542_default  Creating
Network 4cc20542_default  Created
Container 4cc20542-nats-1  Creating
Container 4cc20542-nats-1  Created
Container 4cc20542-nats-1  Starting
Container 4cc20542-nats-1  Started
b'hello' on subject greet.joe
b'hello' on subject greet.pam
b'hello' on subject greet.bob

Recording

Note, playback is half speed to make it a bit easier to follow.