NATS by Example

Pull Consumers in JetStream

A pull consumer allows the application to fetch one or more messages on demand using a subscription bound to the consumer. The application thereby controls the flow of incoming messages, so it can process and ack them within an appropriate amount of time.

A consumer can either be durable or ephemeral. A durable consumer will have its state tracked on the server, most importantly, the last acknowledged message from the client.

Ephemeral consumers are useful for one-off needs and are a bit cheaper in terms of resources and management. However, ephemerals do not support multiple subscribers, nor do they (of course) persist after the primary subscriber unsubscribes. The server will automatically clean up (delete) the consumer after a period of time.
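The difference between durable and ephemeral state can be illustrated with a small standalone sketch. This is a toy model in plain Go, not the NATS server or client: `toyServer`, `consumerState`, and `subscription` are hypothetical names, every fetched message is assumed to be acked immediately, and ephemeral state is dropped on unsubscribe rather than after an inactivity period.

```go
package main

import "fmt"

// consumerState is a toy model of what the server tracks per consumer.
type consumerState struct {
	next    int  // index of the next message to deliver
	durable bool // durable state outlives its subscriptions
}

// toyServer holds one stream and the consumers defined on it.
type toyServer struct {
	stream    []string
	consumers map[string]*consumerState
}

// subscription binds a client to a named consumer on the toy server.
type subscription struct {
	srv  *toyServer
	name string
}

func newToyServer() *toyServer {
	return &toyServer{
		stream:    []string{"events.1", "events.2", "events.3"},
		consumers: map[string]*consumerState{},
	}
}

// subscribe binds to an existing consumer or creates a new one.
func (s *toyServer) subscribe(name string, durable bool) *subscription {
	if _, ok := s.consumers[name]; !ok {
		s.consumers[name] = &consumerState{durable: durable}
	}
	return &subscription{srv: s, name: name}
}

// fetch returns up to batch messages and advances the consumer position.
// Assumption: every fetched message is acked immediately.
func (sub *subscription) fetch(batch int) []string {
	st := sub.srv.consumers[sub.name]
	end := st.next + batch
	if end > len(sub.srv.stream) {
		end = len(sub.srv.stream)
	}
	msgs := sub.srv.stream[st.next:end]
	st.next = end
	return msgs
}

// unsubscribe drops ephemeral state right away; durable state is kept.
func (sub *subscription) unsubscribe() {
	if st, ok := sub.srv.consumers[sub.name]; ok && !st.durable {
		delete(sub.srv.consumers, sub.name)
	}
}

func main() {
	srv := newToyServer()

	// Durable: a new subscription resumes where the old one stopped.
	d := srv.subscribe("processor", true)
	fmt.Println(d.fetch(1))
	d.unsubscribe()
	d = srv.subscribe("processor", true)
	fmt.Println(d.fetch(1))

	// Ephemeral: state is gone after unsubscribe, so delivery restarts.
	e := srv.subscribe("tmp", false)
	fmt.Println(e.fetch(1))
	e.unsubscribe()
	e = srv.subscribe("tmp", false)
	fmt.Println(e.fetch(1))
}
```

In this model the durable subscription continues with the second message after re-subscribing, while the re-created ephemeral one starts again from the first.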

Since each subscription fetches messages on demand, multiple subscriptions can be created and bound to the same pull consumer without any additional configuration. Each subscriber can fetch batches of messages and process them concurrently.

It is important to note that the messages in a given batch are ordered with respect to each other, but each subscriber handles its batch independently. If you need deterministic partitioning for scalable ordered processing, that topic is covered separately in the NATS documentation.
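The ordering property described above can be sketched with a standalone toy model. A buffered channel of sequence numbers stands in for the pull consumer and goroutines stand in for subscribers; `fetchBatch` and `runWorkers` are hypothetical helpers, not NATS APIs. Each message is delivered exactly once and every batch is internally ordered, but nothing is guaranteed about the order in which different workers finish their batches.

```go
package main

import (
	"fmt"
	"sync"
)

// fetchBatch receives up to n messages from ch. It returns early with a
// short (possibly empty) batch once ch is closed and drained.
func fetchBatch(ch <-chan int, n int) []int {
	var batch []int
	for len(batch) < n {
		m, ok := <-ch
		if !ok {
			break
		}
		batch = append(batch, m)
	}
	return batch
}

// runWorkers preloads sequence numbers 1..total into a shared channel (a
// stand-in for the pull consumer), then lets several workers drain it in
// batches of at most n. It returns every batch that was fetched.
func runWorkers(total, workers, n int) [][]int {
	ch := make(chan int, total)
	for i := 1; i <= total; i++ {
		ch <- i
	}
	close(ch)

	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		batches [][]int
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				b := fetchBatch(ch, n)
				if len(b) == 0 {
					return
				}
				mu.Lock()
				batches = append(batches, b)
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return batches
}

func main() {
	// The batches printed here vary from run to run.
	for _, b := range runWorkers(10, 3, 4) {
		fmt.Println(b)
	}
}
```

Which worker ends up with which messages changes between runs, which is why a workload that needs strict ordering across subscribers calls for some form of partitioning.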

$ nbe run jetstream/pull-consumer/go

Code

NOTE: This example requires a fix that came after the v1.16.0 release of the Go client which corrected an issue with ephemeral pull consumers.

package main

import (
	"fmt"
	"os"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {

Use the env variable if running in the container, otherwise use the default.

	url := os.Getenv("NATS_URL")
	if url == "" {
		url = nats.DefaultURL
	}

Create an unauthenticated connection to NATS.

	nc, _ := nats.Connect(url)
	defer nc.Drain()

Access the JetStreamContext for managing streams and consumers as well as for publishing and subscription convenience methods.

	js, _ := nc.JetStream()

	streamName := "EVENTS"

Declare a simple limits-based stream.

	js.AddStream(&nats.StreamConfig{
		Name:     streamName,
		Subjects: []string{"events.>"},
	})

Publish a few messages for the example.

	js.Publish("events.1", nil)
	js.Publish("events.2", nil)
	js.Publish("events.3", nil)

The JetStreamContext provides a simple way to create an ephemeral pull consumer. Simply omit the durable name (second parameter) and specify either the subject or the explicit stream to bind to using BindStream. If the subject is provided, it will be used to look up the stream the subject is bound to.

	sub, _ := js.PullSubscribe("", "", nats.BindStream(streamName))

An ephemeral consumer has a name generated on the server-side. Since there is only one consumer so far, let’s just get the first one.

	ephemeralName := <-js.ConsumerNames(streamName)
	fmt.Printf("ephemeral name is %q\n", ephemeralName)

We can fetch messages in batches. The first argument is the batch size, which is the maximum number of messages that should be returned. For this first fetch, we ask for two and we will get both since they are already in the stream.

	msgs, _ := sub.Fetch(2)
	fmt.Printf("got %d messages\n", len(msgs))

Let’s also ack them so they are not redelivered.

	msgs[0].Ack()
	msgs[1].Ack()

Even though we are requesting a large batch (we know there is only one message left in the stream), we will get that one message right away.

	msgs, _ = sub.Fetch(100)
	fmt.Printf("got %d messages\n", len(msgs))
	msgs[0].Ack()

Finally, if we are at the end of the stream and we call fetch, the call will block until the “max wait” time elapses. This is 5 seconds by default, but it can be set explicitly with the nats.MaxWait option.

	_, err := sub.Fetch(1, nats.MaxWait(time.Second))
	fmt.Printf("timeout? %v\n", err == nats.ErrTimeout)
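As a standalone aside (not part of the example program), the three fetch behaviors seen so far can be modeled in plain Go. This is only a sketch of the observable behavior described above, not how the client implements it: `fetch`, `errTimeout`, and `shortWait` are hypothetical names, and a buffered channel stands in for the stream.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout plays the role of the timeout error in this toy model.
var errTimeout = errors.New("timeout")

// shortWait keeps the demonstration quick.
const shortWait = 50 * time.Millisecond

// fetch waits up to maxWait for a first message, then takes whatever else
// is immediately available, up to batch messages in total.
func fetch(ch <-chan string, batch int, maxWait time.Duration) ([]string, error) {
	timer := time.NewTimer(maxWait)
	defer timer.Stop()

	var msgs []string
	select {
	case m := <-ch:
		msgs = append(msgs, m)
	case <-timer.C:
		// Nothing arrived before the deadline.
		return nil, errTimeout
	}
	for len(msgs) < batch {
		select {
		case m := <-ch:
			msgs = append(msgs, m)
		default:
			// No more messages ready: return a short batch right away.
			return msgs, nil
		}
	}
	return msgs, nil
}

func main() {
	ch := make(chan string, 3)
	for _, s := range []string{"events.1", "events.2", "events.3"} {
		ch <- s
	}

	msgs, _ := fetch(ch, 2, shortWait)
	fmt.Printf("got %d messages\n", len(msgs))

	msgs, _ = fetch(ch, 100, shortWait)
	fmt.Printf("got %d messages\n", len(msgs))

	_, err := fetch(ch, 1, shortWait)
	fmt.Printf("timeout? %v\n", errors.Is(err, errTimeout))
}
```

The batch size acts as an upper bound only: a short batch is returned as soon as the available messages run out, and the wait applies only when nothing is available at all.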

Unsubscribing this subscription will result in the ephemeral consumer being deleted.

	sub.Unsubscribe()

Create a basic durable pull consumer by specifying the name and the Explicit ack policy (which is required). AckWait is not set here, so the default applies, which is normally 30 seconds. Note that js.PullSubscribe can also create a durable consumer by relying on subscription options, but this shows a more explicit way. In addition, you can use the UpdateConsumer method, passing a configuration, to change the consumer on demand.

	consumerName := "processor"
	js.AddConsumer(streamName, &nats.ConsumerConfig{
		Durable:   consumerName,
		AckPolicy: nats.AckExplicitPolicy,
	})

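Unlike the js.PullSubscribe abstraction above, a consumer created with js.AddConsumer still needs a subscription to be set up before messages can be processed. Here that is done by passing the consumer name to js.PullSubscribe along with nats.BindStream; the nats.Bind option is an alternative that names both the stream and the consumer.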

	sub1, _ := js.PullSubscribe("", consumerName, nats.BindStream(streamName))

Fetching works the same as above.

	msgs, _ = sub1.Fetch(1)
	fmt.Printf("received %q from sub1\n", msgs[0].Subject)
	msgs[0].Ack()

However, now we can unsubscribe and re-subscribe to pick up where we left off.

	sub1.Unsubscribe()
	sub1, _ = js.PullSubscribe("", consumerName, nats.BindStream(streamName))

	msgs, _ = sub1.Fetch(1)
	fmt.Printf("received %q from sub1 (after reconnect)\n", msgs[0].Subject)
	msgs[0].Ack()

We can also transparently add another subscription (typically in a separate process) and fetch independently.

	sub2, _ := js.PullSubscribe("", consumerName, nats.BindStream(streamName))

	msgs, _ = sub2.Fetch(1)
	fmt.Printf("received %q from sub2\n", msgs[0].Subject)
	msgs[0].Ack()

If we try to fetch from sub1 again, notice that we will time out since sub2 already processed the third message.

	_, err = sub1.Fetch(1, nats.MaxWait(time.Second))
	fmt.Printf("timeout on sub1? %v\n", err == nats.ErrTimeout)

Explicitly clean up. Note nc.Drain() above will do this automatically.

	sub1.Unsubscribe()
	sub2.Unsubscribe()
}

Output

Network 62be849f_default  Creating
Network 62be849f_default  Created
Container 62be849f-nats-1  Creating
Container 62be849f-nats-1  Created
Container 62be849f-nats-1  Starting
Container 62be849f-nats-1  Started
ephemeral name is "wSgbguE5"
got 2 messages
got 1 messages
timeout? true
received "events.1" from sub1
received "events.2" from sub1 (after reconnect)
received "events.3" from sub2
timeout on sub1? true
