Multi-Stream Consumption in JetStream
Some use cases call for a fan-in of messages across streams.
One way to achieve this is to create a push consumer per stream,
each specifying the same DeliverSubject and, optionally, the same
DeliverGroup. Each consumer then delivers its messages to clients
subscribed to that subject or, when DeliverGroup is set, to the
members of that queue group.
This example demonstrates how to configure the consumers and the subscription to achieve this fan-in consumption.
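As a rough mental model only, the fan-in described above can be sketched with plain Go channels instead of NATS. In this sketch the shared channel stands in for the deliver subject, each producer goroutine stands in for one per-stream push consumer, and the worker goroutines stand in for queue group members. The function name `fanIn` and the stream keys are hypothetical, and nothing here talks to a NATS server.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// fanIn is an analogy built on Go channels, not NATS code.
// It merges the messages of several "streams" into one shared channel
// and lets a group of workers drain it, each message going to one worker.
func fanIn(streams map[string][]string, workers int) []string {
	deliver := make(chan string) // stand-in for the shared deliver subject

	// One producer per stream, like one push consumer per stream.
	var producers sync.WaitGroup
	for _, msgs := range streams {
		producers.Add(1)
		go func(msgs []string) {
			defer producers.Done()
			for _, m := range msgs {
				deliver <- m
			}
		}(msgs)
	}
	// Close the shared channel once every producer has finished.
	go func() {
		producers.Wait()
		close(deliver)
	}()

	// Workers share the channel, so each message is received exactly once.
	var (
		mu       sync.Mutex
		received []string
		members  sync.WaitGroup
	)
	for i := 0; i < workers; i++ {
		members.Add(1)
		go func() {
			defer members.Done()
			for m := range deliver {
				mu.Lock()
				received = append(received, m)
				mu.Unlock()
			}
		}()
	}
	members.Wait()

	// Arrival order across streams is not fixed, so sort for a stable result.
	sort.Strings(received)
	return received
}

func main() {
	got := fanIn(map[string][]string{
		"EVENTS-EU": {"events.eu.page_loaded", "events.eu.input_focused", "events.eu.mouse_clicked"},
		"EVENTS-US": {"events.us.page_loaded", "events.us.mouse_clicked", "events.us.input_focused"},
	}, 2)
	fmt.Println(len(got), "messages received")
}
```

The point of the model is only that messages from separate sources arrive on one destination and that each one is handled by a single group member. Acknowledgements, redelivery, and per-consumer state have no counterpart in it.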
$ nbe run jetstream/multi-stream-consumption/go
Code
package main
import (
	"fmt"
	"os"
	"time"
	"github.com/nats-io/nats.go"
)
func main() {
	natsURL := os.Getenv("NATS_URL")
	if natsURL == "" {
		natsURL = nats.DefaultURL
	}
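	// Connect and obtain a JetStream context, stopping on failure rather
	// than continuing with a nil connection.
	nc, err := nats.Connect(natsURL)
	if err != nil {
		fmt.Fprintln(os.Stderr, "connect:", err)
		os.Exit(1)
	}
	js, err := nc.JetStream()
	if err != nil {
		fmt.Fprintln(os.Stderr, "jetstream:", err)
		os.Exit(1)
	}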
	// Create a stream for each region.
	js.AddStream(&nats.StreamConfig{
		Name:     "EVENTS-EU",
		Subjects: []string{"events.eu.>"},
	})
	js.AddStream(&nats.StreamConfig{
		Name:     "EVENTS-US",
		Subjects: []string{"events.us.>"},
	})
	// Create a consumer for each stream. Both deliver to the same
	// deliver subject, which is a straightforward way to fan in within
	// a single account. It is recommended to create a user with
	// specific permissions to subscribe to this subject.
	js.AddConsumer("EVENTS-EU", &nats.ConsumerConfig{
		Durable:        "processor",
		DeliverSubject: "push.events",
		DeliverGroup:   "processor",
		AckPolicy:      nats.AckExplicitPolicy,
	})
	js.AddConsumer("EVENTS-US", &nats.ConsumerConfig{
		Durable:        "processor",
		DeliverSubject: "push.events",
		DeliverGroup:   "processor",
		AckPolicy:      nats.AckExplicitPolicy,
	})
	// Publish messages to each stream.
	js.Publish("events.eu.page_loaded", nil)
	js.Publish("events.eu.input_focused", nil)
	js.Publish("events.us.page_loaded", nil)
	js.Publish("events.us.mouse_clicked", nil)
	js.Publish("events.eu.mouse_clicked", nil)
	js.Publish("events.us.input_focused", nil)
	// Subscribe to the deliver subject with a core NATS queue
	// subscription. Observe that messages from both streams are
	// received and can be acked.
	sub, _ := nc.QueueSubscribeSync("push.events", "processor")
	defer sub.Drain()
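	for {
		msg, err := sub.NextMsg(time.Second)
		if err != nil {
			// nats.ErrTimeout means no message arrived within a second.
			// Any other error also ends the loop, since msg would be nil.
			break
		}
		fmt.Println(msg.Subject)
		msg.Ack()
	}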
	// Confirm that each consumer's state has been updated.
	info1, _ := js.ConsumerInfo("EVENTS-EU", "processor")
	fmt.Printf("eu: last delivered: %d, num pending: %d\n", info1.Delivered.Stream, info1.NumPending)
	info2, _ := js.ConsumerInfo("EVENTS-US", "processor")
	fmt.Printf("us: last delivered: %d, num pending: %d\n", info2.Delivered.Stream, info2.NumPending)
}
Output
Network 59bfde6d_default Creating
Network 59bfde6d_default Created
Container 59bfde6d-nats-1 Creating
Container 59bfde6d-nats-1 Created
Container 59bfde6d-nats-1 Starting
Container 59bfde6d-nats-1 Started
events.eu.page_loaded
events.eu.input_focused
events.us.page_loaded
events.eu.mouse_clicked
events.us.mouse_clicked
events.us.input_focused
eu: last delivered: 3, num pending: 0
us: last delivered: 3, num pending: 0
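Because each stream binds a distinct subject prefix (events.eu.> and events.us.>), a subscriber on the shared deliver subject can still tell the two sources apart by subject alone. The following is a minimal sketch, assuming subjects always follow the events.<region>.<event> layout used in this example. The helper names `regionOf` and `tallyByRegion` are hypothetical and not part of nats.go. It tallies the six subjects printed in the output above per region.

```go
package main

import (
	"fmt"
	"strings"
)

// regionOf returns the region token of a subject such as
// "events.eu.page_loaded". Hypothetical helper, not part of nats.go.
func regionOf(subject string) (string, bool) {
	tokens := strings.Split(subject, ".")
	if len(tokens) < 3 || tokens[0] != "events" {
		return "", false
	}
	return tokens[1], true
}

// tallyByRegion counts subjects per region and skips any subject
// that does not match the assumed layout.
func tallyByRegion(subjects []string) map[string]int {
	counts := make(map[string]int)
	for _, s := range subjects {
		if region, ok := regionOf(s); ok {
			counts[region]++
		}
	}
	return counts
}

func main() {
	// Subjects in the order they appear in the output above.
	received := []string{
		"events.eu.page_loaded",
		"events.eu.input_focused",
		"events.us.page_loaded",
		"events.eu.mouse_clicked",
		"events.us.mouse_clicked",
		"events.us.input_focused",
	}
	counts := tallyByRegion(received)
	fmt.Printf("eu: %d, us: %d\n", counts["eu"], counts["us"])
	// prints "eu: 3, us: 3"
}
```

The per-region counts of three agree with the "last delivered: 3" reported for each consumer, which is consistent with each consumer tracking only its own stream.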