The pattern of sending a message and receiving a response is encapsulated in most client libraries into a request method. Under the covers this method will publish a message with a unique reply-to subject and wait for the response before returning.
In the older versions of some libraries a completely new reply-to subject is created each time. In newer versions, a subject hierarchy is used so that a single subscriber in the client library listens for a wildcard, and requests are sent with a unique child subject of a single subject.
The primary difference between the request method and publishing with a reply-to is that the library is only going to accept one response, and in most libraries the request will be treated as a synchronous action. The library may even provide a way to set the timeout.
For example, updating the previous publish example we may request time
with a one second timeout:
nc, err := nats.Connect("demo.nats.io")if err != nil {log.Fatal(err)}defer nc.Close()​// Send the requestmsg, err := nc.Request("time", nil, time.Second)if err != nil {log.Fatal(err)}​// Use the responselog.Printf("Reply: %s", msg.Data)​// Close the connectionnc.Close()
Connection nc = Nats.connect("nats://demo.nats.io:4222");​// Send the requestMessage msg = nc.request("time", null, Duration.ofSeconds(1));​// Use the responseSystem.out.println(new String(msg.getData(), StandardCharsets.UTF_8));​// Close the connectionnc.close();
let nc = NATS.connect({url: "nats://demo.nats.io:4222"});​// set up a subscription to process the requestnc.subscribe('time', (msg, reply) => {if(reply) {nc.publish(reply, new Date().toLocaleTimeString());}});​nc.requestOne('time', (msg) => {t.log('the time is', msg);nc.close();});
nc = NATS()​async def sub(msg):await nc.publish(msg.reply, b'response')​await nc.connect(servers=["nats://demo.nats.io:4222"])await nc.subscribe("time", cb=sub)​# Send the requesttry:msg = await nc.request("time", b'', timeout=1)# Use the responseprint("Reply:", msg)except asyncio.TimeoutError:print("Timed out waiting for response")
require 'nats/client'require 'fiber'​NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|nc.subscribe("time") do |msg, reply|nc.publish(reply, "response")end​Fiber.new do# Use the responsemsg = nc.request("time", "")puts "Reply: #{msg}"end.resumeend
let msg = await nc.request('time', 1000);t.log('the time is', msg.data);nc.close();
natsConnection *conn = NULL;natsMsg *msg = NULL;natsStatus s = NATS_OK;​s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);​// Send a request and wait for up to 1 secondif (s == NATS_OK)s = natsConnection_RequestString(&msg, conn, "request", "this is the request", 1000);​if (s == NATS_OK){printf("Received msg: %s - %.*s\n",natsMsg_GetSubject(msg),natsMsg_GetDataLength(msg),natsMsg_GetData(msg));​// Destroy the message that was receivednatsMsg_Destroy(msg);}​(...)​// Destroy objects that were creatednatsConnection_Destroy(conn);
You can think of request-reply in the library as a subscribe, get one message, unsubscribe pattern. In Go this might look something like:
sub, err := nc.SubscribeSync(replyTo)if err != nil {log.Fatal(err)}​// Send the request immediatelync.PublishRequest(subject, replyTo, []byte(input))nc.Flush()​// Wait for a single responsefor {msg, err := sub.NextMsg(1 * time.Second)if err != nil {log.Fatal(err)}​response = string(msg.Data)break}sub.Unsubscribe()
You can expand the request-reply pattern into something often called scatter-gather. To receive multiple messages, with a timeout, you could do something like the following, where the loop getting messages is using time as the limitation, not the receipt of a single message:
sub, err := nc.SubscribeSync(replyTo)if err != nil {log.Fatal(err)}nc.Flush()​// Send the requestnc.PublishRequest(subject, replyTo, []byte(input))​// Wait for a single responsemax := 100 * time.Millisecondstart := time.Now()for time.Now().Sub(start) < max {msg, err := sub.NextMsg(1 * time.Second)if err != nil {break}​responses = append(responses, string(msg.Data))}sub.Unsubscribe()
Or, you can loop on a counter and a timeout to try to get at least N responses:
sub, err := nc.SubscribeSync(replyTo)if err != nil {log.Fatal(err)}nc.Flush()​// Send the requestnc.PublishRequest(subject, replyTo, []byte(input))​// Wait for a single responsemax := 500 * time.Millisecondstart := time.Now()for time.Now().Sub(start) < max {msg, err := sub.NextMsg(1 * time.Second)if err != nil {break}​responses = append(responses, string(msg.Data))​if len(responses) >= minResponses {break}}sub.Unsubscribe()