Powered By GitBook
Wildcard Subscriptions
There is no special code to subscribe with a wildcard subject. Wildcards are a normal part of the subject name. However, it is a common technique to use the subject provided with the incoming message to determine what to do with the message.
For example, you can subscribe using * and then act based on the actual subject.
Go
Java
JavaScript
Python
Ruby
TypeScript
C
1
nc, err := nats.Connect("demo.nats.io")
2
if err != nil {
3
log.Fatal(err)
4
}
5
defer nc.Close()
6
7
// Use a WaitGroup to wait for 2 messages to arrive
8
wg := sync.WaitGroup{}
9
wg.Add(2)
10
11
// Subscribe
12
if _, err := nc.Subscribe("time.*.east", func(m *nats.Msg) {
13
log.Printf("%s: %s", m.Subject, m.Data)
14
wg.Done()
15
}); err != nil {
16
log.Fatal(err)
17
}
18
19
// Wait for the 2 messages to come in
20
wg.Wait()
Copied!
1
Connection nc = Nats.connect("nats://demo.nats.io:4222");
2
3
// Use a latch to wait for 2 messages to arrive
4
CountDownLatch latch = new CountDownLatch(2);
5
6
// Create a dispatcher and inline message handler
7
Dispatcher d = nc.createDispatcher((msg) -> {
8
String subject = msg.getSubject();
9
String str = new String(msg.getData(), StandardCharsets.UTF_8);
10
System.out.println(subject + ": " + str);
11
latch.countDown();
12
});
13
14
// Subscribe
15
d.subscribe("time.*.east");
16
17
// Wait for messages to come in
18
latch.await();
19
20
// Close the connection
21
nc.close();
Copied!
// Connect to the NATS demo server.
let nc = NATS.connect({
    url: "nats://demo.nats.io:4222"});

// Subscribe with a single-token wildcard. The callback's third argument is
// the subject of the delivered message; it selects the branch below.
nc.subscribe('time.us.*', (msg, reply, subject) => {
    // converting timezones correctly in node requires a library
    // this doesn't take into account *many* things.
    let time = "";
    switch (subject) {
        case 'time.us.east':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/New_York"});
            break;
        case 'time.us.central':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Chicago"});
            break;
        case 'time.us.mountain':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Denver"});
            break;
        case 'time.us.west':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Los_Angeles"});
            break;
        default:
            time = "I don't know what you are talking about Willis";
    }
    // Log through console: this snippet defines no `t` object, so the
    // previous `t.log(...)` call would throw a ReferenceError.
    console.log(subject, time);
});
Copied!
1
nc = NATS()
2
3
await nc.connect(servers=["nats://demo.nats.io:4222"])
4
5
# Use queue to wait for 2 messages to arrive
6
queue = asyncio.Queue()
7
async def cb(msg):
8
await queue.put_nowait(msg)
9
10
await nc.subscribe("time.*.east", cb=cb)
11
12
# Send 2 messages and wait for them to come in
13
await nc.publish("time.A.east", b'A')
14
await nc.publish("time.B.east", b'B')
15
16
msg_A = await queue.get()
17
msg_B = await queue.get()
18
19
print("Msg A:", msg_A)
20
print("Msg B:", msg_B)
Copied!
1
require 'nats/client'
2
require 'fiber'
3
4
NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
5
Fiber.new do
6
f = Fiber.current
7
8
nc.subscribe("time.*.east") do |msg, reply|
9
f.resume Time.now
10
end
11
12
nc.publish("time.A.east", "A")
13
nc.publish("time.B.east", "B")
14
15
# Use the response
16
msg_A = Fiber.yield
17
puts "Msg A: #{msg_A}"
18
19
msg_B = Fiber.yield
20
puts "Msg B: #{msg_B}"
21
22
end.resume
23
end
Copied!
1
await nc.subscribe('time.us.*', (err, msg) => {
2
// converting timezones correctly in node requires a library
3
// this doesn't take into account *many* things.
4
let time = "";
5
switch (msg.subject) {
6
case 'time.us.east':
7
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/New_York"});
8
break;
9
case 'time.us.central':
10
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Chicago"});
11
break;
12
case 'time.us.mountain':
13
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Denver"});
14
break;
15
case 'time.us.west':
16
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Los_Angeles"});
17
break;
18
default:
19
time = "I don't know what you are talking about Willis";
20
}
21
console.log(msg.subject, time);
22
});
Copied!
1
static void
2
onMsg(natsConnection *conn, natsSubscription *sub, natsMsg *msg, void *closure)
3
{
4
printf("Received msg: %s - %.*s\n",
5
natsMsg_GetSubject(msg),
6
natsMsg_GetDataLength(msg),
7
natsMsg_GetData(msg));
8
9
// Need to destroy the message!
10
natsMsg_Destroy(msg);
11
}
12
13
14
(...)
15
16
natsConnection *conn = NULL;
17
natsSubscription *sub = NULL;
18
natsStatus s;
19
20
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
21
if (s == NATS_OK)
22
s = natsConnection_Subscribe(&sub, conn, "time.*.east", onMsg, NULL);
23
24
(...)
25
26
27
// Destroy objects that were created
28
natsSubscription_Destroy(sub);
29
natsConnection_Destroy(conn);
Copied!
Or do something similar with the > wildcard, which matches one or more trailing tokens:
Go
Java
JavaScript
Python
Ruby
TypeScript
C
1
nc, err := nats.Connect("demo.nats.io")
2
if err != nil {
3
log.Fatal(err)
4
}
5
defer nc.Close()
6
7
// Use a WaitGroup to wait for 4 messages to arrive
8
wg := sync.WaitGroup{}
9
wg.Add(4)
10
11
// Subscribe
12
if _, err := nc.Subscribe("time.>", func(m *nats.Msg) {
13
log.Printf("%s: %s", m.Subject, m.Data)
14
wg.Done()
15
}); err != nil {
16
log.Fatal(err)
17
}
18
19
// Wait for the 4 messages to come in
20
wg.Wait()
21
22
// Close the connection
23
nc.Close()
Copied!
1
Connection nc = Nats.connect("nats://demo.nats.io:4222");
2
3
// Use a latch to wait for 4 messages to arrive
4
CountDownLatch latch = new CountDownLatch(4);
5
6
// Create a dispatcher and inline message handler
7
Dispatcher d = nc.createDispatcher((msg) -> {
8
String subject = msg.getSubject();
9
String str = new String(msg.getData(), StandardCharsets.UTF_8);
10
System.out.println(subject + ": " + str);
11
latch.countDown();
12
});
13
14
// Subscribe
15
d.subscribe("time.>");
16
17
// Wait for messages to come in
18
latch.await();
19
20
// Close the connection
21
nc.close();
Copied!
// Connect to the NATS demo server.
let nc = NATS.connect({
    url: "nats://demo.nats.io:4222"});

// Subscribe with the full wildcard. The callback's third argument is the
// subject of the delivered message; it selects the branch below.
nc.subscribe('time.>', (msg, reply, subject) => {
    // converting timezones correctly in node requires a library
    // this doesn't take into account *many* things.
    let time = "";
    switch (subject) {
        case 'time.us.east':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/New_York"});
            break;
        case 'time.us.central':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Chicago"});
            break;
        case 'time.us.mountain':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Denver"});
            break;
        case 'time.us.west':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Los_Angeles"});
            break;
        default:
            time = "I don't know what you are talking about Willis";
    }
    // Log through console: this snippet defines no `t` object, so the
    // previous `t.log(...)` call would throw a ReferenceError.
    console.log(subject, time);
});
Copied!
1
nc = NATS()
2
3
await nc.connect(servers=["nats://demo.nats.io:4222"])
4
5
# Use queue to wait for 4 messages to arrive
6
queue = asyncio.Queue()
7
async def cb(msg):
8
await queue.put(msg)
9
10
await nc.subscribe("time.>", cb=cb)
11
12
# Send 4 messages and wait for them to come in
13
await nc.publish("time.A.east", b'A')
14
await nc.publish("time.B.east", b'B')
15
await nc.publish("time.C.west", b'C')
16
await nc.publish("time.D.west", b'D')
17
18
for i in range(0, 4):
19
msg = await queue.get()
20
print("Msg:", msg)
21
22
await nc.close()
Copied!
1
require 'nats/client'
2
require 'fiber'
3
4
NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
5
Fiber.new do
6
f = Fiber.current
7
8
nc.subscribe("time.>") do |msg, reply|
9
f.resume Time.now.to_f
10
end
11
12
nc.publish("time.A.east", "A")
13
nc.publish("time.B.east", "B")
14
nc.publish("time.C.west", "C")
15
nc.publish("time.D.west", "D")
16
17
# Use the response
18
4.times do
19
msg = Fiber.yield
20
puts "Msg: #{msg}"
21
end
22
end.resume
23
end
Copied!
// Subscribe with the full wildcard. The handler receives (err, msg) and
// branches on msg.subject, the subject of the delivered message.
await nc.subscribe('time.>', (err, msg) => {
    // converting timezones correctly in node requires a library
    // this doesn't take into account *many* things.
    let time = "";
    switch (msg.subject) {
        case 'time.us.east':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/New_York"});
            break;
        case 'time.us.central':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Chicago"});
            break;
        case 'time.us.mountain':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Denver"});
            break;
        case 'time.us.west':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Los_Angeles"});
            break;
        default:
            time = "I don't know what you are talking about Willis";
    }
    // Log through console: this snippet defines no `t` object, so the
    // previous `t.log(...)` call would throw a ReferenceError.
    console.log(msg.subject, time);
});
Copied!
1
static void
2
onMsg(natsConnection *conn, natsSubscription *sub, natsMsg *msg, void *closure)
3
{
4
printf("Received msg: %s - %.*s\n",
5
natsMsg_GetSubject(msg),
6
natsMsg_GetDataLength(msg),
7
natsMsg_GetData(msg));
8
9
// Need to destroy the message!
10
natsMsg_Destroy(msg);
11
}
12
13
14
(...)
15
16
natsConnection *conn = NULL;
17
natsSubscription *sub = NULL;
18
natsStatus s;
19
20
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
21
if (s == NATS_OK)
22
s = natsConnection_Subscribe(&sub, conn, "time.>", onMsg, NULL);
23
24
(...)
25
26
27
// Destroy objects that were created
28
natsSubscription_Destroy(sub);
29
natsConnection_Destroy(conn);
Copied!
The following example can be used to test these two subscribers. The * subscriber should receive at most 2 messages, while the > subscriber receives 4. More importantly, the time.*.east subscriber won't receive the message published on time.us.east.atlanta: * matches exactly one token, so a subject with an extra trailing token does not match.
Go
Java
JavaScript
Python
Ruby
TypeScript
1
nc, err := nats.Connect("demo.nats.io")
2
if err != nil {
3
log.Fatal(err)
4
}
5
defer nc.Close()
6
7
zoneID, err := time.LoadLocation("America/New_York")
8
if err != nil {
9
log.Fatal(err)
10
}
11
now := time.Now()
12
zoneDateTime := now.In(zoneID)
13
formatted := zoneDateTime.String()
14
15
nc.Publish("time.us.east", []byte(formatted))
16
nc.Publish("time.us.east.atlanta", []byte(formatted))
17
18
zoneID, err = time.LoadLocation("Europe/Warsaw")
19
if err != nil {
20
log.Fatal(err)
21
}
22
zoneDateTime = now.In(zoneID)
23
formatted = zoneDateTime.String()
24
25
nc.Publish("time.eu.east", []byte(formatted))
26
nc.Publish("time.eu.east.warsaw", []byte(formatted))
Copied!
1
Connection nc = Nats.connect("nats://demo.nats.io:4222");
2
ZoneId zoneId = ZoneId.of("America/New_York");
3
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(Instant.now(), zoneId);
4
String formatted = zonedDateTime.format(DateTimeFormatter.ISO_ZONED_DATE_TIME);
5
6
nc.publish("time.us.east", formatted.getBytes(StandardCharsets.UTF_8));
7
nc.publish("time.us.east.atlanta", formatted.getBytes(StandardCharsets.UTF_8));
8
9
zoneId = ZoneId.of("Europe/Warsaw");
10
zonedDateTime = ZonedDateTime.ofInstant(Instant.now(), zoneId);
11
formatted = zonedDateTime.format(DateTimeFormatter.ISO_ZONED_DATE_TIME);
12
nc.publish("time.eu.east", formatted.getBytes(StandardCharsets.UTF_8));
13
nc.publish("time.eu.east.warsaw", formatted.getBytes(StandardCharsets.UTF_8));
14
15
nc.flush(Duration.ZERO);
16
nc.close();
Copied!
1
nc.publish('time.us.east');
2
nc.publish('time.us.central');
3
nc.publish('time.us.mountain');
4
nc.publish('time.us.west');
Copied!
1
nc = NATS()
2
3
await nc.connect(servers=["nats://demo.nats.io:4222"])
4
5
await nc.publish("time.us.east", b'...')
6
await nc.publish("time.us.east.atlanta", b'...')
7
8
await nc.publish("time.eu.east", b'...')
9
await nc.publish("time.eu.east.warsaw", b'...')
10
11
await nc.close()
Copied!
1
NATS.start do |nc|
2
nc.publish("time.us.east", '...')
3
nc.publish("time.us.east.atlanta", '...')
4
5
nc.publish("time.eu.east", '...')
6
nc.publish("time.eu.east.warsaw", '...')
7
8
nc.drain
9
end
Copied!
1
nc.publish('time.us.east');
2
nc.publish('time.us.central');
3
nc.publish('time.us.mountain');
4
nc.publish('time.us.west');
Copied!
Last modified 1yr ago
Export as PDF
Copy link