Subscribing to a queue group is only slightly different from subscribing to a subject alone: the application simply includes a queue name with the subscription. The server load balances messages across all members of the queue group, so each message is delivered to only one member. In a cluster setup, every member has the same chance of receiving a particular message.
Keep in mind that queue groups in NATS are dynamic and do not require any server configuration.
For example, to subscribe to the subject updates as a member of the queue group workers:
nc, err := nats.Connect("demo.nats.io")
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

// Use a WaitGroup to wait for 10 messages to arrive
wg := sync.WaitGroup{}
wg.Add(10)

// Create a queue subscription on "updates" with queue name "workers"
if _, err := nc.QueueSubscribe("updates", "workers", func(m *nats.Msg) {
	wg.Done()
}); err != nil {
	log.Fatal(err)
}

// Wait for messages to come in
wg.Wait()
Connection nc = Nats.connect("nats://demo.nats.io:4222");

// Use a latch to wait for 10 messages to arrive
CountDownLatch latch = new CountDownLatch(10);

// Create a dispatcher and inline message handler
Dispatcher d = nc.createDispatcher((msg) -> {
    String str = new String(msg.getData(), StandardCharsets.UTF_8);
    System.out.println(str);
    latch.countDown();
});

// Subscribe to the "updates" subject with a queue group named "workers"
d.subscribe("updates", "workers");

// Wait for all 10 messages to come in
latch.await();

// Close the connection
nc.close();
// dotnet add package NATS.Net
using NATS.Net;

await using var client = new NatsClient();

var count = 0;

// Subscribe to the "updates" subject with a queue group named "workers"
await foreach (var msg in client.SubscribeAsync<string>(subject: "updates", queueGroup: "workers"))
{
    Console.WriteLine($"Received {++count}: {msg.Subject}: {msg.Data}");

    // Break after 10 messages
    if (count == 10)
    {
        break;
    }
}

Console.WriteLine("Done");
require 'nats/client'
require 'fiber'

NATS.start(servers: ["nats://127.0.0.1:4222"]) do |nc|
  Fiber.new do
    f = Fiber.current

    # Subscribe to the "updates" subject with a queue group named "workers"
    nc.subscribe("updates", queue: "workers") do |msg, reply|
      f.resume Time.now
    end

    nc.publish("updates", "A")

    # Use the response
    msg = Fiber.yield
    puts "Msg: #{msg}"
  end.resume
end
static void
onMsg(natsConnection *conn, natsSubscription *sub, natsMsg *msg, void *closure)
{
    printf("Received msg: %s - %.*s\n",
           natsMsg_GetSubject(msg),
           natsMsg_GetDataLength(msg),
           natsMsg_GetData(msg));

    // Need to destroy the message!
    natsMsg_Destroy(msg);
}

(...)

natsConnection      *conn = NULL;
natsSubscription    *sub  = NULL;
natsStatus          s;

s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);

// Create a queue subscription on "updates" with queue name "workers"
if (s == NATS_OK)
    s = natsConnection_QueueSubscribe(&sub, conn, "updates", "workers", onMsg, NULL);

(...)

// Destroy objects that were created
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);
If you run several instances of this example alongside the publish examples that send to updates, you will see that each message is received by only one instance while the others do not get it. The instance that receives a message can change from one message to the next.
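The delivery rule described above can be illustrated without a server. The following Go sketch is a toy, in-process model and not how the NATS server is implemented: the `queueGroup` type, its `join` and `deliver` methods, and the worker names are all hypothetical names chosen for illustration. It assumes a uniform random pick among the current members, which is one simple way to give every member the same chance.

```go
package main

import (
	"fmt"
	"math/rand"
)

// queueGroup is a toy stand-in for a queue group (hypothetical type,
// not part of any NATS client library). It models only the delivery rule.
type queueGroup struct {
	rng     *rand.Rand
	members []string       // names of the members currently in the group
	counts  map[string]int // number of messages each member has received
}

// newQueueGroup creates an empty group with a seeded random source.
func newQueueGroup(seed int64) *queueGroup {
	return &queueGroup{
		rng:    rand.New(rand.NewSource(seed)),
		counts: make(map[string]int),
	}
}

// join adds a member at any time; nothing is configured up front,
// mirroring the dynamic nature of queue groups.
func (q *queueGroup) join(name string) {
	q.members = append(q.members, name)
}

// deliver hands one message to exactly one member, chosen uniformly
// at random (an assumption of this sketch), and returns that member's name.
// It returns "" when the group has no members.
func (q *queueGroup) deliver() string {
	if len(q.members) == 0 {
		return ""
	}
	m := q.members[q.rng.Intn(len(q.members))]
	q.counts[m]++
	return m
}

func main() {
	g := newQueueGroup(1)
	g.join("worker-1")
	g.join("worker-2")
	g.join("worker-3")

	// Each message goes to a single member; the receiver varies.
	for i := 1; i <= 10; i++ {
		fmt.Printf("message %d -> %s\n", i, g.deliver())
	}
	fmt.Println("per-member counts:", g.counts)
}
```

Running the sketch prints one receiver per message, and the per-member counts always add up to the number of messages sent, which is the behavior you should observe across real queue subscribers.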