Powered By GitBook
Queue Subscriptions
Subscribing to a queue group is only slightly different from subscribing to a subject alone. The application simply includes a queue name with the subscription. The server will load balance messages between all members of the queue group, so each message is delivered to only one member of the group. In a cluster setup, every member has the same chance of receiving a particular message.
Keep in mind that queue groups in NATS are dynamic and do not require any server configuration.
As an example, to subscribe to the subject "updates" as a member of the queue group "workers":
Go
Java
JavaScript
Python
Ruby
TypeScript
C
1
nc, err := nats.Connect("demo.nats.io")
2
if err != nil {
3
log.Fatal(err)
4
}
5
defer nc.Close()
6
7
// Use a WaitGroup to wait for 10 messages to arrive
8
wg := sync.WaitGroup{}
9
wg.Add(10)
10
11
// Create a queue subscription on "updates" with queue name "workers"
12
if _, err := nc.QueueSubscribe("updates", "workers", func(m *nats.Msg) {
13
wg.Done()
14
}); err != nil {
15
log.Fatal(err)
16
}
17
18
// Wait for messages to come in
19
wg.Wait()
Copied!
1
Connection nc = Nats.connect("nats://demo.nats.io:4222");
2
3
// Use a latch to wait for 10 messages to arrive
4
CountDownLatch latch = new CountDownLatch(10);
5
6
// Create a dispatcher and inline message handler
7
Dispatcher d = nc.createDispatcher((msg) -> {
8
String str = new String(msg.getData(), StandardCharsets.UTF_8);
9
System.out.println(str);
10
latch.countDown();
11
});
12
13
// Subscribe
14
d.subscribe("updates", "workers");
15
16
// Wait for all 10 messages to come in
17
latch.await();
18
19
// Close the connection
20
nc.close();
Copied!
1
let nc = NATS.connect({
2
url: "nats://demo.nats.io:4222"});
3
4
nc.subscribe('updates', {queue: "workers"}, (msg) => {
5
t.log('worker got message', msg);
6
});
Copied!
1
nc = NATS()
2
3
await nc.connect(servers=["nats://demo.nats.io:4222"])
4
5
future = asyncio.Future()
6
7
async def cb(msg):
8
nonlocal future
9
future.set_result(msg)
10
11
await nc.subscribe("updates", queue="workers", cb=cb)
12
await nc.publish("updates", b'All is Well')
13
14
msg = await asyncio.wait_for(future, 1)
15
print("Msg", msg)
Copied!
1
require 'nats/client'
2
require 'fiber'
3
4
NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
5
Fiber.new do
6
f = Fiber.current
7
8
nc.subscribe("updates", queue: "workers") do |msg, reply|
9
f.resume Time.now
10
end
11
12
nc.publish("updates", "A")
13
14
# Use the response
15
msg = Fiber.yield
16
puts "Msg: #{msg}"
17
end.resume
18
end
Copied!
1
await nc.subscribe('updates', (err, msg) => {
2
t.log('worker got message', msg.data);
3
}, {queue: "workers"});
Copied!
1
static void
2
onMsg(natsConnection *conn, natsSubscription *sub, natsMsg *msg, void *closure)
3
{
4
printf("Received msg: %s - %.*s\n",
5
natsMsg_GetSubject(msg),
6
natsMsg_GetDataLength(msg),
7
natsMsg_GetData(msg));
8
9
// Need to destroy the message!
10
natsMsg_Destroy(msg);
11
}
12
13
14
(...)
15
16
natsConnection *conn = NULL;
17
natsSubscription *sub = NULL;
18
natsStatus s;
19
20
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
21
22
// Create a queue subscription on "updates" with queue name "workers"
23
if (s == NATS_OK)
24
s = natsConnection_QueueSubscribe(&sub, conn, "updates", "workers", onMsg, NULL);
25
26
(...)
27
28
29
// Destroy objects that were created
30
natsSubscription_Destroy(sub);
31
natsConnection_Destroy(conn);
Copied!
If you run several instances of this example together with the publish examples that send to "updates", you will see that each message is received by only one of the instances, while the others do not receive it. The instance that receives a message will change from one message to the next.
Last modified 1yr ago
Export as PDF
Copy link