Powered By GitBook
Request-Reply Semantics
The pattern of sending a message and receiving a response is encapsulated in most client libraries into a request method. Under the covers this method will publish a message with a unique reply-to subject and wait for the response before returning.
In the older versions of some libraries a completely new reply-to subject is created each time. In newer versions, a subject hierarchy is used so that a single subscriber in the client library listens on a wildcard subject, and each request is sent with a unique child subject of that one parent subject.
The primary difference between the request method and publishing with a reply-to is that the library is only going to accept one response, and in most libraries the request will be treated as a synchronous action. The library may even provide a way to set the timeout.
For example, updating the previous publish example, we can request the time with a one-second timeout:
Go
Java
JavaScript
Python
Ruby
TypeScript
C
1
nc, err := nats.Connect("demo.nats.io")
2
if err != nil {
3
log.Fatal(err)
4
}
5
defer nc.Close()
6
7
// Send the request
8
msg, err := nc.Request("time", nil, time.Second)
9
if err != nil {
10
log.Fatal(err)
11
}
12
13
// Use the response
14
log.Printf("Reply: %s", msg.Data)
15
16
// Close the connection
17
nc.Close()
Copied!
1
Connection nc = Nats.connect("nats://demo.nats.io:4222");
2
3
// Send the request
4
Message msg = nc.request("time", null, Duration.ofSeconds(1));
5
6
// Use the response
7
System.out.println(new String(msg.getData(), StandardCharsets.UTF_8));
8
9
// Close the connection
10
nc.close();
Copied!
1
let nc = NATS.connect({url: "nats://demo.nats.io:4222"});
2
3
// set up a subscription to process the request
4
nc.subscribe('time', (msg, reply) => {
5
if(reply) {
6
nc.publish(reply, new Date().toLocaleTimeString());
7
}
8
});
9
10
nc.requestOne('time', (msg) => {
11
t.log('the time is', msg);
12
nc.close();
13
});
Copied!
1
nc = NATS()
2
3
async def sub(msg):
4
await nc.publish(msg.reply, b'response')
5
6
await nc.connect(servers=["nats://demo.nats.io:4222"])
7
await nc.subscribe("time", cb=sub)
8
9
# Send the request
10
try:
11
msg = await nc.request("time", b'', timeout=1)
12
# Use the response
13
print("Reply:", msg)
14
except asyncio.TimeoutError:
15
print("Timed out waiting for response")
Copied!
1
require 'nats/client'
2
require 'fiber'
3
4
NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
5
nc.subscribe("time") do |msg, reply|
6
nc.publish(reply, "response")
7
end
8
9
Fiber.new do
10
# Send the request and use the response
11
msg = nc.request("time", "")
12
puts "Reply: #{msg}"
13
end.resume
14
end
Copied!
1
let msg = await nc.request('time', 1000);
2
t.log('the time is', msg.data);
3
nc.close();
Copied!
1
natsConnection *conn = NULL;
2
natsMsg *msg = NULL;
3
natsStatus s = NATS_OK;
4
5
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
6
7
// Send a request and wait for up to 1 second
8
if (s == NATS_OK)
9
s = natsConnection_RequestString(&msg, conn, "request", "this is the request", 1000);
10
11
if (s == NATS_OK)
12
{
13
printf("Received msg: %s - %.*s\n",
14
natsMsg_GetSubject(msg),
15
natsMsg_GetDataLength(msg),
16
natsMsg_GetData(msg));
17
18
// Destroy the message that was received
19
natsMsg_Destroy(msg);
20
}
21
22
(...)
23
24
// Destroy objects that were created
25
natsConnection_Destroy(conn);
Copied!
You can think of request-reply in the library as a "subscribe, get one message, unsubscribe" pattern. In Go this might look something like:
1
sub, err := nc.SubscribeSync(replyTo)
2
if err != nil {
3
log.Fatal(err)
4
}
5
6
// Send the request immediately
7
nc.PublishRequest(subject, replyTo, []byte(input))
8
nc.Flush()
9
10
// Wait for a single response
11
for {
12
msg, err := sub.NextMsg(1 * time.Second)
13
if err != nil {
14
log.Fatal(err)
15
}
16
17
response = string(msg.Data)
18
break
19
}
20
sub.Unsubscribe()
Copied!

Scatter-Gather

You can expand the request-reply pattern into something often called scatter-gather. To receive multiple messages, with a timeout, you could do something like the following, where the loop that collects messages is bounded by elapsed time rather than by the receipt of a single message:
1
sub, err := nc.SubscribeSync(replyTo)
2
if err != nil {
3
log.Fatal(err)
4
}
5
nc.Flush()
6
7
// Send the request
8
nc.PublishRequest(subject, replyTo, []byte(input))
9
10
// Wait for multiple responses until the time limit is reached
11
max := 100 * time.Millisecond
12
start := time.Now()
13
for time.Now().Sub(start) < max {
14
msg, err := sub.NextMsg(1 * time.Second)
15
if err != nil {
16
break
17
}
18
19
responses = append(responses, string(msg.Data))
20
}
21
sub.Unsubscribe()
Copied!
Or, you can loop on a counter and a timeout to try to get at least N responses:
1
sub, err := nc.SubscribeSync(replyTo)
2
if err != nil {
3
log.Fatal(err)
4
}
5
nc.Flush()
6
7
// Send the request
8
nc.PublishRequest(subject, replyTo, []byte(input))
9
10
// Wait for responses until minResponses are received or the time limit is reached
11
max := 500 * time.Millisecond
12
start := time.Now()
13
for time.Now().Sub(start) < max {
14
msg, err := sub.NextMsg(1 * time.Second)
15
if err != nil {
16
break
17
}
18
19
responses = append(responses, string(msg.Data))
20
21
if len(responses) >= minResponses {
22
break
23
}
24
}
25
sub.Unsubscribe()
Copied!
Last modified 1yr ago
Export as PDF
Copy link