CNCF and Synadia Align on Securing the Future of the NATS.io Project. Read the joint press release.
NATS Docs
NATS.ioNATS by ExampleGitHubSlackTwitter
  • Welcome
  • Release Notes
    • What's New!
      • NATS 2.11
      • NATS 2.10
      • NATS 2.2
      • NATS 2.0
  • NATS Concepts
    • Overview
      • Compare NATS
    • What is NATS
      • Walkthrough Setup
    • Subject-Based Messaging
    • Core NATS
      • Publish-Subscribe
        • Pub/Sub Walkthrough
      • Request-Reply
        • Request-Reply Walkthrough
      • Queue Groups
        • Queueing Walkthrough
    • JetStream
      • Streams
      • Source and Mirror Streams
        • Example
      • Consumers
        • Example
      • JetStream Walkthrough
      • Key/Value Store
        • Key/Value Store Walkthrough
      • Object Store
        • Object Store Walkthrough
      • Headers
    • Subject Mapping and Partitioning
    • NATS Service Infrastructure
      • NATS Adaptive Deployment Architectures
    • Security
    • Connectivity
  • Using NATS
    • NATS Tools
      • nats
        • nats bench
      • nk
      • nsc
        • Basics
        • Streams
        • Services
        • Signing Keys
        • Revocation
        • Managed Operators
      • nats-top
        • Tutorial
    • Developing With NATS
      • Anatomy of a NATS application
      • Connecting
        • Connecting to the Default Server
        • Connecting to a Specific Server
        • Connecting to a Cluster
        • Connection Name
        • Authenticating with a User and Password
        • Authenticating with a Token
        • Authenticating with an NKey
        • Authenticating with a Credentials File
        • Encrypting Connections with TLS
        • Setting a Connect Timeout
        • Ping/Pong Protocol
        • Turning Off Echo'd Messages
        • Miscellaneous functionalities
        • Automatic Reconnections
          • Disabling Reconnect
          • Set the Number of Reconnect Attempts
          • Avoiding the Thundering Herd
          • Pausing Between Reconnect Attempts
          • Listening for Reconnect Events
          • Buffering Messages During Reconnect Attempts
        • Monitoring the Connection
          • Listen for Connection Events
          • Slow Consumers
      • Receiving Messages
        • Synchronous Subscriptions
        • Asynchronous Subscriptions
        • Unsubscribing
        • Unsubscribing After N Messages
        • Replying to a Message
        • Wildcard Subscriptions
        • Queue Subscriptions
        • Draining Messages Before Disconnect
        • Receiving Structured Data
      • Sending Messages
        • Including a Reply Subject
        • Request-Reply Semantics
        • Caches, Flush and Ping
        • Sending Structured Data
      • Building Services
      • JetStream
        • JetStream Model Deep Dive
        • Managing Streams and consumers
        • Consumer Details
        • Publishing to Streams
        • Using the Key/Value Store
        • Using the Object Store
      • Tutorials
        • Advanced Connect and Custom Dialer in Go
    • Running Workloads on NATS
      • Getting Started
        • Installing Nex
        • Building a Service
        • Starting a Node
        • Deploying Services
        • Building a Function
        • Deploying Functions
      • Host Services
        • Javascript | V8
      • Nex Internals
        • Architecture Overview
        • Node Process
        • Nex Agent
        • No Sandbox Mode
        • Root File System
        • Control Interface
      • FAQ
  • Running a NATS service
    • Installing, running and deploying a NATS Server
      • Installing a NATS Server
      • Running and deploying a NATS Server
      • Windows Service
      • Flags
    • Environmental considerations
    • NATS and Docker
      • Tutorial
      • Docker Swarm
      • Python and NGS Running in Docker
      • JetStream
      • NGS Leaf Nodes
    • NATS and Kubernetes
    • NATS Server Clients
    • Configuring NATS Server
      • Configuring JetStream
        • Configuration Management
          • NATS Admin CLI
          • Terraform
          • GitHub Actions
          • Kubernetes Controller
      • Clustering
        • Clustering Configuration
        • v2 Routes
        • JetStream Clustering
          • Administration
          • Troubleshooting
      • Super-cluster with Gateways
        • Configuration
      • Leaf Nodes
        • Configuration
        • JetStream on Leaf Nodes
      • Securing NATS
        • Enabling TLS
        • Authentication
          • Tokens
          • Username/Password
          • TLS Authentication
            • TLS Authentication in clusters
          • NKeys
          • Authentication Timeout
          • Decentralized JWT Authentication/Authorization
            • Account lookup using Resolver
            • Memory Resolver Tutorial
            • Mixed Authentication/Authorization Setup
        • Authorization
        • Multi Tenancy using Accounts
        • OCSP Stapling
        • Auth Callout
      • Logging
      • Enabling Monitoring
      • MQTT
        • Configuration
      • Configuring Subject Mapping
      • System Events
        • System Events & Decentralized JWT Tutorial
      • WebSocket
        • Configuration
    • Managing and Monitoring your NATS Server Infrastructure
      • Monitoring
        • Monitoring JetStream
      • Managing JetStream
        • Account Information
        • Naming Streams, Consumers, and Accounts
        • Streams
        • Consumers
        • Data Replication
        • Disaster Recovery
        • Encryption at Rest
      • Managing JWT Security
        • In Depth JWT Guide
      • Upgrading a Cluster
      • Slow Consumers
      • Signals
      • Lame Duck Mode
      • Profiling
  • Reference
    • FAQ
    • NATS Protocols
      • Protocol Demo
      • Client Protocol
        • Developing a Client
      • NATS Cluster Protocol
      • JetStream wire API Reference
    • Roadmap
    • Contributing
  • Legacy
    • nats-account-server
Powered by GitBook
On this page

Was this helpful?

Edit on GitHub
Export as PDF
  1. Using NATS
  2. Developing With NATS
  3. Receiving Messages

Asynchronous Subscriptions

Asynchronous subscriptions use callbacks of some form to notify an application when a message arrives. These subscriptions are usually easier to work with, but do represent some form of internal work and resource usage, i.e. threads, by the library. Check your library's documentation for any resource usage associated with asynchronous subscriptions.

Note: For a given subscription, messages are dispatched serially, one message at a time. If your application does not care about processing ordering and would prefer the messages to be dispatched concurrently, it is the application's responsibility to move them to some internal queue to be picked up by threads/go routines.

The following example subscribes to the subject updates and handles the incoming messages:

nc, err := nats.Connect("demo.nats.io")
if err != nil {
    log.Fatal(err)
}
defer nc.Close()

// Use a WaitGroup to wait for a message to arrive
wg := sync.WaitGroup{}
wg.Add(1)

// Subscribe
if _, err := nc.Subscribe("updates", func(m *nats.Msg) {
    wg.Done()
}); err != nil {
    log.Fatal(err)
}

// Wait for a message to come in
wg.Wait()
Connection nc = Nats.connect("nats://demo.nats.io:4222");

// Use a latch to wait for a message to arrive
CountDownLatch latch = new CountDownLatch(1);

// Create a dispatcher and inline message handler
Dispatcher d = nc.createDispatcher((msg) -> {
    String str = new String(msg.getData(), StandardCharsets.UTF_8);
    System.out.println(str);
    latch.countDown();
});

// Subscribe
d.subscribe("updates");

// Wait for a message to come in
latch.await(); 

// Close the connection
nc.close();
const sc = StringCodec();
// this is an example of a callback subscription
// https://github.com/nats-io/nats.js/blob/master/README.md#async-vs-callbacks
nc.subscribe("updates", {
  callback: (err, msg) => {
    if (err) {
      t.error(err.message);
    } else {
      t.log(sc.decode(msg.data));
    }
  },
  max: 1,
});

// here's an iterator subscription - note the code in the
// for loop will block until the iterator completes
// either from a break/return from the iterator, an
// unsubscribe after the message arrives, or in this case
// an auto-unsubscribe after the first message is received
const sub = nc.subscribe("updates", { max: 1 });
for await (const m of sub) {
  t.log(sc.decode(m.data));
}

// subscriptions have notifications, simply wait
// the closed promise
sub.closed
  .then(() => {
    t.log("subscription closed");
  })
  .catch((err) => {
    t.err(`subscription closed with an error ${err.message}`);
  });
nc = NATS()

await nc.connect(servers=["nats://demo.nats.io:4222"])

future = asyncio.Future()

async def cb(msg):
  nonlocal future
  future.set_result(msg)

await nc.subscribe("updates", cb=cb)
await nc.publish("updates", b'All is Well')
await nc.flush()

# Wait for message to come in
msg = await asyncio.wait_for(future, 1)
// dotnet add package NATS.Net
using NATS.Net;

await using var client = new NatsClient();

// Subscribe to the "updates" subject and receive messages as <string> type.
// The default serializer understands all primitive types, strings,
// byte arrays, and uses JSON for complex types.
await foreach (var msg in client.SubscribeAsync<string>("updates"))
{
    Console.WriteLine($"Received: {msg.Data}");
    
    if (msg.Data == "exit")
    {
        // When we exit the loop, we unsubscribe from the subject
        // as a result of enumeration completion.
        break;
    }
}
require 'nats/client'

NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
  nc.subscribe("updates") do |msg|
    puts msg
    nc.close
  end

  nc.publish("updates", "All is Well")
end
static void
onMsg(natsConnection *conn, natsSubscription *sub, natsMsg *msg, void *closure)
{
    printf("Received msg: %s - %.*s\n",
           natsMsg_GetSubject(msg),
           natsMsg_GetDataLength(msg),
           natsMsg_GetData(msg));

    // Need to destroy the message!
    natsMsg_Destroy(msg);
}

(...)

natsConnection      *conn = NULL;
natsSubscription    *sub  = NULL;
natsStatus          s;

s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
if (s == NATS_OK)
{
    // Creates an asynchronous subscription on subject "foo".
    // When a message is sent on subject "foo", the callback
    // onMsg() will be invoked by the client library.
    // You can pass a closure as the last argument.
    s = natsConnection_Subscribe(&sub, conn, "foo", onMsg, NULL);
}

(...)


// Destroy objects that were created
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);
PreviousSynchronous SubscriptionsNextUnsubscribing

Last updated 6 months ago

Was this helpful?