NATS is designed to move messages through the server quickly. As a result, NATS depends on the applications to consider and respond to changing message rates. The server will do a bit of impedance matching, but if a client is too slow the server will eventually cut it off by closing the connection. These cut-off connections are called slow consumers.
One way some of the libraries deal with bursty message traffic is to buffer incoming messages for a subscription. So if an application can handle 10 messages per second and sometimes receives 20 messages per second, the library may hold the extra 10 to give the application time to catch up. To the server, the application will appear to be handling the messages, and the server will consider the connection healthy. If the application still cannot keep up and the buffer fills, most client libraries will notify the application of a SlowConsumer error and discard messages.
Receiving and dropping messages from the server keeps the connection to the server healthy, but it means the application must tolerate or compensate for lost messages. There are several common patterns for doing so:
Use request-reply to throttle the sender and prevent overloading the subscriber
Use a queue with multiple subscribers splitting the work
Persist messages with something like NATS streaming
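As a rough illustration of the first pattern, the sketch below models request-reply throttling with plain asyncio rather than a NATS client. The queue, the `subscriber` and `request` helpers, and the 1 ms processing delay are all assumptions made for the example. The point is only that a sender which waits for each reply can never have more than one message outstanding.

```python
import asyncio


async def subscriber(requests: asyncio.Queue, stats: dict) -> None:
    """Stand-in for a slow subscriber that replies after handling each request."""
    while True:
        payload, reply = await requests.get()
        if payload is None:              # sentinel: stop the worker
            reply.set_result(None)
            return
        # Outstanding requests = the one just taken plus anything still queued.
        stats["max_backlog"] = max(stats["max_backlog"], requests.qsize() + 1)
        await asyncio.sleep(0.001)       # simulated processing time (assumption)
        stats["handled"] += 1
        reply.set_result(b"ack")         # the reply releases the sender


async def request(requests: asyncio.Queue, payload):
    """Send one request and wait for its reply before returning."""
    reply = asyncio.get_running_loop().create_future()
    await requests.put((payload, reply))
    return await reply


async def main() -> dict:
    requests: asyncio.Queue = asyncio.Queue()
    stats = {"handled": 0, "max_backlog": 0}
    worker = asyncio.create_task(subscriber(requests, stats))
    for i in range(20):
        # The sender cannot publish message i+1 until message i is acknowledged.
        await request(requests, f"msg #{i}".encode())
    await request(requests, None)
    await worker
    return stats


stats = asyncio.run(main())
print(stats)
```

With a real client the same shape would use the library's request call in place of the hypothetical `request` helper, at the cost of one round trip per message.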
Libraries that cache incoming messages may provide two controls on the incoming queue of pending messages. These are useful if the problem is bursty publishers rather than a continuous performance mismatch. Disabling these limits can be dangerous in production. Setting them to 0 may help find problems, but it is equally risky in production.
Check your library's documentation for the default settings and for support for disabling these limits.
The incoming cache is usually per subscriber, but again, check the specific documentation for your client library.
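To see why a pending buffer helps with bursts but not with a continuous mismatch, the following back-of-the-envelope sketch replays per-second arrival counts against a fixed processing rate. The `peak_backlog` helper and the traffic figures are made up for illustration and are not part of any client library.

```python
def peak_backlog(arrivals_per_sec, handled_per_sec):
    """Return the largest backlog seen while replaying per-second arrival counts."""
    backlog = peak = 0
    for arrived in arrivals_per_sec:
        # Whatever is not handled within the second stays in the buffer.
        backlog = max(0, backlog + arrived - handled_per_sec)
        peak = max(peak, backlog)
    return peak


# A subscriber that handles 10 messages per second.
bursty = [10, 20, 5, 5, 10, 20, 0, 10]   # short bursts followed by quieter seconds
sustained = [20] * 8                      # publisher is always twice as fast

print(peak_backlog(bursty, 10))      # 10: a small buffer absorbs each burst
print(peak_backlog(sustained, 10))   # 80: the backlog grows by 10 every second
```

In the bursty case a limit slightly above the largest burst is enough. In the sustained case any finite limit is eventually reached, so the limit only decides when messages start being dropped.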
Limiting Incoming/Pending Messages by Count and Bytes
The first way that the incoming queue can be limited is by message count. The second way to limit the incoming queue is by total size. For example, to limit the incoming cache to 1,000 messages or 5 MB, whichever comes first:
nc, err := nats.Connect("demo.nats.io")
if err != nil {
    log.Fatal(err)
}
defer nc.Close()

// Subscribe
sub1, err := nc.Subscribe("updates", func(m *nats.Msg) {})
if err != nil {
    log.Fatal(err)
}

// Set limits of 1000 messages or 5MB, whichever comes first
sub1.SetPendingLimits(1000, 5*1024*1024)

// Subscribe
sub2, err := nc.Subscribe("updates", func(m *nats.Msg) {})
if err != nil {
    log.Fatal(err)
}

// Set no limits for this subscription
sub2.SetPendingLimits(-1, -1)

// Close the connection
nc.Close()
// Consumer (Dispatcher, Subscription) API
// void setPendingLimits(long maxMessages, long maxBytes)

Connection nc = Nats.connect("nats://demo.nats.io:4222");

Dispatcher d = nc.createDispatcher((msg) -> {
    // handle message
});
d.subscribe("updates");
d.setPendingLimits(1_000, 5 * 1024 * 1024); // Set limits on a dispatcher

// Subscribe
Subscription sub = nc.subscribe("updates");
sub.setPendingLimits(1_000, 5 * 1024 * 1024); // Set limits on a subscription

// Do something

// Close the connection
nc.close();
// slow pending limits are not configurable on node-nats
nc = NATS()

await nc.connect(servers=["nats://demo.nats.io:4222"])

future = asyncio.Future()

async def cb(msg):
    nonlocal future
    future.set_result(msg)

# Set limits of 1000 messages or 5MB
await nc.subscribe("updates", cb=cb, pending_bytes_limit=5*1024*1024, pending_msgs_limit=1000)
// dotnet add package NATS.Net
using NATS.Net;
using System.Threading.Channels;
using NATS.Client.Core;

await using var client = new NatsClient();

// Set limits of 1000 messages.
// Note: setting the channel capacity over 1024 is not recommended
// as the channel's backing array will be allocated on the LOH (large object heap).
// NATS .NET client does not support setting a limit on the number of bytes
var subOpts = new NatsSubOpts
{
    ChannelOpts = new NatsSubChannelOpts
    {
        Capacity = 1000,
        FullMode = BoundedChannelFullMode.DropOldest
    }
};

await foreach (var msg in client.SubscribeAsync<string>(subject: "updates", opts: subOpts))
{
    Console.WriteLine($"Received: {msg.Subject}: {msg.Data}");
}
# The Ruby NATS client currently does not have an option to specify a subscriber's pending limits.
natsConnection *conn = NULL;
natsSubscription *sub1 = NULL;
natsSubscription *sub2 = NULL;
natsStatus s = NATS_OK;
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
// Subscribe
if (s == NATS_OK)
s = natsConnection_Subscribe(&sub1, conn, "updates", onMsg, NULL);
// Set limits of 1000 messages or 5MB, whichever comes first
if (s == NATS_OK)
s = natsSubscription_SetPendingLimits(sub1, 1000, 5*1024*1024);
// Subscribe
if (s == NATS_OK)
s = natsConnection_Subscribe(&sub2, conn, "updates", onMsg, NULL);
// Set no limits for this subscription
if (s == NATS_OK)
s = natsSubscription_SetPendingLimits(sub2, -1, -1);
(...)
// Destroy objects that were created
natsSubscription_Destroy(sub1);
natsSubscription_Destroy(sub2);
natsConnection_Destroy(conn);
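The "whichever comes first" rule can be pictured with a small toy model. The `PendingQueue` class below is hypothetical and is not any client library's implementation. It assumes, following the examples above, that a negative value disables a limit, and that a message is dropped when accepting it would push either the count or the byte total past its limit.

```python
from collections import deque


class PendingQueue:
    """Toy model of a per-subscription pending buffer (not a NATS client API)."""

    def __init__(self, max_msgs: int, max_bytes: int):
        # Assumption: a negative value disables that limit.
        self.max_msgs = max_msgs
        self.max_bytes = max_bytes
        self.msgs = deque()
        self.pending_bytes = 0
        self.dropped = 0

    def offer(self, payload: bytes) -> bool:
        """Buffer the payload, or drop it if either limit would be exceeded."""
        over_count = self.max_msgs >= 0 and len(self.msgs) + 1 > self.max_msgs
        over_bytes = (
            self.max_bytes >= 0
            and self.pending_bytes + len(payload) > self.max_bytes
        )
        if over_count or over_bytes:
            self.dropped += 1
            return False
        self.msgs.append(payload)
        self.pending_bytes += len(payload)
        return True


LIMIT_MSGS = 1000
LIMIT_BYTES = 5 * 1024 * 1024

# Large payloads reach the byte limit long before the count limit.
q_big = PendingQueue(LIMIT_MSGS, LIMIT_BYTES)
accepted_big = sum(q_big.offer(bytes(1024 * 1024)) for _ in range(8))

# Small payloads reach the count limit first.
q_small = PendingQueue(LIMIT_MSGS, LIMIT_BYTES)
accepted_small = sum(q_small.offer(b"x") for _ in range(1500))

# With both limits disabled nothing is dropped.
q_unlimited = PendingQueue(-1, -1)
accepted_unlimited = sum(q_unlimited.offer(b"x") for _ in range(1500))

print(accepted_big, q_big.dropped)        # 5 3
print(accepted_small, q_small.dropped)    # 1000 500
print(accepted_unlimited, q_unlimited.dropped)  # 1500 0
```

Which limit matters in practice depends on payload size, which is one reason to set both.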
Detect a Slow Consumer and Check for Dropped Messages
When a slow consumer is detected and messages are about to be dropped, the library may notify the application. This process may be similar to other errors or may involve a custom callback.
Some libraries, like Java, will not send this notification on every dropped message because that could be noisy. Instead, the notification may be sent once each time the subscriber falls behind. Libraries may also provide a way to get a count of dropped messages so that applications can at least detect that a problem is occurring.
// Set the callback that will be invoked when an asynchronous error occurs.
nc, err := nats.Connect("demo.nats.io", nats.ErrorHandler(logSlowConsumer))
if err != nil {
    log.Fatal(err)
}
defer nc.Close()

// Do something with the connection
class SlowConsumerReporter implements ErrorListener {
    public void errorOccurred(Connection conn, String error) {
    }

    public void exceptionOccurred(Connection conn, Exception exp) {
    }

    // Detect slow consumers
    public void slowConsumerDetected(Connection conn, Consumer consumer) {
        // Get the dropped count
        System.out.println("A slow consumer dropped messages: " + consumer.getDroppedCount());
    }
}

public class SlowConsumerListener {
    public static void main(String[] args) {
        try {
            Options options = new Options.Builder()
                    .server("nats://demo.nats.io:4222")
                    .errorListener(new SlowConsumerReporter()) // Set the listener
                    .build();
            Connection nc = Nats.connect(options);

            // Do something with the connection

            nc.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
// slow consumer detection is not configurable on NATS JavaScript client.
nc = NATS()

async def error_cb(e):
    if type(e) is nats.aio.errors.ErrSlowConsumer:
        print("Slow consumer error, unsubscribing from handling further messages...")
        await nc.unsubscribe(e.sid)

await nc.connect(
    servers=["nats://demo.nats.io:4222"],
    error_cb=error_cb,
)

msgs = []
future = asyncio.Future()

async def cb(msg):
    nonlocal msgs
    nonlocal future
    print(msg)
    msgs.append(msg)

    if len(msgs) == 3:
        # Head of line blocking on other messages caused
        # by single message processing taking too long...
        await asyncio.sleep(1)

await nc.subscribe("updates", cb=cb, pending_msgs_limit=5)

for i in range(0, 10):
    await nc.publish("updates", "msg #{}".format(i).encode())
    await asyncio.sleep(0)

try:
    await asyncio.wait_for(future, 1)
except asyncio.TimeoutError:
    pass

for msg in msgs:
    print("[Received]", msg)

await nc.close()
// dotnet add package NATS.Net
using NATS.Net;
using System.Threading.Channels;
using NATS.Client.Core;

await using var client = new NatsClient();

// Set the event handler for slow consumers
client.Connection.MessageDropped += async (sender, eventArgs) =>
{
    Console.WriteLine($"Dropped message: {eventArgs.Subject}: {eventArgs.Data}");
    Console.WriteLine($"Current channel size: {eventArgs.Pending}");
};

var subOpts = new NatsSubOpts
{
    ChannelOpts = new NatsSubChannelOpts
    {
        Capacity = 10,
        FullMode = BoundedChannelFullMode.DropOldest
        // If set to wait (default), you won't be able to detect slow consumers
        // FullMode = BoundedChannelFullMode.Wait,
    }
};

using var cts = new CancellationTokenSource();

var subscription = Task.Run(async () =>
{
    await foreach (var msg in client.SubscribeAsync<string>(subject: "updates", opts: subOpts, cancellationToken: cts.Token))
    {
        Console.WriteLine($"Received: {msg.Subject}: {msg.Data}");
    }
});

for (int i = 0; i < 1_000; i++)
{
    await client.PublishAsync(subject: "updates", data: $"message payload {i}");
}

await cts.CancelAsync();
await subscription;
# The Ruby NATS client currently does not have an option to customize slow consumer limits per subscription.
static void
errorCB(natsConnection *conn, natsSubscription *sub, natsStatus s, void *closure)
{
// Do something
printf("Error: %d - %s", s, natsStatus_GetText(s));
}
(...)
natsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
s = natsOptions_Create(&opts);
if (s == NATS_OK)
s = natsOptions_SetErrorHandler(opts, errorCB, NULL);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);
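The "notify once per episode, but keep counting drops" behavior described earlier in this section can be sketched with a small model. The `Subscription` class here is hypothetical and only illustrates the idea. In particular, treating the next successfully buffered message as the end of an episode is an assumption of this sketch, and real libraries may decide that differently.

```python
from collections import deque


class Subscription:
    """Toy subscription buffer that reports slow-consumer episodes (illustrative only)."""

    def __init__(self, limit, on_slow_consumer):
        self.limit = limit
        self.pending = deque()
        self.dropped = 0                  # running count the application can poll
        self.on_slow_consumer = on_slow_consumer
        self._slow = False                # True once the current episode was reported

    def deliver(self, msg):
        if len(self.pending) >= self.limit:
            self.dropped += 1
            if not self._slow:            # notify once per episode, not per message
                self._slow = True
                self.on_slow_consumer(self)
            return
        # Assumption: a successful enqueue ends the current episode.
        self._slow = False
        self.pending.append(msg)

    def next_msg(self):
        return self.pending.popleft()


notifications = []
sub = Subscription(limit=5, on_slow_consumer=lambda s: notifications.append(s.dropped))

for i in range(10):       # first burst: 5 buffered, 5 dropped
    sub.deliver(i)
while sub.pending:        # the application catches up
    sub.next_msg()
for i in range(8):        # second burst: 5 buffered, 3 dropped
    sub.deliver(i)

print(len(notifications), sub.dropped)   # 2 8
```

Two notifications cover eight dropped messages, so an application that needs the exact loss has to read the dropped count rather than count callbacks.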