NATS by Example

Request-Reply in JetStream

This is an experimental preview of a possible request-reply API for JetStream. With core NATS, a message is delivered only to clients that have expressed interest in its subject at the time it is published; if no client has, the message is not received by anyone.

In general, this behavior is expected of request-reply, which is meant to work like a phone call: someone needs to pick up in order to talk. The alternative is relying on a mailbox, which is essentially what a persistent stream enables.
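
The phone call versus mailbox distinction can be sketched with a small in-memory model. This is only an illustration and does not use the NATS client: `Bus`, `Mailbox`, and their methods are hypothetical names invented for this sketch, and a real server does far more (wildcards, acknowledgements, limits).

```go
package main

import "fmt"

// Bus is a toy model of fire-and-forget delivery: a message reaches only
// the handlers that are subscribed at the moment it is published.
type Bus struct {
	subs map[string][]func(string)
}

func NewBus() *Bus { return &Bus{subs: make(map[string][]func(string))} }

// Subscribe registers a handler for a subject.
func (b *Bus) Subscribe(subject string, h func(string)) {
	b.subs[subject] = append(b.subs[subject], h)
}

// Publish delivers to the current subscribers and returns how many there were.
func (b *Bus) Publish(subject, data string) int {
	for _, h := range b.subs[subject] {
		h(data)
	}
	return len(b.subs[subject])
}

// Mailbox is a toy model of a persistent stream: messages are stored on
// publish and handed over whenever a reader shows up.
type Mailbox struct {
	msgs map[string][]string
}

func NewMailbox() *Mailbox { return &Mailbox{msgs: make(map[string][]string)} }

// Publish stores the message regardless of whether anyone is reading.
func (m *Mailbox) Publish(subject, data string) {
	m.msgs[subject] = append(m.msgs[subject], data)
}

// Fetch returns and removes everything stored for the subject.
func (m *Mailbox) Fetch(subject string) []string {
	out := m.msgs[subject]
	delete(m.msgs, subject)
	return out
}

func main() {
	bus := NewBus()
	// Nobody is subscribed yet, so this message is lost.
	fmt.Println("phone call delivered to:", bus.Publish("orders", "sandwich"))

	var got []string
	bus.Subscribe("orders", func(d string) { got = append(got, d) })
	bus.Publish("orders", "bagel")
	fmt.Println("late subscriber saw:", got)

	box := NewMailbox()
	box.Publish("orders", "sandwich")
	box.Publish("orders", "bagel")
	// The reader arrives after both messages were published and still gets them.
	fmt.Println("late reader saw:", box.Fetch("orders"))
}
```

In this model the late subscriber on the bus only sees the second message, while the late reader of the mailbox sees both.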

The question is what should happen to the reply when a request is sent and queued, the responder comes online later, and the original requester is offline by then.

These are some of the active questions the NATS team is evaluating, so now is a good time to share your interest and use cases!

$ nbe run jetstream/request-reply/go

Code

package main

import (
	"fmt"
	"os"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	natsURL := os.Getenv("NATS_URL")

	nc, _ := nats.Connect(natsURL)
	defer nc.Drain()

	js, _ := nc.JetStream()

Create a work-queue stream that will act as a buffer for requests.

	js.AddStream(&nats.StreamConfig{
		Name:      "REQUESTS",
		Subjects:  []string{"requests.*"},
		Retention: nats.WorkQueuePolicy,
	})

Create an ephemeral consumer + subscription responsible for replying.

	sub, _ := js.Subscribe("requests.*", func(msg *nats.Msg) {
		var r string
		switch msg.Subject {
		case "requests.order-sandwich":
			r = "🥪"
		case "requests.order-bagel":
			r = "🥯"
		case "requests.order-flatbread":
			r = "🥙"
		default:
			return
		}
		msg.Respond([]byte(r))
	})
	defer sub.Drain()

Send some requests. Note that js.Request is part of the experimental API previewed here.

	rep, _ := js.Request("requests.order-sandwich", nil, time.Second)
	fmt.Println(string(rep.Data))

	rep, _ = js.Request("requests.order-flatbread", nil, time.Second)
	fmt.Println(string(rep.Data))

If a request cannot be fulfilled, the message is terminated: the requester gets a timeout and nothing is left behind in the stream.

	_, err := js.Request("requests.order-drink", nil, time.Second)
	fmt.Printf("timeout? %v\n", err == nats.ErrTimeout)

	info, _ := js.StreamInfo("REQUESTS")
	fmt.Printf("%d remaining in the stream\n", info.State.Msgs)
}
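
The way the REQUESTS stream above acts as a buffer can be sketched with a toy in-memory model of work-queue retention, where a message stays stored until a consumer acknowledges it and is then deleted. This does not use the NATS client: `WorkQueue`, `Add`, `Next`, and `Ack` are hypothetical names for this sketch, and real JetStream consumers also handle redelivery, ack timeouts, and more.

```go
package main

import "fmt"

// entry is one stored request.
type entry struct {
	seq     uint64
	subject string
}

// WorkQueue is a toy model of work-queue retention: a message is kept
// until it is acknowledged, then removed for good.
type WorkQueue struct {
	lastSeq uint64
	pending []entry
}

// Add stores a request and returns its sequence number.
func (q *WorkQueue) Add(subject string) uint64 {
	q.lastSeq++
	q.pending = append(q.pending, entry{seq: q.lastSeq, subject: subject})
	return q.lastSeq
}

// Next returns the oldest stored request without removing it.
func (q *WorkQueue) Next() (seq uint64, subject string, ok bool) {
	if len(q.pending) == 0 {
		return 0, "", false
	}
	return q.pending[0].seq, q.pending[0].subject, true
}

// Ack deletes the request with the given sequence number, if present.
func (q *WorkQueue) Ack(seq uint64) {
	for i, e := range q.pending {
		if e.seq == seq {
			q.pending = append(q.pending[:i], q.pending[i+1:]...)
			return
		}
	}
}

// Len reports how many requests are still stored.
func (q *WorkQueue) Len() int { return len(q.pending) }

func main() {
	q := &WorkQueue{}

	// Requests arrive while no responder is running.
	q.Add("requests.order-sandwich")
	q.Add("requests.order-drink")
	fmt.Println("buffered:", q.Len())

	// The responder comes online later and drains the queue in order.
	for {
		seq, subject, ok := q.Next()
		if !ok {
			break
		}
		fmt.Println("handling", subject)
		q.Ack(seq)
	}
	fmt.Println("remaining:", q.Len())
}
```

Every request in this model is acknowledged once handled, including ones the responder has no answer for, so the queue ends up empty.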

Output

Network 7b333570_default  Creating
Network 7b333570_default  Created
Container 7b333570-nats-1  Creating
Container 7b333570-nats-1  Created
Container 7b333570-nats-1  Starting
Container 7b333570-nats-1  Started
🥪
🥙
timeout? true
0 remaining in the stream
