Most client libraries encapsulate the pattern of sending a message and receiving a response in a request method. Under the covers, this method publishes a message with a unique reply-to subject and waits for the response before returning.
In older versions of some libraries, a completely new reply-to subject is created for each request. In newer versions, a subject hierarchy is used: a single subscription in the client library listens on a wildcard, and each request is sent with a reply-to subject that is a unique child of one shared parent subject.
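The wildcard approach can be pictured as a small routing table inside the client. The sketch below is a simplified, in-process illustration and not the code of any particular library: the `replyMux` type, its methods, and the `_INBOX.abc123.` prefix are hypothetical names chosen for the example, and `dispatch` stands in for the callback of the single wildcard subscription.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// replyMux is a hypothetical stand-in for the one wildcard subscription
// a client keeps open for all of its outstanding requests.
type replyMux struct {
	mu      sync.Mutex
	prefix  string                 // shared parent subject (assumed value)
	next    int                    // counter used to mint unique child tokens
	pending map[string]chan string // child token -> waiting request
}

func newReplyMux(prefix string) *replyMux {
	return &replyMux{prefix: prefix, pending: make(map[string]chan string)}
}

// newRequest registers a waiter and returns its unique reply-to subject.
func (m *replyMux) newRequest() (string, <-chan string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.next++
	token := fmt.Sprintf("%d", m.next)
	ch := make(chan string, 1)
	m.pending[token] = ch
	return m.prefix + token, ch
}

// dispatch routes a reply to the request that owns the subject, then
// forgets that request so that only the first reply is delivered.
// It reports whether the reply was handed to a waiting request.
func (m *replyMux) dispatch(subject, data string) bool {
	if !strings.HasPrefix(subject, m.prefix) {
		return false
	}
	token := strings.TrimPrefix(subject, m.prefix)
	m.mu.Lock()
	ch, ok := m.pending[token]
	delete(m.pending, token)
	m.mu.Unlock()
	if !ok {
		return false
	}
	ch <- data
	return true
}

func main() {
	mux := newReplyMux("_INBOX.abc123.")
	replyA, chA := mux.newRequest()
	replyB, chB := mux.newRequest()

	// Replies may arrive in any order; each one reaches its own request.
	mux.dispatch(replyB, "reply for B")
	mux.dispatch(replyA, "reply for A")

	fmt.Println(replyA, "->", <-chA)
	fmt.Println(replyB, "->", <-chB)
}
```

The point of the design is that the server only ever sees one subscription per connection for replies, however many requests are in flight; the per-request bookkeeping stays in the client.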
The primary difference between the request method and publishing with a reply-to is that the library accepts only one response, and in most libraries the request is treated as a synchronous action. The library may also provide a way to set a timeout.
For example, updating the previous publish example, we can request the time with a one-second timeout:
Go:

    nc, err := nats.Connect("demo.nats.io")
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    // Send the request
    msg, err := nc.Request("time", nil, time.Second)
    if err != nil {
        log.Fatal(err)
    }

    // Use the response
    log.Printf("Reply: %s", msg.Data)

    // Close the connection
    nc.Close()

Java:

    Connection nc = Nats.connect("nats://demo.nats.io:4222");

    // set up a listener for "time" requests
    Dispatcher d = nc.createDispatcher(msg -> {
        System.out.println("Received time request");
        nc.publish(msg.getReplyTo(), ("" + System.currentTimeMillis()).getBytes());
    });
    d.subscribe("time");

    // make a request to the "time" subject and wait 1 second for a response
    Message msg = nc.request("time", null, Duration.ofSeconds(1));

    // look at the response
    long time = Long.parseLong(new String(msg.getData()));
    System.out.println(new Date(time));

    nc.close();

JavaScript:

    // set up a subscription to process the request
    const sc = StringCodec();
    nc.subscribe("time", {
      callback: (_err, msg) => {
        msg.respond(sc.encode(new Date().toLocaleTimeString()));
      },
    });

    // send the request and print the decoded response
    const r = await nc.request("time");
    console.log(sc.decode(r.data));

Python:

    nc = NATS()

    async def sub(msg):
        await nc.publish(msg.reply, b'response')

    await nc.connect(servers=["nats://demo.nats.io:4222"])
    await nc.subscribe("time", cb=sub)

    # Send the request
    try:
        msg = await nc.request("time", b'', timeout=1)
        # Use the response
        print("Reply:", msg)
    except asyncio.TimeoutError:
        print("Timed out waiting for response")

C#:

    // dotnet add package NATS.Net
    using NATS.Net;

    await using var client = new NatsClient();

    using CancellationTokenSource cts = new();

    // Process the time messages in a separate task
    Task subscription = Task.Run(async () =>
    {
        await foreach (var msg in client.SubscribeAsync<string>("time", cancellationToken: cts.Token))
        {
            await msg.ReplyAsync(DateTimeOffset.Now);
        }
    });

    // Wait for the subscription task to be ready
    await Task.Delay(1000);

    var reply = await client.RequestAsync<DateTimeOffset>("time");

    Console.WriteLine($"Reply: {reply.Data:O}");

    await cts.CancelAsync();
    await subscription;

    // Output:
    // Reply: 2024-10-23T05:20:55.0000000+01:00

Ruby:

    require 'nats/client'
    require 'fiber'

    NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
      nc.subscribe("time") do |msg, reply|
        nc.publish(reply, "response")
      end

      Fiber.new do
        # Use the response
        msg = nc.request("time", "")
        puts "Reply: #{msg}"
      end.resume
    end

C:

    natsConnection *conn = NULL;
    natsMsg        *msg  = NULL;
    natsStatus     s     = NATS_OK;

    s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);

    // Send a request and wait for up to 1 second
    if (s == NATS_OK)
        s = natsConnection_RequestString(&msg, conn, "request", "this is the request", 1000);

    if (s == NATS_OK)
    {
        printf("Received msg: %s - %.*s\n",
               natsMsg_GetSubject(msg),
               natsMsg_GetDataLength(msg),
               natsMsg_GetData(msg));

        // Destroy the message that was received
        natsMsg_Destroy(msg);
    }

    (...)

    // Destroy objects that were created
    natsConnection_Destroy(conn);

You can think of request-reply in the library as a subscribe, get one message, unsubscribe pattern. In Go this might look something like:
    sub, err := nc.SubscribeSync(replyTo)
    if err != nil {
        log.Fatal(err)
    }

    // Send the request immediately
    nc.PublishRequest(subject, replyTo, []byte(input))
    nc.Flush()

    // Wait for a single response
    for {
        msg, err := sub.NextMsg(1 * time.Second)
        if err != nil {
            log.Fatal(err)
        }

        response = string(msg.Data)
        break
    }
    sub.Unsubscribe()

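The same subscribe, get one message, unsubscribe shape can be tried out without a server. The following is a minimal in-process sketch, assuming a buffered channel in place of the reply-to subject and plain functions in place of subscribed services; `request`, `responder`, `errTimeout`, and `ms` are hypothetical names for this illustration and are not part of the NATS client API.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

const ms = time.Millisecond

var errTimeout = errors.New("request timed out")

// responder is a hypothetical stand-in for a service subscribed to the
// request subject: it waits for delay, then produces reply.
func responder(delay time.Duration, reply string) func() string {
	return func() string {
		time.Sleep(delay)
		return reply
	}
}

// request mimics the subscribe, get one message, unsubscribe sequence.
func request(services []func() string, timeout time.Duration) (string, error) {
	// "Subscribe": a private channel plays the role of the reply-to subject.
	replies := make(chan string, len(services))

	// "Publish": every service sees the request and may answer.
	for _, svc := range services {
		go func(svc func() string) { replies <- svc() }(svc)
	}

	// "Get one message": the first reply wins, or the timeout fires.
	// "Unsubscribe": returning abandons the channel, so later replies
	// are never read.
	select {
	case msg := <-replies:
		return msg, nil
	case <-time.After(timeout):
		return "", errTimeout
	}
}

func main() {
	// Two services answer; only the faster reply is returned.
	services := []func() string{responder(100*ms, "slow"), responder(0, "fast")}
	msg, err := request(services, time.Second)
	fmt.Println(msg, err)

	// No service answers in time, so the caller gets a timeout error.
	_, err = request([]func() string{responder(200*ms, "late")}, 10*ms)
	fmt.Println(err)
}
```

Even when several services respond, the caller sees exactly one reply, which matches the "only one response" behavior described for the request method.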
Scatter-Gather
You can expand the request-reply pattern into something often called scatter-gather. To receive multiple messages with a timeout, you could do something like the following, where the loop that collects messages is bounded by elapsed time rather than by the receipt of a single message:
    sub, err := nc.SubscribeSync(replyTo)
    if err != nil {
        log.Fatal(err)
    }
    nc.Flush()

    // Send the request
    nc.PublishRequest(subject, replyTo, []byte(input))

    // Gather responses until the time limit is reached
    max := 100 * time.Millisecond
    start := time.Now()
    for time.Since(start) < max {
        msg, err := sub.NextMsg(1 * time.Second)
        if err != nil {
            break
        }

        responses = append(responses, string(msg.Data))
    }
    sub.Unsubscribe()

Or, you can loop on a counter and a timeout to try to get at least N responses:
    sub, err := nc.SubscribeSync(replyTo)
    if err != nil {
        log.Fatal(err)
    }
    nc.Flush()

    // Send the request
    nc.PublishRequest(subject, replyTo, []byte(input))

    // Gather responses until enough have arrived or the time limit is reached
    max := 500 * time.Millisecond
    start := time.Now()
    for time.Since(start) < max {
        msg, err := sub.NextMsg(1 * time.Second)
        if err != nil {
            break
        }

        responses = append(responses, string(msg.Data))
        if len(responses) >= minResponses {
            break
        }
    }
    sub.Unsubscribe()

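Both gathering strategies, time-bound and count-bound, can be exercised in isolation. The sketch below assumes a channel in place of the reply subscription and simulated services that reply after fixed delays; `gather`, `scatter`, and `ms` are hypothetical names for this illustration. Unlike a loop that waits a fixed second per message, it uses a single timer for the whole window, so the call returns no later than the window allows.

```go
package main

import (
	"fmt"
	"time"
)

const ms = time.Millisecond

// gather collects replies until the window closes or, when
// minResponses > 0, until that many replies have arrived,
// whichever happens first.
func gather(replies <-chan string, window time.Duration, minResponses int) []string {
	var responses []string
	deadline := time.NewTimer(window)
	defer deadline.Stop()
	for {
		select {
		case msg := <-replies:
			responses = append(responses, msg)
			if minResponses > 0 && len(responses) >= minResponses {
				return responses
			}
		case <-deadline.C:
			return responses
		}
	}
}

// scatter is a hypothetical helper that simulates one service per delay,
// each replying on the shared channel once its delay has elapsed.
func scatter(delays ...time.Duration) <-chan string {
	replies := make(chan string, len(delays))
	for i, d := range delays {
		go func(i int, d time.Duration) {
			time.Sleep(d)
			replies <- fmt.Sprintf("service-%d", i)
		}(i, d)
	}
	return replies
}

func main() {
	// Time-bound: keep whatever arrives within 100ms.
	all := gather(scatter(10*ms, 20*ms, time.Second), 100*ms, 0)
	fmt.Println("within window:", len(all))

	// Count-bound: stop as soon as two replies are in, or after 500ms.
	some := gather(scatter(10*ms, 20*ms, 30*ms), 500*ms, 2)
	fmt.Println("early exit:", len(some))
}
```

Passing `0` for `minResponses` gives the purely time-bound behavior, while a positive value lets the caller return early once enough services have answered.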