CNCF and Synadia Align on Securing the Future of the NATS.io Project. Read the joint press release.
NATS Docs
NATS.ioNATS by ExampleGitHubSlackTwitter
  • Welcome
  • Release Notes
    • What's New!
      • NATS 2.11
      • NATS 2.10
      • NATS 2.2
      • NATS 2.0
  • NATS Concepts
    • Overview
      • Compare NATS
    • What is NATS
      • Walkthrough Setup
    • Subject-Based Messaging
    • Core NATS
      • Publish-Subscribe
        • Pub/Sub Walkthrough
      • Request-Reply
        • Request-Reply Walkthrough
      • Queue Groups
        • Queueing Walkthrough
    • JetStream
      • Streams
      • Source and Mirror Streams
        • Example
      • Consumers
        • Example
      • JetStream Walkthrough
      • Key/Value Store
        • Key/Value Store Walkthrough
      • Object Store
        • Object Store Walkthrough
      • Headers
    • Subject Mapping and Partitioning
    • NATS Service Infrastructure
      • NATS Adaptive Deployment Architectures
    • Security
    • Connectivity
  • Using NATS
    • NATS Tools
      • nats
        • nats bench
      • nk
      • nsc
        • Basics
        • Streams
        • Services
        • Signing Keys
        • Revocation
        • Managed Operators
      • nats-top
        • Tutorial
    • Developing With NATS
      • Anatomy of a NATS application
      • Connecting
        • Connecting to the Default Server
        • Connecting to a Specific Server
        • Connecting to a Cluster
        • Connection Name
        • Authenticating with a User and Password
        • Authenticating with a Token
        • Authenticating with an NKey
        • Authenticating with a Credentials File
        • Encrypting Connections with TLS
        • Setting a Connect Timeout
        • Ping/Pong Protocol
        • Turning Off Echo'd Messages
        • Miscellaneous functionalities
        • Automatic Reconnections
          • Disabling Reconnect
          • Set the Number of Reconnect Attempts
          • Avoiding the Thundering Herd
          • Pausing Between Reconnect Attempts
          • Listening for Reconnect Events
          • Buffering Messages During Reconnect Attempts
        • Monitoring the Connection
          • Listen for Connection Events
          • Slow Consumers
      • Receiving Messages
        • Synchronous Subscriptions
        • Asynchronous Subscriptions
        • Unsubscribing
        • Unsubscribing After N Messages
        • Replying to a Message
        • Wildcard Subscriptions
        • Queue Subscriptions
        • Draining Messages Before Disconnect
        • Receiving Structured Data
      • Sending Messages
        • Including a Reply Subject
        • Request-Reply Semantics
        • Caches, Flush and Ping
        • Sending Structured Data
      • Building Services
      • JetStream
        • JetStream Model Deep Dive
        • Managing Streams and consumers
        • Consumer Details
        • Publishing to Streams
        • Using the Key/Value Store
        • Using the Object Store
      • Tutorials
        • Advanced Connect and Custom Dialer in Go
    • Running Workloads on NATS
      • Getting Started
        • Installing Nex
        • Building a Service
        • Starting a Node
        • Deploying Services
        • Building a Function
        • Deploying Functions
      • Host Services
        • Javascript | V8
      • Nex Internals
        • Architecture Overview
        • Node Process
        • Nex Agent
        • No Sandbox Mode
        • Root File System
        • Control Interface
      • FAQ
  • Running a NATS service
    • Installing, running and deploying a NATS Server
      • Installing a NATS Server
      • Running and deploying a NATS Server
      • Windows Service
      • Flags
    • Environmental considerations
    • NATS and Docker
      • Tutorial
      • Docker Swarm
      • Python and NGS Running in Docker
      • JetStream
      • NGS Leaf Nodes
    • NATS and Kubernetes
    • NATS Server Clients
    • Configuring NATS Server
      • Configuring JetStream
        • Configuration Management
          • NATS Admin CLI
          • Terraform
          • GitHub Actions
          • Kubernetes Controller
      • Clustering
        • Clustering Configuration
        • v2 Routes
        • JetStream Clustering
          • Administration
          • Troubleshooting
      • Super-cluster with Gateways
        • Configuration
      • Leaf Nodes
        • Configuration
        • JetStream on Leaf Nodes
      • Securing NATS
        • Enabling TLS
        • Authentication
          • Tokens
          • Username/Password
          • TLS Authentication
            • TLS Authentication in clusters
          • NKeys
          • Authentication Timeout
          • Decentralized JWT Authentication/Authorization
            • Account lookup using Resolver
            • Memory Resolver Tutorial
            • Mixed Authentication/Authorization Setup
        • Authorization
        • Multi Tenancy using Accounts
        • OCSP Stapling
        • Auth Callout
      • Logging
      • Enabling Monitoring
      • MQTT
        • Configuration
      • Configuring Subject Mapping
      • System Events
        • System Events & Decentralized JWT Tutorial
      • WebSocket
        • Configuration
    • Managing and Monitoring your NATS Server Infrastructure
      • Monitoring
        • Monitoring JetStream
      • Managing JetStream
        • Account Information
        • Naming Streams, Consumers, and Accounts
        • Streams
        • Consumers
        • Data Replication
        • Disaster Recovery
        • Encryption at Rest
      • Managing JWT Security
        • In Depth JWT Guide
      • Upgrading a Cluster
      • Slow Consumers
      • Signals
      • Lame Duck Mode
      • Profiling
  • Reference
    • FAQ
    • NATS Protocols
      • Protocol Demo
      • Client Protocol
        • Developing a Client
      • NATS Cluster Protocol
      • JetStream wire API Reference
    • Roadmap
    • Contributing
  • Legacy
    • nats-account-server
Powered by GitBook
On this page

Was this helpful?

Edit on GitHub
Export as PDF
  1. Using NATS
  2. Developing With NATS
  3. Receiving Messages

Receiving Structured Data

Client libraries may provide tools to help receive structured data, like JSON. The core traffic to the NATS server will always be opaque byte arrays. The server does not process message payloads in any form. For libraries that don't provide helpers, you can always encode and decode data before sending the associated bytes to the NATS client.

For example, to receive JSON you could do:

nc, err := nats.Connect("demo.nats.io",
    nats.ErrorHandler(func(nc *nats.Conn, s *nats.Subscription, err error) {
        if s != nil {
        log.Printf("Async error in %q/%q: %v", s.Subject, s.Queue, err)
        } else {
        log.Printf("Async error outside subscription: %v", err)
        }
    }))
if err != nil {
    log.Fatal(err)
}
defer nc.Close()
ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
if err != nil {
    log.Fatal(err)
}
defer ec.Close()

// Define the object
type stock struct {
    Symbol string
    Price  int
}

wg := sync.WaitGroup{}
wg.Add(1)

// Subscribe
// Decoding errors will be passed to the function supplied via
// nats.ErrorHandler above, and the callback supplied here will
// not be invoked.
if _, err := ec.Subscribe("updates", func(s *stock) {
    log.Printf("Stock: %s - Price: %v", s.Symbol, s.Price)
    wg.Done()
}); err != nil {
    log.Fatal(err)
}

// Wait for a message to come in
wg.Wait()
class StockForJsonSub {
    public String symbol;
    public float price;

    public String toString() {
        return symbol + " is at " + price;
    }
}

public class SubscribeJSON {
    public static void main(String[] args) {

        try {
            Connection nc = Nats.connect("nats://demo.nats.io:4222");

            // Use a latch to wait for 10 messages to arrive
            CountDownLatch latch = new CountDownLatch(10);

            // Create a dispatcher and inline message handler
            Dispatcher d = nc.createDispatcher((msg) -> {
                Gson gson = new Gson();

                String json = new String(msg.getData(), StandardCharsets.UTF_8);
                StockForJsonSub stk = gson.fromJson(json, StockForJsonSub.class);

                // Use the object
                System.out.println(stk);

                latch.countDown();
            });

            // Subscribe
            d.subscribe("updates");

            // Wait for a message to come in
            latch.await(); 

            // Close the connection
            nc.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
const jc = JSONCodec();
const sub = nc.subscribe(subj, {
  callback: (_err, msg) => {
    t.log(`${jc.decode(msg.data)}`);
  },
  max: 1,
});
import asyncio
import json
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrTimeout

async def run(loop):
    nc = NATS()

    await nc.connect(servers=["nats://127.0.0.1:4222"], loop=loop)

    async def message_handler(msg):
        data = json.loads(msg.data.decode())
        print(data)

    sid = await nc.subscribe("updates", cb=message_handler)
    await nc.flush()

    await nc.auto_unsubscribe(sid, 2)
    await nc.publish("updates", json.dumps({"symbol": "GOOG", "price": 1200 }).encode())
    await asyncio.sleep(1, loop=loop)
    await nc.close()
// dotnet add package NATS.Net
using NATS.Net;

// NATS .NET has a built-in serializer that does the 'unsurprising' thing
// for most types. Most primitive types are serialized as expected.
// For any other type, JSON serialization is used. You can also provide
// your own serializers by implementing the INatsSerializer and
// INasSerializerRegistry interfaces. See also for more information:
// https://nats-io.github.io/nats.net/documentation/advanced/serialization.html
await using var nc = new NatsClient();

CancellationTokenSource cts = new();

// Subscribe for int, string, bytes, json
List<Task> tasks =
[
    Task.Run(async () =>
    {
        await foreach (var msg in nc.SubscribeAsync<int>("x.int", cancellationToken: cts.Token))
        {
            Console.WriteLine($"Received int: {msg.Data}");
        }
    }),

    Task.Run(async () =>
    {
        await foreach (var msg in nc.SubscribeAsync<string>("x.string", cancellationToken: cts.Token))
        {
            Console.WriteLine($"Received string: {msg.Data}");
        }
    }),

    Task.Run(async () =>
    {
        await foreach (var msg in nc.SubscribeAsync<byte[]>("x.bytes", cancellationToken: cts.Token))
        {
            if (msg.Data != null)
            {
                Console.Write($"Received bytes: ");
                foreach (var b in msg.Data)
                {
                    Console.Write("0x{0:X2} ", b);
                }
                Console.WriteLine();
            }
        }
    }),

    Task.Run(async () =>
    {
        await foreach (var msg in nc.SubscribeAsync<MyData>("x.json", cancellationToken: cts.Token))
        {
            Console.WriteLine($"Received data: {msg.Data}");
        }
    }),
];

// Give the subscriber tasks some time to subscribe
await Task.Delay(1000);

await nc.PublishAsync<int>("x.int", 100);
await nc.PublishAsync<string>("x.string", "Hello, World!");
await nc.PublishAsync<byte[]>("x.bytes", new byte[] { 0x41, 0x42, 0x43 });
await nc.PublishAsync<MyData>("x.json", new MyData(30, "bar"));

await cts.CancelAsync();

await Task.WhenAll(tasks);

public record MyData(int Id, string Name);

// Output:
// Received int: 100
// Received bytes: 0x41 0x42 0x43
// Received string: Hello, World!
// Received data: MyData { Id = 30, Name = bar }

// See also for more information:
// https://nats-io.github.io/nats.net/documentation/advanced/serialization.html
require 'nats/client'
require 'json'

NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
  nc.subscribe("updates") do |msg|
    m = JSON.parse(msg)

    # {"symbol"=>"GOOG", "price"=>12}
    p m
  end
end
// Structured data is not configurable in C NATS Client.
PreviousDraining Messages Before DisconnectNextSending Messages

Last updated 6 months ago

Was this helpful?