Managing Streams and consumers

Streams and durable consumers can be defined administratively, outside the application (typically with the NATS CLI tool). In that case the application only needs to know the well-known names of the durable consumers it wants to use. You can also manage streams and consumers programmatically.

Common stream management operations are:

  • Add a stream. Adding a stream is idempotent: if the stream does not exist, it is created; if it already exists, the add operation succeeds only if the existing stream's attributes exactly match those specified in the 'add' call.

  • Delete a stream.

  • Purge a stream (delete all the messages stored in the stream).

  • Get or remove a specific message from a stream by sequence number.

  • Add, update, or delete a consumer.

  • Get information and statistics on streams, consumers, and the account. Get or remove individual messages stored in a stream, or get information about them.
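
The idempotent 'add' behaviour described in the list above can be illustrated with a small in-memory model. This is only a sketch of the semantics, not a NATS client: `ToyStreamRegistry`, `StreamExistsError`, and the reduced `StreamConfig` are hypothetical names introduced for illustration, and a real server compares many more attributes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StreamConfig:
    """Hypothetical, heavily reduced stand-in for a real stream configuration."""
    name: str
    subjects: tuple
    max_bytes: int = -1


class StreamExistsError(Exception):
    """Raised when a stream name is reused with a different configuration."""


class ToyStreamRegistry:
    """In-memory model of the 'add' semantics; it does not talk to a server."""

    def __init__(self):
        self._streams = {}

    def add_stream(self, config):
        existing = self._streams.get(config.name)
        if existing is None:
            # No stream with this name yet: create it.
            self._streams[config.name] = config
            return "created"
        if existing == config:
            # Same name and identical attributes: the call succeeds as a no-op.
            return "unchanged"
        # Same name but different attributes: the add is rejected.
        raise StreamExistsError(
            f"stream {config.name!r} exists with a different configuration"
        )


registry = ToyStreamRegistry()
cfg = StreamConfig(name="example-stream", subjects=("example-subject",), max_bytes=1024)

print(registry.add_stream(cfg))  # first call creates the stream
print(registry.add_stream(cfg))  # identical repeat succeeds without changes

try:
    # Reusing the name with a different max_bytes is rejected.
    registry.add_stream(StreamConfig("example-stream", ("example-subject",), max_bytes=2048))
except StreamExistsError as err:
    print("rejected:", err)
```

In this model, changing an attribute of an existing stream has to go through a separate update operation, which mirrors the split between 'add' and 'update' in the client examples that follow.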

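Go:

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func ExampleJetStreamManager() {
	nc, err := nats.Connect("localhost")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	// Get a JetStream context, which also exposes the management API
	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}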

	// Create a stream
	js.AddStream(&nats.StreamConfig{
		Name:     "example-stream",
		Subjects: []string{"example-subject"},
		MaxBytes: 1024,
	})

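	// Update a stream. The update sends a complete configuration, so repeat
	// the settings that should stay unchanged (here: Subjects).
	js.UpdateStream(&nats.StreamConfig{
		Name:     "example-stream",
		Subjects: []string{"example-subject"},
		MaxBytes: 2048,
	})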

	// Create a durable consumer
	js.AddConsumer("example-stream", &nats.ConsumerConfig{
		Durable: "example-consumer-name",
	})

	// Get information about all streams (with Context JSOpt)
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	for info := range js.StreamsInfo(nats.Context(ctx)) {
		fmt.Println("stream name: ", info.Config.Name)
	}

	// Get information about all consumers (with MaxWait JSOpt)
	for info := range js.ConsumersInfo("example-stream", nats.MaxWait(10*time.Second)) {
		fmt.Println("consumer name: ", info.Name)
	}

	// Delete a consumer
	js.DeleteConsumer("example-stream", "example-consumer-name")

	// Delete a stream
	js.DeleteStream("example-stream")
}
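
Java:

import io.nats.client.Connection;
import io.nats.client.JetStreamManagement;
import io.nats.client.Nats;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;
import java.util.List;

try (Connection nc = Nats.connect("localhost")) {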
    JetStreamManagement jsm = nc.jetStreamManagement();

    // Create a stream
    StreamInfo si = jsm.addStream(StreamConfiguration.builder()
        .name("example-stream")
        .subjects("example-subject")
        .maxBytes(1024)
        .build());
    StreamConfiguration config = si.getConfiguration();
    System.out.println("stream name: " + config.getName() + ", max_bytes: " + config.getMaxBytes());

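    // Update a stream. The update sends a complete configuration, so repeat
    // the settings that should stay unchanged (here: the subjects).
    si = jsm.updateStream(StreamConfiguration.builder()
        .name("example-stream")
        .subjects("example-subject")
        .maxBytes(2048)
        .build());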
    config = si.getConfiguration();
    System.out.println("stream name: " + config.getName() + ", max_bytes: " + config.getMaxBytes());

    // Create a durable consumer
    jsm.createConsumer("example-stream", ConsumerConfiguration.builder()
        .durable("example-consumer-name")
        .build());

    // Get information about all streams
    List<StreamInfo> streams = jsm.getStreams();
    for (StreamInfo info : streams) {
        System.out.println("stream name: " + info.getConfiguration().getName());
    }

    // Get information about all consumers
    List<ConsumerInfo> consumers = jsm.getConsumers("example-stream");
    for (ConsumerInfo ci : consumers) {
        System.out.println("consumer name: " + ci.getName());
    }

    // Delete a consumer
    jsm.deleteConsumer("example-stream", "example-consumer-name");

    // Delete a stream
    jsm.deleteStream("example-stream");
}
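
JavaScript:

// This import path is relative to the client repository; adjust it to the
// package you have installed.
import { AckPolicy, connect, Empty } from "../../src/mod.ts";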

const nc = await connect();
const jsm = await nc.jetstreamManager();

// list all the streams, the `next()` function
// retrieves a paged result.
const streams = await jsm.streams.list().next();
streams.forEach((si) => {
    console.log(si);
});

// add a stream
const stream = "mystream";
const subj = `mystream.*`;
await jsm.streams.add({ name: stream, subjects: [subj] });

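// publish regular NATS messages on a subject captured by the stream
// (publish to a concrete subject, not to the wildcard itself)
for (let i = 0; i < 10; i++) {
    nc.publish(`${stream}.a`, Empty);
}
// make sure the server has processed the publishes before reading them back
await nc.flush();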

// find a stream that stores a specific subject:
const name = await jsm.streams.find("mystream.A");

// retrieve info about the stream by its name
const si = await jsm.streams.info(name);

// update a stream configuration
si.config.subjects?.push("a.b");
await jsm.streams.update(name, si.config);

// get a particular stored message in the stream by sequence
// this is not associated with a consumer
const sm = await jsm.streams.getMessage(stream, { seq: 1 });
console.log(sm.seq);

// delete the 5th message in the stream, securely erasing it
await jsm.streams.deleteMessage(stream, 5);

// purge all messages in the stream, the stream itself
// remains.
await jsm.streams.purge(stream);

// purge all messages with a specific subject (filter can be a wildcard)
await jsm.streams.purge(stream, { filter: "a.b" });

// purge messages with a specific subject keeping some messages
await jsm.streams.purge(stream, { filter: "a.c", keep: 5 });

// purge all messages up to, but not including, the given sequence
await jsm.streams.purge(stream, { seq: 100 });

// purge all messages with a matching subject up to, but not including,
// the given sequence
await jsm.streams.purge(stream, { filter: "a.d", seq: 100 });

// list all consumers for a stream:
const consumers = await jsm.consumers.list(stream).next();
consumers.forEach((ci) => {
    console.log(ci);
});

// add a new durable pull consumer
await jsm.consumers.add(stream, {
    durable_name: "me",
    ack_policy: AckPolicy.Explicit,
});

// retrieve a consumer's configuration
const ci = await jsm.consumers.info(stream, "me");
console.log(ci);

// delete a particular consumer
await jsm.consumers.delete(stream, "me");
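
The purge options used in the JavaScript example above (`filter`, `seq`, `keep`) can be easier to reason about with a small in-memory model. The following is a sketch of the selection logic only, under the assumption that `seq` and `keep` are not combined in one request. `purge` and `subject_matches` are hypothetical helper names, and messages are modelled as plain `(sequence, subject)` tuples rather than real stream entries.

```python
def subject_matches(subject_filter, subject):
    """Token-wise subject match supporting the '*' and '>' wildcards."""
    f_tokens = subject_filter.split(".")
    s_tokens = subject.split(".")
    for i, tok in enumerate(f_tokens):
        if tok == ">":
            # '>' matches one or more remaining tokens.
            return len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if tok != "*" and tok != s_tokens[i]:
            return False
    return len(f_tokens) == len(s_tokens)


def purge(messages, subject_filter=None, seq=None, keep=None):
    """Return the messages that survive a purge.

    messages: list of (sequence, subject) tuples in ascending sequence order.
    """
    if seq is not None and keep is not None:
        raise ValueError("seq and keep cannot be combined")

    # Candidates are the messages the purge is allowed to remove.
    candidates = [
        m for m in messages
        if subject_filter is None or subject_matches(subject_filter, m[1])
    ]
    if seq is not None:
        # Only messages below the given sequence are removed.
        candidates = [m for m in candidates if m[0] < seq]
    if keep is not None and keep > 0:
        # Spare the newest `keep` candidates.
        candidates = candidates[:-keep]

    removed = set(candidates)
    return [m for m in messages if m not in removed]


msgs = [(1, "a.b"), (2, "a.c"), (3, "a.b"), (4, "a.c"), (5, "a.c")]

print(purge(msgs))                                # everything is removed
print(purge(msgs, subject_filter="a.b"))          # only the "a.c" messages remain
print(purge(msgs, subject_filter="a.c", keep=1))  # the newest "a.c" message is spared
print(purge(msgs, seq=3))                         # sequences 3, 4 and 5 remain
```

The stream itself is untouched in every case; only the stored messages selected by the options are dropped.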

Python:

import asyncio

import nats


async def main():
    nc = await nats.connect("localhost")

    # Create JetStream context.
    js = nc.jetstream()

    # Persist messages published on the 'foo' subject.
    await js.add_stream(name="sample-stream", subjects=["foo"])

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())

C#:

// dotnet add package NATS.Net
using NATS.Net;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;

await using var client = new NatsClient();

INatsJSContext js = client.CreateJetStreamContext();

// Create a stream
var streamConfig = new StreamConfig(name: "example-stream", subjects: ["example-subject"])
{
    MaxBytes = 1024,
};
await js.CreateStreamAsync(streamConfig);

// Update the stream
var streamConfigUpdated = streamConfig with { MaxBytes = 2048 };
await js.UpdateStreamAsync(streamConfigUpdated);

// Create a durable consumer
await js.CreateConsumerAsync("example-stream", new ConsumerConfig("example-consumer-name"));

// Get information about all streams
await foreach (var stream in js.ListStreamsAsync())
{
    Console.WriteLine($"stream name: {stream.Info.Config.Name}");
}

// Get information about all consumers in a stream
await foreach (var consumer in js.ListConsumersAsync("example-stream"))
{
    Console.WriteLine($"consumer name: {consumer.Info.Config.Name}");
}

// Delete a consumer
await js.DeleteConsumerAsync("example-stream", "example-consumer-name");

// Delete a stream
await js.DeleteStreamAsync("example-stream");

// Output:
// stream name: example-stream
// consumer name: example-consumer-name

C:

#include "examples.h"

static const char *usage = ""\
"-stream        stream name (default is 'foo')\n" \
"-txt           text to send (default is 'hello')\n" \
"-count         number of messages to send\n" \
"-sync          publish synchronously (default is async)\n";

static void
_jsPubErr(jsCtx *js, jsPubAckErr *pae, void *closure)
{
    int *errors = (int*) closure;

    printf("Error: %u - Code: %u - Text: %s\n", pae->Err, pae->ErrCode, pae->ErrText);
    printf("Original message: %.*s\n", natsMsg_GetDataLength(pae->Msg), natsMsg_GetData(pae->Msg));

    *errors = (*errors + 1);

    // If we wanted to resend the original message, we would do something like this:
    //
    // js_PublishMsgAsync(js, &(pae->Msg), NULL);
    //
    // Note that we use `&(pae->Msg)` so that the library sets it to NULL if it takes
    // ownership; the library will then not destroy the message when this callback returns.

    // No need to destroy anything, everything is handled by the library.
}

int main(int argc, char **argv)
{
    natsConnection      *conn  = NULL;
    natsStatistics      *stats = NULL;
    natsOptions         *opts  = NULL;
    jsCtx               *js    = NULL;
    jsOptions           jsOpts;
    jsErrCode           jerr   = 0;
    natsStatus          s;
    int                 dataLen=0;
    volatile int        errors = 0;
    bool                delStream = false;

    opts = parseArgs(argc, argv, usage);
    dataLen = (int) strlen(payload);

    s = natsConnection_Connect(&conn, opts);

    if (s == NATS_OK)
        s = jsOptions_Init(&jsOpts);

    if (s == NATS_OK)
    {
        if (async)
        {
            jsOpts.PublishAsync.ErrHandler           = _jsPubErr;
            jsOpts.PublishAsync.ErrHandlerClosure    = (void*) &errors;
        }
        s = natsConnection_JetStream(&js, conn, &jsOpts);
    }

    if (s == NATS_OK)
    {
        jsStreamInfo    *si = NULL;

        // First check if the stream already exists.
        s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
        if (s == NATS_NOT_FOUND)
        {
            jsStreamConfig  cfg;

            // Since we are the one creating this stream, we can delete at the end.
            delStream = true;

            // Initialize the configuration structure.
            jsStreamConfig_Init(&cfg);
            cfg.Name = stream;
            // Set the subject
            cfg.Subjects = (const char*[1]){subj};
            cfg.SubjectsLen = 1;
            // Make it a memory stream.
            cfg.Storage = js_MemoryStorage;
            // Add the stream,
            s = js_AddStream(&si, js, &cfg, NULL, &jerr);
        }
        if (s == NATS_OK)
        {
            printf("Stream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
                si->Config->Name, si->State.Msgs, si->State.Bytes);

            // Need to destroy the returned stream object.
            jsStreamInfo_Destroy(si);
        }
    }

    if (s == NATS_OK)
        s = natsStatistics_Create(&stats);

    if (s == NATS_OK)
    {
        printf("\nSending %" PRId64 " messages to subject '%s'\n", total, subj);
        start = nats_Now();
    }

    for (count = 0; (s == NATS_OK) && (count < total); count++)
    {
        if (async)
            s = js_PublishAsync(js, subj, (const void*) payload, dataLen, NULL);
        else
        {
            jsPubAck *pa = NULL;

            s = js_Publish(&pa, js, subj, (const void*) payload, dataLen, NULL, &jerr);
            if (s == NATS_OK)
            {
                if (pa->Duplicate)
                    printf("Got a duplicate message! Sequence=%" PRIu64 "\n", pa->Sequence);

                jsPubAck_Destroy(pa);
            }
        }
    }

    if ((s == NATS_OK) && async)
    {
        jsPubOptions    jsPubOpts;

        jsPubOptions_Init(&jsPubOpts);
        // Let's set it to 30 seconds, if getting "Timeout" errors,
        // this may need to be increased based on the number of messages
        // being sent.
        jsPubOpts.MaxWait = 30000;
        s = js_PublishAsyncComplete(js, &jsPubOpts);
        if (s == NATS_TIMEOUT)
        {
            // Let's get the list of pending messages. We could resend,
            // etc, but for now, just destroy them.
            natsMsgList list;

            js_PublishAsyncGetPendingList(&list, js);
            natsMsgList_Destroy(&list);
        }
    }

    if (s == NATS_OK)
    {
        jsStreamInfo *si = NULL;

        elapsed = nats_Now() - start;
        printStats(STATS_OUT, conn, NULL, stats);
        printPerf("Sent");

        if (errors != 0)
            printf("There were %d asynchronous errors\n", errors);

        // Let's report some stats after the run
        s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
        if (s == NATS_OK)
        {
            printf("\nStream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
                si->Config->Name, si->State.Msgs, si->State.Bytes);

            jsStreamInfo_Destroy(si);
        }
    }
    if (delStream && (js != NULL))
    {
        printf("\nDeleting stream %s: ", stream);
        s = js_DeleteStream(js, stream, NULL, &jerr);
        if (s == NATS_OK)
            printf("OK!");
        printf("\n");
    }
    if (s != NATS_OK)
    {
        printf("Error: %u - %s - jerr=%u\n", s, natsStatus_GetText(s), jerr);
        nats_PrintLastErrorStack(stderr);
    }

    // Destroy all our objects to avoid report of memory leak
    jsCtx_Destroy(js);
    natsStatistics_Destroy(stats);
    natsConnection_Destroy(conn);
    natsOptions_Destroy(opts);

    // To silence reports of memory still in use with valgrind
    nats_Close();

    return 0;
}

Last updated 4 months ago
