JetStream

Deciding to use streaming and higher qualities of service

In modern systems, applications can expose services or produce and consume data streams. A basic aspect of publish-subscribe messaging is temporal coupling: the subscribers need to be up and running to receive the message when it is published. At a high level, if observability is required, if applications need to consume messages at a later time or at their own pace, or if they need all messages, then JetStream's streaming functionality provides the temporal de-coupling between publishers and consumers.

Using streaming and its associated higher qualities of service is the facet of messaging with the highest cost in terms of compute and storage.

When to use streaming

Streaming is ideal when:

  • Data producers and consumers are highly decoupled. They may be online at different times and consumers must receive messages.

  • A historical record of the data in the stream is required, i.e. when a replay of the data may be required by a consumer.

  • The last message on a stream is required for initialization and the producer may be offline.

  • A-priori knowledge of consumers is not available, but consumers must receive messages. This is often a false assumption.

  • The data in messages being sent have a lifespan beyond that of the intended application lifespan.

  • Applications need to consume data at their own pace.

  • You want decoupled flow control between the publishers and the consumers of the stream.

  • You need 'exactly once' quality of service with de-duplication of publications and double-acknowledged consumption.

Note that no assumptions should ever be made of who will receive and process data in the future, or for what purpose.

When to use Core NATS

Using Core NATS is ideal as the fast request path for scalable services where there is tolerance for message loss or when applications themselves handle message delivery guarantees.

These include:

  • Service patterns where there is a tightly coupled request-reply. A request is made, and the application handles error cases upon timeout (resends, errors, etc.). Relying on a messaging system to resend here is considered an anti-pattern.

  • Situations where only the last message received is important and new messages will be received frequently enough for applications to tolerate a lost message. This might be a stock ticker stream, frequent exchange of messages in a service control plane, or device telemetry.

  • Cases where message TTL is low, i.e. where the value of the data being transmitted degrades or expires quickly.

  • Cases where the expected consumer set for a message is available a-priori and consumers are expected to be live. The request-reply pattern works well here, or consumers can send an application-level acknowledgement.

  • Control plane messages.

JetStream functionality overview

Streams

  • You can use 'Add Stream' to idempotently define streams and their attributes (e.g. source subjects, retention and storage policies, limits), as sketched after this list

  • You can use 'Purge' to purge the messages in a stream

  • You can use 'Delete' to delete a stream
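As a rough sketch of these three operations with the Go client (the stream name and subjects here are illustrative, and error handling is elided):

    package main

    import "github.com/nats-io/nats.go"

    func main() {
        // Connect and obtain a JetStream context.
        nc, _ := nats.Connect(nats.DefaultURL)
        defer nc.Close()
        js, _ := nc.JetStream()

        // 'Add Stream': idempotently define the stream and its attributes.
        js.AddStream(&nats.StreamConfig{
            Name:     "ORDERS",
            Subjects: []string{"ORDERS.*"},
        })

        // 'Purge': remove all messages but keep the stream definition.
        js.PurgeStream("ORDERS")

        // 'Delete': remove the stream entirely.
        js.DeleteStream("ORDERS")
    }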

Publish to a stream

There is interoperability between Core NATS and JetStream in that streams capture matching core NATS messages. However, you will notice that the NATS client libraries' JetStream API includes its own 'Publish' calls, so you may wonder what the difference is between a 'Core NATS publish' and a 'JetStream publish'.

When a Core NATS application publishes a message on a stream's subject, that message does get stored in the stream, but it is published with the lower quality of service provided by Core NATS. So, while it will work to just use the Core NATS publish call to publish to a stream, look at it as a convenience that can help ease the migration of your applications to streaming, rather than the desired end state or ideal design.

Instead, it is better for applications to use the JetStream publish calls (which Core NATS subscribers not using streams will still receive like any other publication) when publishing to a stream, as:

  • JetStream publish calls are acknowledged by the JetStream-enabled servers, which allows for the following higher qualities of service:

    • If the publisher receives the acknowledgement from the server, it can safely discard any state it has for that publication: the message has not only been received correctly by the server, it has also been successfully persisted.

    • Whether you use the synchronous or the asynchronous JetStream publish calls, there is an implied flow control between the publisher and the JetStream infrastructure.

    • You can have 'exactly-once' quality of service by having the JetStream publishing application insert a unique publication ID in a header field of the message.
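For illustration, a JetStream publish from the Go client, showing the acknowledgement (a fragment continuing from the JetStream context js above; the subject, message ID, and the fmt/log imports are assumptions):

    // The returned PubAck confirms the message was received and persisted.
    // nats.MsgId sets the Nats-Msg-Id header used for de-duplication.
    pa, err := js.Publish("ORDERS.new", []byte("hello"), nats.MsgId("order-1"))
    if err != nil {
        log.Fatal(err) // no acknowledgement: persistence is not guaranteed
    }
    fmt.Printf("stored in stream %s as sequence %d (duplicate: %v)\n",
        pa.Stream, pa.Sequence, pa.Duplicate)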

See Also

  • Sync and Async JetStream publishing in Java

Create a consumer

Consumers are 'views' into a stream, with their own cursor. They are how client applications get messages from a stream (i.e. have them 'replayed') for processing or consumption. They can filter messages in the stream according to a 'filtering subject' and define which part of the stream is replayed according to a 'replay policy'.

You can create push or pull consumers:

  • Push consumers (specifically ordered push consumers) are the best way for an application to receive its own complete copy of the selected messages in the stream.

  • Pull consumers are the best way to horizontally scale the processing (or consumption) of the selected messages in the stream, using multiple client applications sharing the same pull consumer; they also allow messages to be processed in batches.

Consumers can be ephemeral or durable, and support different acknowledgement policies: none; this sequence number; this sequence number and all before it.
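As a sketch, horizontally scaled processing with a shared pull consumer in the Go client (the subject, durable name, and batch size are illustrative; assumes a JetStream context js and the time import):

    // Several instances of this code, all using the durable name "DISPATCH",
    // share and scale the processing of the stream's messages.
    sub, _ := js.PullSubscribe("ORDERS.processed", "DISPATCH")
    msgs, _ := sub.Fetch(10, nats.MaxWait(2*time.Second)) // a batch of up to 10
    for _, m := range msgs {
        // ... process the message ...
        m.Ack() // pull consumers use explicit acknowledgement
    }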

Replay policy

You select which of the messages in the stream you want to have delivered to your consumer:

  • all

  • from a sequence number

  • from a point in time

  • the last message

  • the last message(s) for all the subject(s) in the stream

And you can select the replay speed to be instant or to match the initial publication rate into the stream.
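In the Go client these choices map onto the consumer configuration's DeliverPolicy, OptStartSeq/OptStartTime, and ReplayPolicy fields; a sketch with illustrative names and values:

    // A push consumer replaying from stream sequence 10 onward,
    // paced at the original publication rate.
    js.AddConsumer("ORDERS", &nats.ConsumerConfig{
        Durable:        "REPLAY",
        DeliverSubject: "out.replay", // set for push consumers
        DeliverPolicy:  nats.DeliverByStartSequencePolicy,
        OptStartSeq:    10,
        ReplayPolicy:   nats.ReplayOriginalPolicy, // or nats.ReplayInstantPolicy
        AckPolicy:      nats.AckNonePolicy,
    })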

Subscribe from a consumer

Client applications 'subscribe' from consumers using JetStream's Subscribe, QueueSubscribe or PullSubscribe calls (and their variations). Note that since the initial release of JetStream, the client libraries have developed a more ergonomic API for processing messages.

Acknowledging messages

Some consumers require the client application code to acknowledge the processing or consumption of the message, but there is more than one way to acknowledge (or not) a message (see the Go sketch after this list):

  • Ack: acknowledges a message was completely handled

  • Nak: signals that the message will not be processed now and processing can move on to the next message; a NAK'd message will be retried

  • InProgress: when sent before the AckWait period expires, indicates that work is ongoing and the period should be extended by another AckWait

  • Term: instructs the server to stop redelivery of a message without acknowledging it as successfully processed
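In the Go client these correspond to methods on the received message; given a fetched message m, one of the following would be called per message (a sketch):

    m.Ack()        // message completely handled
    // or:
    m.Nak()        // not processed now; the server will redeliver it
    // or:
    m.InProgress() // still working; extends the AckWait period
    // or:
    m.Term()       // stop redelivery without acknowledging success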

See Also

  • Consumers

  • JetStream Java tutorial

  • JetStream stream creation in Java

  • JetStream publishing in Java

  • Consumers in Java

  • Push consumers in Java

  • Pull consumers in Java

    JetStream Model Deep Dive

    Stream Limits, Retention, and Policy

    Streams store data on disk, but we cannot store all data forever, so we need ways to control their size automatically.

    There are 3 features that come into play when Streams decide how long they store data.

    The Retention Policy describes based on what criteria a set will evict messages from its storage:

        LimitsPolicy: Limits are set for how many messages may be kept, how big the storage may be, and how old messages may be.

        WorkQueuePolicy: Messages are kept until they are consumed, meaning delivered to a subscribing application and explicitly acknowledged by that application. In this mode of operation you cannot have overlapping consumers defined on the Stream - each subject captured by the stream can only have one consumer at a time.

        InterestPolicy: Messages are kept as long as there are Consumers on the stream (matching the message's subject if they are filtered consumers) for which the message has not yet been ACKed. Once all currently defined consumers have received explicit acknowledgement from a subscribing application for the message, it is then removed from the stream.

    In all Retention Policies the basic limits apply as upper bounds: MaxMsgs for how many messages are kept in total, MaxBytes for how big the set can be in total, and MaxAge for the oldest message that will be kept. These are the only limits in play with LimitsPolicy retention.

    One can then define additional ways a message may be removed from the Stream earlier than these limits. In WorkQueuePolicy the messages will be removed as soon as a Consumer has received an Acknowledgement. In InterestPolicy messages will be removed as soon as all Consumers of the stream for that subject have received an Acknowledgement for the message.

    In both WorkQueuePolicy and InterestPolicy the age, size and count limits will still apply as upper bounds.

    A final control is the maximum size any single message may have. NATS has its own limit for maximum message size (1 MiB by default), but you can say a Stream will only accept messages up to 1024 bytes using MaxMsgSize.

    The Discard Policy sets how messages are discarded when the limits set by LimitsPolicy are reached. The DiscardOld option removes old messages, making space for new ones, while DiscardNew refuses any new messages.

    The WorkQueuePolicy mode is a specialized mode where a message, once consumed and acknowledged, is removed from the Stream.
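    For illustration, here is how those controls appear on a stream configuration in the Go client (all values are arbitrary examples; assumes a JetStream context js and the time import):

        js.AddStream(&nats.StreamConfig{
            Name:       "ORDERS",
            Subjects:   []string{"ORDERS.*"},
            Retention:  nats.LimitsPolicy,  // or nats.WorkQueuePolicy / nats.InterestPolicy
            MaxMsgs:    10_000,             // total message count bound
            MaxBytes:   64 * 1024 * 1024,   // total size bound
            MaxAge:     24 * time.Hour,     // oldest message kept
            MaxMsgSize: 1024,               // largest single message accepted
            Discard:    nats.DiscardOld,    // or nats.DiscardNew
        })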

    Message Deduplication

    JetStream supports idempotent message writes by ignoring duplicate messages as indicated by the Nats-Msg-Id header.

    Here we set a Nats-Msg-Id:1 header, which tells JetStream to ensure we do not have duplicates of this message - only the message ID is consulted, not the body:

        nats req -H Nats-Msg-Id:1 ORDERS.new hello1
        nats req -H Nats-Msg-Id:1 ORDERS.new hello2
        nats req -H Nats-Msg-Id:1 ORDERS.new hello3
        nats req -H Nats-Msg-Id:1 ORDERS.new hello4

    In the output you can see that the duplicate publications were detected and only one message (the first one) was actually stored in the stream:

        nats stream info ORDERS
        ....
        State:

                    Messages: 1
                       Bytes: 67 B

    The default window to track duplicates is 2 minutes. This can be set on the command line using --dupe-window when creating a stream, though we would caution against very large windows.
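    The same behaviour from the Go client, where the window is the stream's Duplicates field and the ID is set per publish (a sketch with illustrative names; assumes a JetStream context js and the time import):

        js.AddStream(&nats.StreamConfig{
            Name:       "ORDERS",
            Subjects:   []string{"ORDERS.*"},
            Duplicates: 2 * time.Minute, // duplicate tracking window
        })
        pa1, _ := js.Publish("ORDERS.new", []byte("hello1"), nats.MsgId("1"))
        pa2, _ := js.Publish("ORDERS.new", []byte("hello2"), nats.MsgId("1"))
        fmt.Println(pa1.Duplicate, pa2.Duplicate) // prints: false true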

    Acknowledgement Models

    Streams support acknowledging that a message was received: if you send a Request() to a subject covered by the configuration of the Stream, the service will reply to you once it has stored the message. If you just publish, it will not. A Stream can be set to disable Acknowledgements by setting NoAck to true in its configuration.

    Consumers have 3 acknowledgement modes:

        AckExplicit: Requires every message to be specifically acknowledged; it's the only supported option for pull-based Consumers.

        AckAll: In this mode, if you acknowledge message 100 it will also acknowledge messages 1-99; this is good for processing batches and reduces ack overhead.

        AckNone: No acknowledgements are supported.

    To understand how Consumers track messages we will start with a clean ORDERS Stream and DISPATCH Consumer.

    The Set is entirely empty:

        nats str info ORDERS
        ...
        Statistics:

                    Messages: 0
                       Bytes: 0 B
                    FirstSeq: 0
                     LastSeq: 0
            Active Consumers: 1

    The Consumer has no messages outstanding and has never had any (Consumer sequence is 1):

        nats con info ORDERS DISPATCH
        ...
        State:

          Last Delivered Message: Consumer sequence: 1 Stream sequence: 1
            Acknowledgment floor: Consumer sequence: 0 Stream sequence: 0
                Pending Messages: 0
            Redelivered Messages: 0

    We publish one message to the Stream and see that the Stream received it:

        nats pub ORDERS.processed "order 4"
        Published 7 bytes to ORDERS.processed
        nats str info ORDERS
        ...
        Statistics:

                    Messages: 1
                       Bytes: 53 B
                    FirstSeq: 1
                     LastSeq: 1
            Active Consumers: 1

    As the Consumer is pull-based, we can fetch the message, ack it, and check the Consumer state:

        nats con next ORDERS DISPATCH
        --- received on ORDERS.processed
        order 4

        Acknowledged message

        nats con info ORDERS DISPATCH
        ...
        State:

          Last Delivered Message: Consumer sequence: 2 Stream sequence: 2
            Acknowledgment floor: Consumer sequence: 1 Stream sequence: 1
                Pending Messages: 0
            Redelivered Messages: 0

    The message got delivered and acknowledged - the Acknowledgement floor is 1 and 1, and the sequence of the Consumer is 2, which means it has had only the one message through, and that message was acked. Since it was acked, nothing is pending or being redelivered.

    We'll publish another message, fetch it but not ack it this time, and see the status:

        nats pub ORDERS.processed "order 5"
        Published 7 bytes to ORDERS.processed

    Get the next message from the consumer (but do not acknowledge it):

        nats consumer next ORDERS DISPATCH --no-ack
        --- received on ORDERS.processed
        order 5

    Show the consumer info:

        nats consumer info ORDERS DISPATCH
        State:

          Last Delivered Message: Consumer sequence: 3 Stream sequence: 3
            Acknowledgment floor: Consumer sequence: 1 Stream sequence: 1
                Pending Messages: 1
            Redelivered Messages: 0

    Now we can see the Consumer has processed 2 messages (the consumer sequence is 3, so the next message will be 3) but the Ack floor is still 1 - thus 1 message is pending acknowledgement. Indeed this is confirmed by the Pending Messages count.

    If I fetch it again and again do not ack it:

        nats consumer next ORDERS DISPATCH --no-ack
        --- received on ORDERS.processed
        order 5

    Show the consumer info again:

        nats consumer info ORDERS DISPATCH
        State:

          Last Delivered Message: Consumer sequence: 4 Stream sequence: 3
            Acknowledgment floor: Consumer sequence: 1 Stream sequence: 1
                Pending Messages: 1
            Redelivered Messages: 1

    The Consumer sequence increases - each delivery attempt increases the sequence - and our redelivered count also goes up.

    Finally, if I fetch it again and ack it this time:

        nats consumer next ORDERS DISPATCH
        --- received on ORDERS.processed
        order 5

        Acknowledged message

    Show the consumer info:

        nats consumer info ORDERS DISPATCH
        State:

          Last Delivered Message: Consumer sequence: 5 Stream sequence: 3
            Acknowledgment floor: Consumer sequence: 1 Stream sequence: 1
                Pending Messages: 0
            Redelivered Messages: 0

    Having now acked the message, there are no more pending messages.

    Additionally, there are a few types of acknowledgements:

        AckAck (nil, +ACK): Acknowledges a message was completely handled.

        AckNak (-NAK): Signals that the message will not be processed now and processing can move on to the next message; a NAK'd message will be retried.

        AckProgress (+WPI): When sent before the AckWait period expires, indicates that work is ongoing and the period should be extended by another AckWait.

        AckNext (+NXT): Acknowledges the message was handled and requests delivery of the next message to the reply subject. Only applies to Pull-mode.

        AckTerm (+TERM): Instructs the server to stop redelivery of a message without acknowledging it as successfully processed.

    So far all of the examples used the AckAck type of acknowledgement; by replying to the Ack with the body indicated in parentheses above you can pick what mode of acknowledgement you want. Note that this is documenting the internal JetStream protocol: client libraries offer specific APIs for performing all of the above acknowledgements without you having to worry about the internal protocol payloads.

    All of these acknowledgement modes, except AckNext, support double acknowledgement - if you set a reply subject when acknowledging, the server will in turn acknowledge having received your ACK.

    The +NXT acknowledgement can have a few formats: +NXT 10 requests 10 messages, and +NXT {"no_wait": true} carries the same data that can be sent in a Pull Request.

    Exactly Once Semantics

    JetStream supports Exactly Once publication and consumption by combining Message Deduplication and double acks.

    On the publishing side you can avoid duplicate message ingestion using the Message Deduplication feature.

    Consumers can be 100% sure a message was correctly processed by requesting that the server acknowledge having received their acknowledgement (sometimes referred to as double-acking): call the message's AckSync() function (rather than Ack()), which sets a reply subject on the Ack and waits for a response from the server confirming reception and processing of the acknowledgement. If the response indicates success, you can be sure that the message will never be re-delivered by the consumer (due to a loss of your acknowledgement).
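    A sketch of the consuming side in Go (the stream subject and durable name are illustrative; assumes a JetStream context js and the log import):

        // AckSync waits for the server to confirm it received the
        // acknowledgement; Ack alone does not.
        sub, _ := js.PullSubscribe("ORDERS.new", "NEW")
        msgs, _ := sub.Fetch(1)
        for _, m := range msgs {
            if err := m.AckSync(); err != nil {
                // the ack was not confirmed; the message may be redelivered
                log.Printf("ack not confirmed: %v", err)
            }
        }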

    Consumer Starting Position

    When setting up a Consumer you can decide where to start; the system supports the following options for the DeliverPolicy:

        all: Delivers all messages that are available.

        last: Delivers the latest message, like a tail -n 1 -f.

        new: Delivers only new messages that arrive after subscribe time.

        by_start_time: Delivers from a specific time onward. Requires OptStartTime to be set.

        by_start_sequence: Delivers from a specific stream sequence. Requires OptStartSeq to be set.

    Regardless of what mode you set, this is only the starting point. Once started it will always give you what you have not seen or acknowledged. So this is merely how it picks the very first message.

    Let's look at each of these. First we make a new Stream ORDERS and add 100 messages to it.

    Now create a DeliverAll pull-based Consumer:

        nats consumer add ORDERS ALL --pull --filter ORDERS.processed --ack none --replay instant --deliver all
        nats consumer next ORDERS ALL
        --- received on ORDERS.processed
        order 1

        Acknowledged message

    Now create a DeliverLast pull-based Consumer:

        nats consumer add ORDERS LAST --pull --filter ORDERS.processed --ack none --replay instant --deliver last
        nats consumer next ORDERS LAST
        --- received on ORDERS.processed
        order 100

        Acknowledged message

    Now create a MsgSetSeq pull-based Consumer:

        nats consumer add ORDERS TEN --pull --filter ORDERS.processed --ack none --replay instant --deliver 10
        nats consumer next ORDERS TEN
        --- received on ORDERS.processed
        order 10

        Acknowledged message

    And finally a time-based Consumer. Let's add some messages a minute apart:

        nats stream purge ORDERS
        for i in 1 2 3
        do
          nats pub ORDERS.processed "order ${i}"
          sleep 60
        done

    Then create a Consumer that starts 2 minutes ago:

        nats consumer add ORDERS 2MIN --pull --filter ORDERS.processed --ack none --replay instant --deliver 2m
        nats consumer next ORDERS 2MIN
        --- received on ORDERS.processed
        order 2

        Acknowledged message

    Ephemeral Consumers

    So far, all the Consumers you have seen were Durable, meaning they exist even after you disconnect from JetStream. In our Orders scenario, though, the MONITOR Consumer could very well be a short-lived thing, there just while an operator is debugging the system; there is no need to remember the last seen position if all you are doing is observing the real-time state.

    In this case, we can make an Ephemeral Consumer by first subscribing to the delivery subject, then creating the Consumer without giving it a durable name. An Ephemeral Consumer exists as long as any subscription is active on its delivery subject. It is automatically removed, after a short grace period to handle restarts, when there are no subscribers.

    Terminal 1:

        nats sub my.monitor

    Terminal 2:

        nats consumer add ORDERS --filter '' --ack none --target 'my.monitor' --deliver last --replay instant --ephemeral

    The --ephemeral switch tells the system to make an Ephemeral Consumer.

    Consumer Message Rates

    Typically, what you want is that when a new Consumer is made, the selected messages are delivered to you as quickly as possible. You might want to replay messages at the rate they arrived though, meaning that if messages first arrived 1 minute apart, a new Consumer will also receive them a minute apart.

    This is useful in load testing scenarios, etc. This is called the ReplayPolicy, which has the values ReplayInstant and ReplayOriginal.

    You can only set ReplayPolicy on push-based Consumers:

        nats consumer add ORDERS REPLAY --target out.original --filter ORDERS.processed --ack none --deliver all --sample 100 --replay original
        ...
             Replay Policy: original
        ...

    Now let's publish messages into the Set 10 seconds apart:

        for i in 1 2 3
        do
          nats pub ORDERS.processed "order ${i}"
          sleep 10
        done
        Published [ORDERS.processed] : 'order 1'
        Published [ORDERS.processed] : 'order 2'
        Published [ORDERS.processed] : 'order 3'

    And when we consume them they will come to us 10 seconds apart:

        nats sub -t out.original
        Listening on [out.original]
        2020/01/03 15:17:26 [#1] Received on [ORDERS.processed]: 'order 1'
        2020/01/03 15:17:36 [#2] Received on [ORDERS.processed]: 'order 2'
        2020/01/03 15:17:46 [#3] Received on [ORDERS.processed]: 'order 3'
        ^C

    Ack Sampling

    In the earlier sections we saw that samples are being sent to a monitoring system. Let's look at that in depth: how the monitoring system works and what it contains.

    As messages pass through a Consumer, you'd be interested in knowing how many are being redelivered and how many times, but also how long it takes for messages to be acknowledged.

    Consumers can sample Ack'ed messages for you and publish samples so your monitoring system can observe the health of a Consumer. We will add support for this to NATS Surveyor.

    Configuration

    You can configure a Consumer for sampling by passing the --sample 80 option to nats consumer add; this tells the system to sample 80% of Acknowledgements.

    When viewing info of a Consumer you can tell if it's sampled or not - the output contains the sampling rate:

        nats consumer info ORDERS NEW
        ...
             Sampling Rate: 100
        ...

    Storage Overhead

    JetStream file storage is very efficient, storing as little extra information about the message as possible.

    We do store some message data with each message, namely:

    • The message payload

    • Message headers

    • The subject it was received on

    • The time it was received

    • The message sequence

    • A hash of the message

    • A few other bits like the length of the subject and the length of headers

    Without any headers the size is:

        length of the message record (4 bytes) + seq(8) + ts(8) + subj_len(2) + subj + msg + hash(8)

    A 5 byte hello message without headers will take 39 bytes.

    With headers:

        length of the message record (4 bytes) + seq(8) + ts(8) + subj_len(2) + subj + hdr_len(4) + hdr + msg + hash(8)

    So if you are publishing many small messages the overhead will be, relatively speaking, quite large, but for larger messages the overhead is very small. If you publish many small messages it's worth trying to optimize the subject length.
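    As a worked check of the formula: the quoted 39-byte figure implies a 4-character subject, since 4 + 8 + 8 + 2 + 4 + 5 + 8 = 39 bytes for the 5-byte message.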


    Using the Object Store

    The Object Store allows you to store data of any size (i.e. large data) by implementing a chunking mechanism, allowing you, for example, to store and retrieve files (i.e. the objects) of any size by associating them with a path and a file name (i.e. the key). You obtain an ObjectStoreManager object from your JetStream context.

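    Before the per-language interface listings below, here is a brief usage sketch with the Go client's jetstream package (the bucket and file names are illustrative, error handling is elided, and the context and nats.go/jetstream imports are assumed):

        // Create an object store, put a file in it, and read it back.
        nc, _ := nats.Connect(nats.DefaultURL)
        js, _ := jetstream.New(nc)
        ctx := context.Background()

        store, _ := js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: "configs"})
        store.PutFile(ctx, "app.yaml")                  // the object name is the file path
        store.GetFile(ctx, "app.yaml", "app-copy.yaml") // fetch into a local file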


    /**
     * Object Store Management context for creation and access to key value buckets.
     */
    public interface ObjectStore {
    
        /**
         * Get the name of the object store's bucket.
         * @return the name
         */
        String getBucketName();
    
        /**
         * Place the contents of the input stream into a new object.
         */
        ObjectInfo put(ObjectMeta meta, InputStream inputStream) throws IOException, JetStreamApiException, NoSuchAlgorithmException;
    
        /**
         * Place the contents of the input stream into a new object.
         */
        ObjectInfo put(String objectName, InputStream inputStream) throws IOException, JetStreamApiException, NoSuchAlgorithmException;
    
        /**
         * Place the bytes into a new object.
         */
        ObjectInfo put(String objectName, byte[] input) throws IOException, JetStreamApiException, NoSuchAlgorithmException;
    
        /**
         * Place the contents of the file into a new object using the file name as the object name.
         */
        ObjectInfo put(File file) throws IOException, JetStreamApiException, NoSuchAlgorithmException;
    
        /**
         * Get an object by name from the store, reading it into the output stream, if the object exists.
         */
        ObjectInfo get(String objectName, OutputStream outputStream) throws IOException, JetStreamApiException, InterruptedException, NoSuchAlgorithmException;
    
        /**
         * Get the info for an object if the object exists / is not deleted.
         */
        ObjectInfo getInfo(String objectName) throws IOException, JetStreamApiException;
    
        /**
         * Get the info for an object if the object exists, optionally including deleted.
         */
        ObjectInfo getInfo(String objectName, boolean includingDeleted) throws IOException, JetStreamApiException;
    
        /**
         * Update the metadata of name, description or headers. All other changes are ignored.
         */
        ObjectInfo updateMeta(String objectName, ObjectMeta meta) throws IOException, JetStreamApiException;
    
        /**
         * Delete the object by name. A No-op if the object is already deleted.
         */
        ObjectInfo delete(String objectName) throws IOException, JetStreamApiException;
    
        /**
         * Add a link to another object. A link cannot be for another link.
         */
        ObjectInfo addLink(String objectName, ObjectInfo toInfo) throws IOException, JetStreamApiException;
    
        /**
         * Add a link to another object store (bucket).
         */
        ObjectInfo addBucketLink(String objectName, ObjectStore toStore) throws IOException, JetStreamApiException;
    
        /**
         * Close (seal) the bucket to changes. The store (bucket) will be read only.
         */
        ObjectStoreStatus seal() throws IOException, JetStreamApiException;
    
        /**
         * Get a list of all object [infos] in the store.
         */
        List<ObjectInfo> getList() throws IOException, JetStreamApiException, InterruptedException;
    
        /**
         * Create a watch on the store (bucket).
         */
        NatsObjectStoreWatchSubscription watch(ObjectStoreWatcher watcher, ObjectStoreWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException;
    
        /**
         * Get the ObjectStoreStatus object.
         */
        ObjectStoreStatus getStatus() throws IOException, JetStreamApiException;
    
        async def object_store(self, bucket: str) -> ObjectStore:
    
        async def create_object_store(
            self,
            bucket: str = None,
            config: Optional[api.ObjectStoreConfig] = None,
            **params,
        ) -> ObjectStore:
            """
            create_object_store takes an api.ObjectStoreConfig and creates a OBJ in JetStream.
            """
        async def delete_object_store(self, bucket: str) -> bool:
            """
            delete_object_store will delete the underlying stream for the named object.
            """
    // dotnet add package NATS.Net
    
    /// <summary>
    /// NATS Object Store context.
    /// </summary>
    public interface INatsObjContext
    {
        /// <summary>
        /// Provides access to the JetStream context associated with the Object Store operations.
        /// </summary>
        INatsJSContext JetStreamContext { get; }
    
        /// <summary>
        /// Create a new object store.
        /// </summary>
        /// <param name="bucket">Bucket name.</param>
        /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
        /// <returns>Object store object.</returns>
        ValueTask<INatsObjStore> CreateObjectStoreAsync(string bucket, CancellationToken cancellationToken = default);
    
        /// <summary>
        /// Create a new object store.
        /// </summary>
        /// <param name="config">Object store configuration.</param>
        /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
        /// <returns>Object store object.</returns>
        ValueTask<INatsObjStore> CreateObjectStoreAsync(NatsObjConfig config, CancellationToken cancellationToken = default);
    
        /// <summary>
        /// Get an existing object store.
        /// </summary>
        /// <param name="bucket">Bucket name</param>
        /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
        /// <returns>The Object Store object</returns>
        ValueTask<INatsObjStore> GetObjectStoreAsync(string bucket, CancellationToken cancellationToken = default);
    
        /// <summary>
        /// Delete an object store.
        /// </summary>
        /// <param name="bucket">Name of the bucket.</param>
        /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
        /// <returns>Whether delete was successful or not.</returns>
        ValueTask<bool> DeleteObjectStore(string bucket, CancellationToken cancellationToken);
    }
    
    /// <summary>
    /// NATS Object Store.
    /// </summary>
    public interface INatsObjStore
    {
        /// <summary>
        /// Provides access to the JetStream context associated with the Object Store operations.
        /// </summary>
        INatsJSContext JetStreamContext { get; }
    
        /// <summary>
        /// Object store bucket name.
        /// </summary>
        string Bucket { get; }
    
        /// <summary>
        /// Get object by key.
        /// </summary>
        /// <param name="key">Object key.</param>
        /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
        /// <returns>Object value as a byte array.</returns>
        ValueTask<byte[]> GetBytesAsync(string key, CancellationToken cancellationToken = default);
    
        /// <summary>
        /// Get object by key.
        /// </summary>
        /// <param name="key">Object key.</param>
        /// <param name="stream">Stream to write the object value to.</param>
        /// <param name="leaveOpen"><c>true</c> to not close the underlying stream when async method returns; otherwise, <c>false</c></param>
        /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
        /// <returns>Object metadata.</returns>
        /// <exception cref="NatsObjException">Metadata didn't match the value retrieved e.g. the SHA digest.</exception>
        ValueTask<ObjectMetadata> GetAsync(string key, Stream stream, bool leaveOpen = false, CancellationToken cancellationToken = default);
    
        /// <summary>
        /// Put an object by key.
        /// </summary>
        /// <param name="key">Object key.</param>
        /// <param name="value">Object value as a byte array.</param>
        /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
        /// <returns>Object metadata.</returns>
        ValueTask<ObjectMetadata> PutAsync(string key, byte[] value, CancellationToken cancellationToken = default);
    
        /// <summary>
        /// Put an object by key.
        /// </summary>
        /// <param name="key">Object key.</param>
        /// <param name="stream">Stream to read the value from.</param>
        /// <param name="leaveOpen"><c>true</c> to not close the underlying stream when async method returns; otherwise, <c>false</c></param>
        /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
        /// <returns>Object metadata.</returns>
        /// <exception cref="NatsObjException">There was an error calculating SHA digest.</exception>
        /// <exception cref="NatsJSApiException">Server responded with an error.</exception>
        ValueTask<ObjectMetadata> PutAsync(string key, Stream stream, bool leaveOpen = false, CancellationToken cancellationToken = default);
    
        /// <summary>
        /// Put an object by key.
        /// </summary>
        /// <param name="meta">Object metadata.</param>
        /// <param name="stream">Stream to read the value from.</param>
        /// <param name="leaveOpen"><c>true</c> to not close the underlying stream when async method returns; otherwise, <c>false</c></param>
        /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
        /// <returns>Object metadata.</returns>
        /// <exception cref="NatsObjException">There was an error calculating SHA digest.</exception>
        /// <exception cref="NatsJSApiException">Server responded with an error.</exception>
        ValueTask<ObjectMetadata> PutAsync(ObjectMetadata meta, Stream stream, bool leaveOpen = false, CancellationToken cancellationToken = default);
    
        /// <summary>
        /// Update object metadata
        /// </summary>
        /// <param name="key">Object key</param>
        /// <param name="meta">Object metadata</param>
        /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
        /// <returns>Object metadata</returns>
        /// <exception cref="NatsObjException">There is already an object with the same name</exception>
        ValueTask<ObjectMetadata> UpdateMetaAsync(string key, ObjectMetadata meta, CancellationToken cancellationToken = default);
    
        /// <summary>
        /// Add a link to another object
        /// </summary>
        /// <param name="link">Link name</param>
        /// <param name="target">Target object's name</param>
        /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
        /// <returns>Metadata of the new link object</returns>
        ValueTask<ObjectMetadata> AddLinkAsync(string link, string target, CancellationToken cancellationToken = default);
    
        /// <summary>
        /// Add a link to another object
        /// </summary>
        /// <param name="link">Link name</param>
        /// <param name="target">Target object's metadata</param>
        /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
        /// <returns>Metadata of the new link object</returns>
        ValueTask<ObjectMetadata> AddLinkAsync(string link, ObjectMetadata target, CancellationToken cancellationToken = default);
    
        /// <summary>
        /// Add a link to another object store
        /// </summary>
        /// <param name="link">Object's name to be linked</param>
        /// <param name="target">Target object store</param>
        /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
        /// <returns>Metadata of the new link object</returns>
        /// <exception cref="NatsObjException">Object with the same name already exists</exception>
        ValueTask<ObjectMetadata> AddBucketLinkAsync(string link, INatsObjStore target, CancellationToken cancellationToken = default);
    
        /// <summary>
        /// Seal the object store. No further modifications will be allowed.
        /// </summary>
        /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
        /// <exception cref="NatsObjException">Update operation failed</exception>
        ValueTask SealAsync(CancellationToken cancellationToken = default);
    
        /// <summary>
        /// Get object metadata by key.
        /// </summary>
        /// <param name="key">Object key.</param>
        /// <param name="showDeleted">Also retrieve deleted objects.</param>
        /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
        /// <returns>Object metadata.</returns>
        /// <exception cref="NatsObjException">Object was not found.</exception>
        ValueTask<ObjectMetadata> GetInfoAsync(string key, bool showDeleted = false, CancellationToken cancellationToken = default);
    
        /// <summary>
        /// List all the objects in this store.
        /// </summary>
        /// <param name="opts">List options</param>
        /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
        /// <returns>An async enumerable object metadata to be used in an <c>await foreach</c></returns>
        IAsyncEnumerable<ObjectMetadata> ListAsync(NatsObjListOpts? opts = default, CancellationToken cancellationToken = default);
    
        /// <summary>
        /// Retrieves run-time status about the backing store of the bucket.
        /// </summary>
        /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
        /// <returns>Object store status</returns>
        ValueTask<NatsObjStatus> GetStatusAsync(CancellationToken cancellationToken = default);
    
        /// <summary>
        /// Watch for changes in the underlying store and receive meta information updates.
        /// </summary>
        /// <param name="opts">Watch options</param>
        /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
        /// <returns>An async enumerable object metadata to be used in an <c>await foreach</c></returns>
        IAsyncEnumerable<ObjectMetadata> WatchAsync(NatsObjWatchOpts? opts = default, CancellationToken cancellationToken = default);
    
        /// <summary>
        /// Delete an object by key.
        /// </summary>
        /// <param name="key">Object key.</param>
        /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
        /// <exception cref="NatsObjException">Object metadata was invalid or chunks can't be purged.</exception>
        ValueTask DeleteAsync(string key, CancellationToken cancellationToken = default);
    }
    // ObjectStoreManager is used to manage object stores. It provides methods
    // for CRUD operations on object stores.
    type ObjectStoreManager interface {
    // ObjectStore will look up and bind to an existing object store
    // instance.
    //
    // If the object store with given name does not exist, ErrBucketNotFound
    // will be returned.
    ObjectStore(ctx context.Context, bucket string) (ObjectStore, error)
    // CreateObjectStore will create a new object store with the given
    // configuration.
    //
    // If the object store with given name already exists, ErrBucketExists
    // will be returned.
    CreateObjectStore(ctx context.Context, cfg ObjectStoreConfig) (ObjectStore, error)
    // UpdateObjectStore will update an existing object store with the given
    // configuration.
    //
    // If the object store with given name does not exist, ErrBucketNotFound
    // will be returned.
    UpdateObjectStore(ctx context.Context, cfg ObjectStoreConfig) (ObjectStore, error)
    // CreateOrUpdateObjectStore will create a new object store with the given
    // configuration if it does not exist, or update an existing object store
    // with the given configuration.
    CreateOrUpdateObjectStore(ctx context.Context, cfg ObjectStoreConfig) (ObjectStore, error)
    // DeleteObjectStore will delete the provided object store.
    //
    // If the object store with given name does not exist, ErrBucketNotFound
    // will be returned.
    DeleteObjectStore(ctx context.Context, bucket string) error
    // ObjectStoreNames is used to retrieve a list of bucket names.
    // It returns an ObjectStoreNamesLister exposing a channel to receive
    // the names of the object stores.
    //
    // The lister will always close the channel when done (either all names
    // have been read or an error occurred) and therefore can be used in a
    // for-range loop.
    ObjectStoreNames(ctx context.Context) ObjectStoreNamesLister
    // ObjectStores is used to retrieve a list of bucket statuses.
    // It returns an ObjectStoresLister exposing a channel to receive
    // the statuses of the object stores.
    //
    // The lister will always close the channel when done (either all statuses
    // have been read or an error occurred) and therefore can be used in a
    // for-range loop.
    ObjectStores(ctx context.Context) ObjectStoresLister
    }
    // ObjectStore contains methods to operate on an object store.
    // Using the ObjectStore interface, it is possible to:
    //
    // - Perform CRUD operations on objects (Get, Put, Delete).
    // Get and put expose convenience methods to work with
    // byte slices, strings and files, in addition to streaming [io.Reader]
    // - Get information about an object without retrieving it.
    // - Update the metadata of an object.
    // - Add links to other objects or object stores.
    // - Watch for updates to a store
    // - List information about objects in a store
    // - Retrieve status and configuration of an object store.
    type ObjectStore interface {
    // Put will place the contents from the reader into a new object. If the
    // object already exists, it will be overwritten. The object name is
    // required and is taken from the ObjectMeta.Name field.
    //
    // The reader will be read until EOF. ObjectInfo will be returned, containing
    // the object's metadata, digest and instance information.
    Put(ctx context.Context, obj ObjectMeta, reader io.Reader) (*ObjectInfo, error)
    // PutBytes is convenience function to put a byte slice into this object
    // store under the given name.
    //
    // ObjectInfo will be returned, containing the object's metadata, digest
    // and instance information.
    PutBytes(ctx context.Context, name string, data []byte) (*ObjectInfo, error)
    // PutString is convenience function to put a string into this object
    // store under the given name.
    //
    // ObjectInfo will be returned, containing the object's metadata, digest
    // and instance information.
    PutString(ctx context.Context, name string, data string) (*ObjectInfo, error)
    // PutFile is convenience function to put a file contents into this
    // object store. The name of the object will be the path of the file.
    //
    // ObjectInfo will be returned, containing the object's metadata, digest
    // and instance information.
    PutFile(ctx context.Context, file string) (*ObjectInfo, error)
    // Get will pull the named object from the object store. If the object
    // does not exist, ErrObjectNotFound will be returned.
    //
    // The returned ObjectResult will contain the object's metadata and a
    // reader to read the object's contents. The reader will be closed when
    // all data has been read or an error occurs.
    //
    // A GetObjectShowDeleted option can be supplied to return an object
    // even if it was marked as deleted.
    Get(ctx context.Context, name string, opts ...GetObjectOpt) (ObjectResult, error)
    // GetBytes is a convenience function to pull an object from this object
    // store and return it as a byte slice.
    //
    // If the object does not exist, ErrObjectNotFound will be returned.
    //
    // A GetObjectShowDeleted option can be supplied to return an object
    // even if it was marked as deleted.
    GetBytes(ctx context.Context, name string, opts ...GetObjectOpt) ([]byte, error)
    // GetString is a convenience function to pull an object from this
    // object store and return it as a string.
    //
    // If the object does not exist, ErrObjectNotFound will be returned.
    //
    // A GetObjectShowDeleted option can be supplied to return an object
    // even if it was marked as deleted.
    GetString(ctx context.Context, name string, opts ...GetObjectOpt) (string, error)
    // GetFile is a convenience function to pull an object from this object
    // store and place it in a file. If the file already exists, it will be
    // overwritten, otherwise it will be created.
    //
    // If the object does not exist, ErrObjectNotFound will be returned.
    // A GetObjectShowDeleted option can be supplied to return an object
    // even if it was marked as deleted.
    GetFile(ctx context.Context, name, file string, opts ...GetObjectOpt) error
    // GetInfo will retrieve the current information for the object, containing
    // the object's metadata and instance information.
    //
    // If the object does not exist, ErrObjectNotFound will be returned.
    //
    // A GetObjectInfoShowDeleted option can be supplied to return an object
    // even if it was marked as deleted.
    GetInfo(ctx context.Context, name string, opts ...GetObjectInfoOpt) (*ObjectInfo, error)
    // UpdateMeta will update the metadata for the object.
    //
    // If the object does not exist, ErrUpdateMetaDeleted will be returned.
    // If the new name is different from the old name, and an object with the
    // new name already exists, ErrObjectAlreadyExists will be returned.
    UpdateMeta(ctx context.Context, name string, meta ObjectMeta) error
    // Delete will delete the named object from the object store. If the object
    // does not exist, ErrObjectNotFound will be returned. If the object is
    // already deleted, no error will be returned.
    //
    // All chunks for the object will be purged, and the object will be marked
    // as deleted.
    Delete(ctx context.Context, name string) error
    // AddLink will add a link to another object. A link is a reference to
    // another object. The provided name is the name of the link object.
    // The provided ObjectInfo is the info of the object being linked to.
    //
    // If an object with given name already exists, ErrObjectAlreadyExists
    // will be returned.
    // If object being linked to is deleted, ErrNoLinkToDeleted will be
    // returned.
    // If the provided object is a link, ErrNoLinkToLink will be returned.
    // If the provided object is nil or the name is empty, ErrObjectRequired
    // will be returned.
    AddLink(ctx context.Context, name string, obj *ObjectInfo) (*ObjectInfo, error)
    // AddBucketLink will add a link to another object store. A link is a
    // reference to another object store. The provided name is the name of
    // the link object.
    // The provided ObjectStore is the object store being linked to.
    //
    // If an object with given name already exists, ErrObjectAlreadyExists
    // will be returned.
    // If the provided object store is nil ErrBucketRequired will be returned.
    AddBucketLink(ctx context.Context, name string, bucket ObjectStore) (*ObjectInfo, error)
    // Seal will seal the object store, no further modifications will be allowed.
    Seal(ctx context.Context) error
    // Watch for any updates to objects in the store. By default, the watcher will send the latest
    // info for each object and all future updates. Watch will send a nil
    // entry when it has received all initial values. There are a few ways
    // to configure the watcher:
    //
    // - IncludeHistory will have the watcher send all historical information
    // for each object.
    // - IgnoreDeletes will have the watcher not pass any objects with
    // delete markers.
    // - UpdatesOnly will have the watcher only pass updates on objects
    // (without latest info when started).
    Watch(ctx context.Context, opts ...WatchOpt) (ObjectWatcher, error)
    // List will list information about objects in the store.
    //
    // If the object store is empty, ErrNoObjectsFound will be returned.
    List(ctx context.Context, opts ...ListObjectsOpt) ([]*ObjectInfo, error)
    // Status retrieves the status and configuration of the bucket.
    Status(ctx context.Context) (ObjectStoreStatus, error)
    }

    See more at jetstream/object.go

    Publishing to Streams
    func ExampleJetStream() {
    	nc, err := nats.Connect("localhost")
    	if err != nil {
    		log.Fatal(err)
    	}
    
    	// Use the JetStream context to produce and consume messages
    	// that have been persisted.
    	js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
    	if err != nil {
    		log.Fatal(err)
    	}
    
    	js.AddStream(&nats.StreamConfig{
    		Name:     "example-stream",
    		Subjects: []string{"example-subject"},
    	})
    
    	js.Publish("example-subject", []byte("Hello JS!"))
    
    	// Publish messages asynchronously.
    	for i := 0; i < 500; i++ {
    		js.PublishAsync("example-subject", []byte("Hello JS Async!"))
    	}
    	select {
    	case <-js.PublishAsyncComplete():
    	case <-time.After(5 * time.Second):
    		fmt.Println("Did not resolve in time")
    	}
    }
    try (Connection nc = Nats.connect("localhost")) {
        JetStreamManagement jsm = nc.jetStreamManagement();
        jsm.addStream(StreamConfiguration.builder()
            .name("example-stream")
            .subjects("example-subject")
            .build());
    
        JetStream js = jsm.jetStream();
    
        // Publish Synchronously
        PublishAck pa = js.publish("example-subject", "Hello JS Sync!".getBytes());
        System.out.println("Publish Sequence: " + pa.getSeqno());
    
        // Publish Asynchronously
        CompletableFuture<PublishAck> future =
            js.publishAsync("example-subject", "Hello JS Async!".getBytes());
    
        try {
            pa = future.get(1, TimeUnit.SECONDS);
            System.out.println("Publish Sequence: " + pa.getSeqno());
        }
        catch (ExecutionException e) {
            // Might have been a problem with the publish,
            // such as a failed expectation (advanced feature)
            // Also could be that the publish ack did not return in time
            // from the internal request timeout
        }
        catch (TimeoutException e) {
        // The future timed out, meaning its timeout was shorter than
        // the publish async's request timeout
        }
        catch (InterruptedException e) {
            // The future.get() thread was interrupted.
        }
    }
    import { connect, Empty } from "../../src/mod.ts";
    
    const nc = await connect();
    
    const jsm = await nc.jetstreamManager();
    await jsm.streams.add({ name: "example-stream", subjects: ["example-subject"] });
    
    const js = await nc.jetstream();
    // the jetstream client provides a publish that returns
    // a confirmation that the message was received and stored
    // by the server. You can associate various expectations
    // when publishing a message to prevent duplicates.
    // If the expectations are not met, the message is rejected.
    let pa = await js.publish("example-subject", Empty, {
      msgID: "a",
      expect: { streamName: "example-stream" },
    });
    console.log(`${pa.stream}[${pa.seq}]: duplicate? ${pa.duplicate}`);
    
    pa = await js.publish("example-subject", Empty, {
      msgID: "a",
      expect: { lastSequence: 1 },
    });
    console.log(`${pa.stream}[${pa.seq}]: duplicate? ${pa.duplicate}`);
    
    await jsm.streams.delete("example-stream");
    await nc.drain();
    import asyncio
    
    import nats
    from nats.errors import TimeoutError
    
    
    async def main():
        nc = await nats.connect("localhost")
    
        # Create JetStream context.
        js = nc.jetstream()
    
        # Persist messages on 'example-subject'.
        await js.add_stream(name="example-stream", subjects=["example-subject"])
    
        for i in range(0, 10):
            ack = await js.publish("example-subject", f"hello world: {i}".encode())
            print(ack)
    
        await nc.close()
    
    if __name__ == '__main__':
        asyncio.run(main())
    // dotnet add package NATS.Net
    using NATS.Net;
    using NATS.Client.JetStream;
    using NATS.Client.JetStream.Models;
    
    await using var client = new NatsClient();
    
    INatsJSContext js = client.CreateJetStreamContext();
    
    // Create a stream
    var streamConfig = new StreamConfig(name: "example-stream", subjects: ["example-subject"]);
    await js.CreateStreamAsync(streamConfig);
    
    // Publish a message
    {
        PubAckResponse ack = await js.PublishAsync("example-subject", "Hello, JetStream!");
        ack.EnsureSuccess();
    }
    
    // Publish messages concurrently
    List<NatsJSPublishConcurrentFuture> futures = new();
    for (var i = 0; i < 500; i++)
    {
        NatsJSPublishConcurrentFuture future
            = await js.PublishConcurrentAsync("example-subject", "Hello, JetStream 1!");
        futures.Add(future);
    }
    
    foreach (var future in futures)
    {
        await using (future)
        {
            PubAckResponse ack = await future.GetResponseAsync();
            ack.EnsureSuccess();
        }
    }
    #include "examples.h"
    
    static const char *usage = ""\
    "-stream        stream name (default is 'foo')\n" \
    "-txt           text to send (default is 'hello')\n" \
    "-count         number of messages to send\n" \
    "-sync          publish synchronously (default is async)\n";
    
    static void
    _jsPubErr(jsCtx *js, jsPubAckErr *pae, void *closure)
    {
        int *errors = (int*) closure;
    
        printf("Error: %u - Code: %u - Text: %s\n", pae->Err, pae->ErrCode, pae->ErrText);
        printf("Original message: %.*s\n", natsMsg_GetDataLength(pae->Msg), natsMsg_GetData(pae->Msg));
    
        *errors = (*errors + 1);
    
        // If we wanted to resend the original message, we would do something like that:
        //
        // js_PublishMsgAsync(js, &(pae->Msg), NULL);
        //
        // Note that we use `&(pae->Msg)` so that the library set it to NULL if it takes
        // ownership, and the library will not destroy the message when this callback returns.
    
        // No need to destroy anything, everything is handled by the library.
    }
    
    int main(int argc, char **argv)
    {
        natsConnection      *conn  = NULL;
        natsStatistics      *stats = NULL;
        natsOptions         *opts  = NULL;
        jsCtx               *js    = NULL;
        jsOptions           jsOpts;
        jsErrCode           jerr   = 0;
        natsStatus          s;
        int                 dataLen=0;
        volatile int        errors = 0;
        bool                delStream = false;
    
        opts = parseArgs(argc, argv, usage);
        dataLen = (int) strlen(payload);
    
        s = natsConnection_Connect(&conn, opts);
    
        if (s == NATS_OK)
            s = jsOptions_Init(&jsOpts);
    
        if (s == NATS_OK)
        {
            if (async)
            {
                jsOpts.PublishAsync.ErrHandler           = _jsPubErr;
                jsOpts.PublishAsync.ErrHandlerClosure    = (void*) &errors;
            }
            s = natsConnection_JetStream(&js, conn, &jsOpts);
        }
    
        if (s == NATS_OK)
        {
            jsStreamInfo    *si = NULL;
    
            // First check if the stream already exists.
            s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
            if (s == NATS_NOT_FOUND)
            {
                jsStreamConfig  cfg;
    
                // Since we are the one creating this stream, we can delete at the end.
                delStream = true;
    
                // Initialize the configuration structure.
                jsStreamConfig_Init(&cfg);
                cfg.Name = stream;
                // Set the subject
                cfg.Subjects = (const char*[1]){subj};
                cfg.SubjectsLen = 1;
                // Make it a memory stream.
                cfg.Storage = js_MemoryStorage;
                // Add the stream,
                s = js_AddStream(&si, js, &cfg, NULL, &jerr);
            }
            if (s == NATS_OK)
            {
                printf("Stream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
                    si->Config->Name, si->State.Msgs, si->State.Bytes);
    
                // Need to destroy the returned stream object.
                jsStreamInfo_Destroy(si);
            }
        }
    
        if (s == NATS_OK)
            s = natsStatistics_Create(&stats);
    
        if (s == NATS_OK)
        {
            printf("\nSending %" PRId64 " messages to subject '%s'\n", total, stream);
            start = nats_Now();
        }
    
        for (count = 0; (s == NATS_OK) && (count < total); count++)
        {
            if (async)
                s = js_PublishAsync(js, subj, (const void*) payload, dataLen, NULL);
            else
            {
                jsPubAck *pa = NULL;
    
                s = js_Publish(&pa, js, subj, (const void*) payload, dataLen, NULL, &jerr);
                if (s == NATS_OK)
                {
                    if (pa->Duplicate)
                        printf("Got a duplicate message! Sequence=%" PRIu64 "\n", pa->Sequence);
    
                    jsPubAck_Destroy(pa);
                }
            }
        }
    
        if ((s == NATS_OK) && async)
        {
            jsPubOptions    jsPubOpts;
    
            jsPubOptions_Init(&jsPubOpts);
            // Let's set it to 30 seconds, if getting "Timeout" errors,
            // this may need to be increased based on the number of messages
            // being sent.
            jsPubOpts.MaxWait = 30000;
            s = js_PublishAsyncComplete(js, &jsPubOpts);
            if (s == NATS_TIMEOUT)
            {
                // Let's get the list of pending messages. We could resend,
                // etc, but for now, just destroy them.
                natsMsgList list;
    
                js_PublishAsyncGetPendingList(&list, js);
                natsMsgList_Destroy(&list);
            }
        }
    
        if (s == NATS_OK)
        {
            jsStreamInfo *si = NULL;
    
            elapsed = nats_Now() - start;
            printStats(STATS_OUT, conn, NULL, stats);
            printPerf("Sent");
    
            if (errors != 0)
                printf("There were %d asynchronous errors\n", errors);
    
            // Let's report some stats after the run
            s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
            if (s == NATS_OK)
            {
                printf("\nStream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
                    si->Config->Name, si->State.Msgs, si->State.Bytes);
    
                jsStreamInfo_Destroy(si);
            }
        }
        if (delStream && (js != NULL))
        {
            printf("\nDeleting stream %s: ", stream);
            s = js_DeleteStream(js, stream, NULL, &jerr);
            if (s == NATS_OK)
                printf("OK!");
            printf("\n");
        }
        if (s != NATS_OK)
        {
            printf("Error: %u - %s - jerr=%u\n", s, natsStatus_GetText(s), jerr);
            nats_PrintLastErrorStack(stderr);
        }
    
        // Destroy all our objects to avoid report of memory leak
        jsCtx_Destroy(js);
        natsStatistics_Destroy(stats);
        natsConnection_Destroy(conn);
        natsOptions_Destroy(opts);
    
        // To silence reports of memory still in used with valgrind
        nats_Close();
    
        return 0;
    }

    Managing Streams and consumers

    Streams and durable consumers can be defined administratively outside the application (typically using the NATS CLI tool), in which case the application only needs to know the well-known names of the durable consumers it wants to use. But you can also manage streams and consumers programmatically.

    Common stream management operations are:

    • Add a stream. Adding a stream is an idempotent operation: if the stream does not exist it is created, and if it already exists the add operation succeeds only if the existing stream's attributes exactly match those specified in the 'add' call.

    • Delete a stream.

    • Purge a stream (delete all the messages stored in the stream)

    • Get or remove a specific message from a stream by sequence number

    • Add or update (or delete) a consumer

    • Get info and statistics on streams/consumers/account; get or remove individual messages stored in a stream.

    func ExampleJetStreamManager() {
    	nc, _ := nats.Connect("localhost")
    
    	js, _ := nc.JetStream()
    
    	// Create a stream
    	js.AddStream(&nats.StreamConfig{
    		Name:     "example-stream",
    		Subjects: []string{"example-subject"},
    		MaxBytes: 1024,
    	})
    
    	// Update a stream
    	js.UpdateStream(&nats.StreamConfig{
    		Name:     "example-stream",
    		MaxBytes: 2048,
    	})
    
    	// Create a durable consumer
    	js.AddConsumer("example-stream", &nats.ConsumerConfig{
    		Durable: "example-consumer-name",
    	})
    
    	// Get information about all streams (with Context JSOpt)
    	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    	defer cancel()
    	for info := range js.StreamsInfo(nats.Context(ctx)) {
    		fmt.Println("stream name: ", info.Config.Name)
    	}
    
    	// Get information about all consumers (with MaxWait JSOpt)
    	for info := range js.ConsumersInfo("example-stream", nats.MaxWait(10*time.Second)) {
    		fmt.Println("consumer name: ", info.Name)
    	}
    
    	// Delete a consumer
    	js.DeleteConsumer("example-stream", "example-consumer-name")
    
    	// Delete a stream
    	js.DeleteStream("example-stream")
    }

    Using the Key/Value Store

    As the Key Value Store is built on top of the JetStream persistence layer, you obtain a KeyValueManager object from your JetStream context.

    The key must be in the same format as a NATS subject, i.e. it can be a dot separated list of tokens (which means that you can then use wildcards to match hierarchies of keys when watching a bucket), and can only contain valid characters. The value can be any byte array.

    hashtag
    Creating, and deleting KV buckets

    You can create as many independent key/value store instances, called 'buckets', as you need. Buckets are typically created, purged or deleted administratively (e.g. using the nats CLI tool), but this can also be done programmatically using one of the following KeyValueManager calls:
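    For example, here is a minimal Go sketch of the bucket lifecycle (assuming the nats.go client and a local JetStream-enabled server; the bucket name 'my-bucket' is illustrative):

    package main

    import "github.com/nats-io/nats.go"

    func main() {
        nc, _ := nats.Connect("localhost")
        defer nc.Drain()

        js, _ := nc.JetStream()

        // Create the bucket, keeping up to 5 historical values per key.
        kv, _ := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "my-bucket", History: 5})

        // To bind to an existing bucket instead (e.g. from another application):
        // kv, _ = js.KeyValue("my-bucket")

        kv.PutString("hello", "world") // see the following sections for usage

        // Delete the bucket and everything in it.
        _ = js.DeleteKeyValue("my-bucket")
    }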

    hashtag
    Getting

    You can do a 'get' to retrieve the current value of a key, or ask for a specific revision of the value.

    hashtag
    Putting

    The key is always a string. You can simply use Put to store a byte array, or the convenience PutString to put a string. For 'compare and set' functionality you can use Create and Update.
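    To make the semantics concrete, here is a hedged Go fragment (it reuses the kv handle from the bucket sketch above; the keys, values, and the fmt/log imports are elided assumptions):

    // Put always writes, returning the new revision number of the key.
    rev, _ := kv.Put("config.region", []byte("us-east"))

    // Create succeeds only if the key does not already exist.
    if _, err := kv.Create("config.owner", []byte("team-a")); err != nil {
        log.Println("key already exists:", err)
    }

    // Update is a compare-and-set: it succeeds only if 'rev' is still the
    // latest revision of the key.
    if _, err := kv.Update("config.region", []byte("eu-west"), rev); err != nil {
        log.Println("concurrent modification:", err)
    }

    // Get returns the latest entry (value and revision) for the key.
    entry, _ := kv.Get("config.region")
    fmt.Printf("%s @ revision %d\n", entry.Value(), entry.Revision())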

    hashtag
    Deleting

    You can delete a specific key, or purge the whole key/value bucket.

    hashtag
    Getting all the keys

    You can get the list of all the keys that currently have an associated value using Keys()

    hashtag
    Getting the history for a key

    The JetStream key/value store has a feature you don't usually find in key/value stores: the ability to keep a history of the values associated with a key (rather than just the current value). The depth of the history is specified when the key/value bucket is created, and the default is a history depth of 1 (i.e. no history). The maximum history depth is 64; if you need more, your use case will be better implemented using the Stream functionality directly (where you can set the max number of messages per subject to any value you want) rather than the KV abstraction.
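    For example, in Go (a fragment reusing the kv handle above, with the bucket created with a history depth of 5):

    // Each Put on the same key adds a revision, up to the bucket's history depth.
    kv.PutString("config.region", "us-east")
    kv.PutString("config.region", "us-west")

    // History returns the retained revisions for the key, oldest first.
    entries, _ := kv.History("config.region")
    for _, e := range entries {
        fmt.Printf("revision %d: %s\n", e.Revision(), e.Value())
    }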

    hashtag
    Watching for changes

    Watching a key/value bucket is like subscribing to updates: you provide a callback, and you can watch all of the keys in the bucket or specify which specific key(s) you want to be kept updated about.
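    A minimal Go sketch of a watcher (again reusing the kv handle above; the 'config.>' wildcard is illustrative):

    // Watch every key under the 'config.' hierarchy; since keys follow
    // subject rules, wildcards can match hierarchies of keys.
    w, _ := kv.Watch("config.>")
    defer w.Stop()

    for entry := range w.Updates() {
        // A nil entry signals that all initial values have been delivered.
        if entry == nil {
            fmt.Println("caught up with initial values")
            continue
        }
        fmt.Printf("%v %s: %s\n", entry.Operation(), entry.Key(), entry.Value())
    }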

    try (Connection nc = Nats.connect("localhost")) {
        JetStreamManagement jsm = nc.jetStreamManagement();
    
        // Create a stream
        StreamInfo si = jsm.addStream(StreamConfiguration.builder()
            .name("example-stream")
            .subjects("example-subject")
            .maxBytes(1024)
            .build());
        StreamConfiguration config = si.getConfiguration();
        System.out.println("stream name: " + config.getName() + ", max_bytes: " + config.getMaxBytes());
    
        // Update a stream
        si = jsm.updateStream(StreamConfiguration.builder()
            .name("example-stream")
            .maxBytes(2048)
            .build());
        config = si.getConfiguration();
        System.out.println("stream name: " + config.getName() + ", max_bytes: " + config.getMaxBytes());
    
        // Create a durable consumer
        jsm.createConsumer("example-stream", ConsumerConfiguration.builder()
            .durable("example-consumer-name")
            .build());
    
        // Get information about all streams
        List<StreamInfo> streams = jsm.getStreams();
        for (StreamInfo info : streams) {
            System.out.println("stream name: " + info.getConfiguration().getName());
        }
    
        // Get information about all consumers
        List<ConsumerInfo> consumers = jsm.getConsumers("example-stream");
        for (ConsumerInfo ci : consumers) {
            System.out.println("consumer name: " + ci.getName());
        }
    
        // Delete a consumer
        jsm.deleteConsumer("example-stream", "example-consumer-name");
    
        // Delete a stream
        jsm.deleteStream("example-stream");
    }
    import { AckPolicy, connect, Empty } from "../../src/mod.ts";
    
    const nc = await connect();
    const jsm = await nc.jetstreamManager();
    
    // list all the streams, the `next()` function
    // retrieves a paged result.
    const streams = await jsm.streams.list().next();
    streams.forEach((si) => {
        console.log(si);
    });
    
    // add a stream
    const stream = "mystream";
    const subj = `mystream.*`;
    await jsm.streams.add({ name: stream, subjects: [subj] });
    
    // publish a regular nats message directly to the stream
    for (let i = 0; i < 10; i++) {
        nc.publish("mystream.a", Empty);
    }
    
    // find a stream that stores a specific subject:
    const name = await jsm.streams.find("mystream.A");
    
    // retrieve info about the stream by its name
    const si = await jsm.streams.info(name);
    
    // update a stream configuration
    si.config.subjects?.push("a.b");
    await jsm.streams.update(name, si.config);
    
    // get a particular stored message in the stream by sequence
    // this is not associated with a consumer
    const sm = await jsm.streams.getMessage(stream, { seq: 1 });
    console.log(sm.seq);
    
    // delete the 5th message in the stream, securely erasing it
    await jsm.streams.deleteMessage(stream, 5);
    
    // purge all messages in the stream, the stream itself
    // remains.
    await jsm.streams.purge(stream);
    
    // purge all messages with a specific subject (filter can be a wildcard)
    await jsm.streams.purge(stream, { filter: "a.b" });
    
    // purge messages with a specific subject keeping some messages
    await jsm.streams.purge(stream, { filter: "a.c", keep: 5 });
    
    // purge all messages up to (but not including) sequence 100
    await jsm.streams.purge(stream, { seq: 100 });
    
    // purge all messages up to a sequence that also match a filter subject
    await jsm.streams.purge(stream, { filter: "a.d", seq: 100 });
    
    // list all consumers for a stream:
    const consumers = await jsm.consumers.list(stream).next();
    consumers.forEach((ci) => {
        console.log(ci);
    });
    
    // add a new durable pull consumer
    await jsm.consumers.add(stream, {
        durable_name: "me",
        ack_policy: AckPolicy.Explicit,
    });
    
    // retrieve a consumer's configuration
    const ci = await jsm.consumers.info(stream, "me");
    console.log(ci);
    
    // delete a particular consumer
    await jsm.consumers.delete(stream, "me");
    import asyncio
    
    import nats
    from nats.errors import TimeoutError
        
    async def main():
        nc = await nats.connect("localhost")
    
        # Create JetStream context.
        js = nc.jetstream()
            
        # Persist messages on the 'foo' subject.
        await js.add_stream(name="sample-stream", subjects=["foo"])
    
        await nc.close()
    
    if __name__ == '__main__':
        asyncio.run(main())    
    // dotnet add package NATS.Net
    using NATS.Net;
    using NATS.Client.JetStream;
    using NATS.Client.JetStream.Models;
    
    await using var client = new NatsClient();
    
    INatsJSContext js = client.CreateJetStreamContext();
    
    // Create a stream
    var streamConfig = new StreamConfig(name: "example-stream", subjects: ["example-subject"])
    {
        MaxBytes = 1024,
    };
    await js.CreateStreamAsync(streamConfig);
    
    // Update the stream
    var streamConfigUpdated = streamConfig with { MaxBytes = 2048 };
    await js.UpdateStreamAsync(streamConfigUpdated);
    
    // Create a durable consumer
    await js.CreateConsumerAsync("example-stream", new ConsumerConfig("example-consumer-name"));
    
    // Get information about all streams
    await foreach (var stream in js.ListStreamsAsync())
    {
        Console.WriteLine($"stream name: {stream.Info.Config.Name}");
    }
    
    // Get information about all consumers in a stream
    await foreach (var consumer in js.ListConsumersAsync("example-stream"))
    {
        Console.WriteLine($"consumer name: {consumer.Info.Config.Name}");
    }
    
    // Delete a consumer
    await js.DeleteConsumerAsync("example-stream", "example-consumer-name");
    
    // Delete a stream
    await js.DeleteStreamAsync("example-stream");
    
    // Output:
    // stream name: example-stream
    // consumer name: example-consumer-name
    #include "examples.h"
    
    static const char *usage = ""\
    "-stream        stream name (default is 'foo')\n" \
    "-txt           text to send (default is 'hello')\n" \
    "-count         number of messages to send\n" \
    "-sync          publish synchronously (default is async)\n";
    
    static void
    _jsPubErr(jsCtx *js, jsPubAckErr *pae, void *closure)
    {
        int *errors = (int*) closure;
    
        printf("Error: %u - Code: %u - Text: %s\n", pae->Err, pae->ErrCode, pae->ErrText);
        printf("Original message: %.*s\n", natsMsg_GetDataLength(pae->Msg), natsMsg_GetData(pae->Msg));
    
        *errors = (*errors + 1);
    
        // If we wanted to resend the original message, we would do something like that:
        //
        // js_PublishMsgAsync(js, &(pae->Msg), NULL);
        //
        // Note that we use `&(pae->Msg)` so that the library set it to NULL if it takes
        // ownership, and the library will not destroy the message when this callback returns.
    
        // No need to destroy anything, everything is handled by the library.
    }
    
    int main(int argc, char **argv)
    {
        natsConnection      *conn  = NULL;
        natsStatistics      *stats = NULL;
        natsOptions         *opts  = NULL;
        jsCtx               *js    = NULL;
        jsOptions           jsOpts;
        jsErrCode           jerr   = 0;
        natsStatus          s;
        int                 dataLen=0;
        volatile int        errors = 0;
        bool                delStream = false;
    
        opts = parseArgs(argc, argv, usage);
        dataLen = (int) strlen(payload);
    
        s = natsConnection_Connect(&conn, opts);
    
        if (s == NATS_OK)
            s = jsOptions_Init(&jsOpts);
    
        if (s == NATS_OK)
        {
            if (async)
            {
                jsOpts.PublishAsync.ErrHandler           = _jsPubErr;
                jsOpts.PublishAsync.ErrHandlerClosure    = (void*) &errors;
            }
            s = natsConnection_JetStream(&js, conn, &jsOpts);
        }
    
        if (s == NATS_OK)
        {
            jsStreamInfo    *si = NULL;
    
            // First check if the stream already exists.
            s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
            if (s == NATS_NOT_FOUND)
            {
                jsStreamConfig  cfg;
    
                // Since we are the one creating this stream, we can delete at the end.
                delStream = true;
    
                // Initialize the configuration structure.
                jsStreamConfig_Init(&cfg);
                cfg.Name = stream;
                // Set the subject
                cfg.Subjects = (const char*[1]){subj};
                cfg.SubjectsLen = 1;
                // Make it a memory stream.
                cfg.Storage = js_MemoryStorage;
                // Add the stream,
                s = js_AddStream(&si, js, &cfg, NULL, &jerr);
            }
            if (s == NATS_OK)
            {
                printf("Stream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
                    si->Config->Name, si->State.Msgs, si->State.Bytes);
    
                // Need to destroy the returned stream object.
                jsStreamInfo_Destroy(si);
            }
        }
    
        if (s == NATS_OK)
            s = natsStatistics_Create(&stats);
    
        if (s == NATS_OK)
        {
            printf("\nSending %" PRId64 " messages to subject '%s'\n", total, stream);
            start = nats_Now();
        }
    
        for (count = 0; (s == NATS_OK) && (count < total); count++)
        {
            if (async)
                s = js_PublishAsync(js, subj, (const void*) payload, dataLen, NULL);
            else
            {
                jsPubAck *pa = NULL;
    
                s = js_Publish(&pa, js, subj, (const void*) payload, dataLen, NULL, &jerr);
                if (s == NATS_OK)
                {
                    if (pa->Duplicate)
                        printf("Got a duplicate message! Sequence=%" PRIu64 "\n", pa->Sequence);
    
                    jsPubAck_Destroy(pa);
                }
            }
        }
    
        if ((s == NATS_OK) && async)
        {
            jsPubOptions    jsPubOpts;
    
            jsPubOptions_Init(&jsPubOpts);
            // Let's set it to 30 seconds, if getting "Timeout" errors,
            // this may need to be increased based on the number of messages
            // being sent.
            jsPubOpts.MaxWait = 30000;
            s = js_PublishAsyncComplete(js, &jsPubOpts);
            if (s == NATS_TIMEOUT)
            {
                // Let's get the list of pending messages. We could resend,
                // etc, but for now, just destroy them.
                natsMsgList list;
    
                js_PublishAsyncGetPendingList(&list, js);
                natsMsgList_Destroy(&list);
            }
        }
    
        if (s == NATS_OK)
        {
            jsStreamInfo *si = NULL;
    
            elapsed = nats_Now() - start;
            printStats(STATS_OUT, conn, NULL, stats);
            printPerf("Sent");
    
            if (errors != 0)
                printf("There were %d asynchronous errors\n", errors);
    
            // Let's report some stats after the run
            s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
            if (s == NATS_OK)
            {
                printf("\nStream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
                    si->Config->Name, si->State.Msgs, si->State.Bytes);
    
                jsStreamInfo_Destroy(si);
            }
        }
        if (delStream && (js != NULL))
        {
            printf("\nDeleting stream %s: ", stream);
            s = js_DeleteStream(js, stream, NULL, &jerr);
            if (s == NATS_OK)
                printf("OK!");
            printf("\n");
        }
        if (s != NATS_OK)
        {
            printf("Error: %u - %s - jerr=%u\n", s, natsStatus_GetText(s), jerr);
            nats_PrintLastErrorStack(stderr);
        }
    
        // Destroy all our objects to avoid report of memory leak
        jsCtx_Destroy(js);
        natsStatistics_Destroy(stats);
        natsConnection_Destroy(conn);
        natsOptions_Destroy(opts);
    
        // To silence reports of memory still in used with valgrind
        nats_Close();
    
        return 0;
    }
    // KeyValue will lookup and bind to an existing KeyValue store.
    KeyValue(bucket string) (KeyValue, error)
    // CreateKeyValue will create a KeyValue store with the following configuration.
    CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error)
    // DeleteKeyValue will delete this KeyValue store (JetStream stream).
    DeleteKeyValue(bucket string) error
    /**
     * Create a key value store.
     * @param config the key value configuration
     * @return bucket info
     * @throws IOException covers various communication issues with the NATS
     *         server such as timeout or interruption
     * @throws JetStreamApiException the request had an error related to the data
     * @throws IllegalArgumentException the server is not JetStream enabled
     */
    KeyValueStatus create(KeyValueConfiguration config) throws IOException, JetStreamApiException;
    
    /**
    * Get the list of bucket names.
    * @return list of bucket names
    * @throws IOException covers various communication issues with the NATS
    *         server such as timeout or interruption
    * @throws JetStreamApiException the request had an error related to the data
    * @throws InterruptedException if the thread is interrupted
    */
    List<String> getBucketNames() throws IOException, JetStreamApiException, InterruptedException;
    
    /**
    * Gets the info for an existing bucket.
    * @param bucketName the bucket name to use
    * @throws IOException covers various communication issues with the NATS
    *         server such as timeout or interruption
    * @throws JetStreamApiException the request had an error related to the data
    * @return the bucket status object
    */
    KeyValueStatus getBucketInfo(String bucketName) throws IOException, JetStreamApiException;
    
    /**
    * Deletes an existing bucket. Will throw a JetStreamApiException if the delete fails.
    * @param bucketName the stream name to use.
    * @throws IOException covers various communication issues with the NATS
    *         server such as timeout or interruption
    * @throws JetStreamApiException the request had an error related to the data
    */
    void delete(String bucketName) throws IOException, JetStreamApiException;
    // Get returns the latest value for the key.
    Get(key string) (entry KeyValueEntry, err error)
    // GetRevision returns a specific revision value for the key.
    GetRevision(key string, revision uint64) (entry KeyValueEntry, err error)
    /**
    * Get the entry for a key
    * @param key the key
    * @return the KvEntry object or null if not found.
    * @throws IOException covers various communication issues with the NATS
    *         server such as timeout or interruption
    * @throws JetStreamApiException the request had an error related to the data
    * @throws IllegalArgumentException the server is not JetStream enabled
    */
    KeyValueEntry get(String key) throws IOException, JetStreamApiException;
    
    /**
    * Get the specific revision of an entry for a key.
    * @param key the key
    * @param revision the revision
    * @return the KvEntry object or null if not found.
    * @throws IOException covers various communication issues with the NATS
    *         server such as timeout or interruption
    * @throws JetStreamApiException the request had an error related to the data
    * @throws IllegalArgumentException the server is not JetStream enabled
    */
    KeyValueEntry get(String key, long revision) throws IOException, JetStreamApiException;
      static async create(
        js: JetStreamClient,
        name: string,
        opts: Partial<KvOptions> = {},
      ): Promise<KV>
    
    static async bind(
        js: JetStreamClient,
        name: string,
        opts: Partial<{ codec: KvCodecs }> = {},
    ): Promise<KV>
    
    destroy(): Promise<boolean>
    # from the JetStreamContext
    
    async def key_value(self, bucket: str) -> KeyValue:
    
    async def create_key_value(
        self,
        config: Optional[api.KeyValueConfig] = None,
        **params,
    ) -> KeyValue:
        """
        create_key_value takes an api.KeyValueConfig and creates a KV in JetStream.
        """
        
    async def delete_key_value(self, bucket: str) -> bool:
        """
        delete_key_value deletes a JetStream KeyValue store by destroying
        the associated stream.
        """  
    // dotnet add package NATS.Net
    
    // Create a new Key Value Store or get an existing one
    ValueTask<INatsKVStore> CreateStoreAsync(string bucket, CancellationToken cancellationToken = default);
    
    // Get a list of bucket names
    IAsyncEnumerable<string> GetBucketNamesAsync(CancellationToken cancellationToken = default);
    
    // Gets the status for all buckets
    IAsyncEnumerable<NatsKVStatus> GetStatusesAsync(CancellationToken cancellationToken = default);
    
    // Delete a Key Value Store
    ValueTask<bool> DeleteStoreAsync(string bucket, CancellationToken cancellationToken = default);
    
    //
    NATS_EXTERN natsStatus 	kvConfig_Init (kvConfig *cfg)
     	Initializes a KeyValue configuration structure.
     
    NATS_EXTERN natsStatus 	js_CreateKeyValue (kvStore **new_kv, jsCtx *js, kvConfig *cfg)
     	Creates a KeyValue store with a given configuration.
     
    NATS_EXTERN natsStatus 	js_KeyValue (kvStore **new_kv, jsCtx *js, const char *bucket)
     	Looks-up and binds to an existing KeyValue store.
     
    NATS_EXTERN natsStatus 	js_DeleteKeyValue (jsCtx *js, const char *bucket)
     	Deletes a KeyValue store.
     
    NATS_EXTERN void 	kvStore_Destroy (kvStore *kv)
     	Destroys a KeyValue store object.
    async get(k: string): Promise<KvEntry | null>
    async def get(self, key: str) -> Entry:
       """
       get returns the latest value for the key.
       """
    // dotnet add package NATS.Net
    
    // Get an entry from the bucket using the key
    ValueTask<NatsKVEntry<T>> GetEntryAsync<T>(string key, ulong revision = default, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default);
    
    //
    NATS_EXTERN natsStatus 	kvStore_Get (kvEntry **new_entry, kvStore *kv, const char *key)
     	Returns the latest entry for the key.
     
    NATS_EXTERN natsStatus 	kvStore_GetRevision (kvEntry **new_entry, kvStore *kv, const char *key, uint64_t revision)
     	Returns the entry at the specific revision for the key.
    Put(key string, value []byte) (revision uint64, err error)
    // PutString will place the string for the key into the store.
    PutString(key string, value string) (revision uint64, err error)
    // Create will add the key/value pair if it does not exist.
    Create(key string, value []byte) (revision uint64, err error)
    // Update will update the value if the latest revision matches.
    Update(key string, value []byte, last uint64) (revision uint64, err error)
    /**
     * Put a byte[] as the value for a key
     * @param key the key
     * @param value the bytes of the value
     * @return the revision number for the key
     * @throws IOException covers various communication issues with the NATS
     *         server such as timeout or interruption
     * @throws JetStreamApiException the request had an error related to the data
     * @throws IllegalArgumentException the server is not JetStream enabled
     */
    long put(String key, byte[] value) throws IOException, JetStreamApiException;
    
    /**
     * Put a string as the value for a key
     * @param key the key
     * @param value the UTF-8 string
     * @return the revision number for the key
     * @throws IOException covers various communication issues with the NATS
     *         server such as timeout or interruption
     * @throws JetStreamApiException the request had an error related to the data
     * @throws IllegalArgumentException the server is not JetStream enabled
     */
    long put(String key, String value) throws IOException, JetStreamApiException;
    
    /**
     * Put a long as the value for a key
     * @param key the key
     * @param value the number
     * @return the revision number for the key
     * @throws IOException covers various communication issues with the NATS
     *         server such as timeout or interruption
     * @throws JetStreamApiException the request had an error related to the data
     * @throws IllegalArgumentException the server is not JetStream enabled
     */
    long put(String key, Number value) throws IOException, JetStreamApiException;
    
    /**
     * Put as the value for a key iff the key does not exist (there is no history)
     * or is deleted (history shows the key is deleted)
     * @param key the key
     * @param value the bytes of the value
     * @return the revision number for the key
     * @throws IOException covers various communication issues with the NATS
     *         server such as timeout or interruption
     * @throws JetStreamApiException the request had an error related to the data
     * @throws IllegalArgumentException the server is not JetStream enabled
     */
    long create(String key, byte[] value) throws IOException, JetStreamApiException;
    
    /**
     * Put as the value for a key iff the key exists and its last revision matches the expected
     * @param key the key
     * @param value the bytes of the value
     * @param expectedRevision the expected last revision
     * @return the revision number for the key
     * @throws IOException covers various communication issues with the NATS
     *         server such as timeout or interruption
     * @throws JetStreamApiException the request had an error related to the data
     * @throws IllegalArgumentException the server is not JetStream enabled
     */
    long update(String key, byte[] value, long expectedRevision) throws IOException, JetStreamApiException;
      async put(
        k: string,
        data: Uint8Array,
        opts: Partial<KvPutOptions> = {},
      ): Promise<number>
    
    create(k: string, data: Uint8Array): Promise<number>    
        
    update(k: string, data: Uint8Array, version: number): Promise<number>
    async def put(self, key: str, value: bytes) -> int:
        """
        put will place the new value for the key into the store
        and return the revision number.
        """
        
    async def update(self, key: str, value: bytes, last: int) -> int:
        """
        update will update the value iff the latest revision matches.
        """    
    // dotnet add package NATS.Net
    
    // Put a value into the bucket using the key
    // returns revision number
    ValueTask<ulong> PutAsync<T>(string key, T value, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default);
    
    //
    NATS_EXTERN natsStatus 	kvStore_Put (uint64_t *rev, kvStore *kv, const char *key, const void *data, int len)
     	Places the new value for the key into the store.
     
    NATS_EXTERN natsStatus 	kvStore_PutString (uint64_t *rev, kvStore *kv, const char *key, const char *data)
     	Places the new value (as a string) for the key into the store.
     
    NATS_EXTERN natsStatus 	kvStore_Create (uint64_t *rev, kvStore *kv, const char *key, const void *data, int len)
     	Places the value for the key into the store if and only if the key does not exist.
     
    NATS_EXTERN natsStatus 	kvStore_CreateString (uint64_t *rev, kvStore *kv, const char *key, const char *data)
     	Places the value (as a string) for the key into the store if and only if the key does not exist.
     
    NATS_EXTERN natsStatus 	kvStore_Update (uint64_t *rev, kvStore *kv, const char *key, const void *data, int len, uint64_t last)
     	Updates the value for the key into the store if and only if the latest revision matches.
     
    NATS_EXTERN natsStatus 	kvStore_UpdateString (uint64_t *rev, kvStore *kv, const char *key, const char *data, uint64_t last)
     	Updates the value (as a string) for the key into the store if and only if the latest revision matches.
    // Delete will place a delete marker and leave all revisions.
    Delete(key string) error
    // Purge will place a delete marker and remove all previous revisions.
    Purge(key string) error
    /**
    * Soft deletes the key by placing a delete marker.
    * @param key the key
    * @throws IOException covers various communication issues with the NATS
    *         server such as timeout or interruption
    * @throws JetStreamApiException the request had an error related to the data
    */
    void delete(String key) throws IOException, JetStreamApiException;
    
    /**
    * Purge all values/history from the specific key
    * @param key the key
    * @throws IOException covers various communication issues with the NATS
    *         server such as timeout or interruption
    * @throws JetStreamApiException the request had an error related to the data
    */
    void purge(String key) throws IOException, JetStreamApiException;
    delete(k: string): Promise<void>
        
    purge(k: string): Promise<void>
    async def delete(self, key: str) -> bool:
        """
        delete will place a delete marker and remove all previous revisions.
        """
        
    async def purge(self, key: str) -> bool:
        """
        purge will remove the key and all revisions.
        """    
    // dotnet add package NATS.Net
    
    // Delete an entry from the bucket
    ValueTask DeleteAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default);
    
    // Purge an entry from the bucket
    ValueTask PurgeAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default);
    
    //
    NATS_EXTERN natsStatus 	kvStore_Delete (kvStore *kv, const char *key)
     	Deletes a key by placing a delete marker and leaving all revisions.
     
    NATS_EXTERN natsStatus 	kvStore_Purge (kvStore *kv, const char *key, kvPurgeOptions *opts)
     	Deletes a key by placing a purge marker and removing all revisions.
     	
    NATS_EXTERN natsStatus 	kvStore_PurgeDeletes (kvStore *kv, kvPurgeOptions *opts)
     	Purge and removes delete markers.
    // Keys will return all keys.
    Keys(opts ...WatchOpt) ([]string, error)
    /**
     * Get a list of the keys in a bucket.
     * @return List of keys
     * @throws IOException covers various communication issues with the NATS
     *         server such as timeout or interruption
     * @throws JetStreamApiException the request had an error related to the data
     * @throws InterruptedException if the thread is interrupted
     */
    List<String> keys() throws IOException, JetStreamApiException, InterruptedException;
    async keys(k = ">"): Promise<QueuedIterator<string>>
    // dotnet add package NATS.Net
    
    // Get all the keys in the bucket
    IAsyncEnumerable<string> GetKeysAsync(NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);
    
    // Get a filtered set of keys in the bucket
    IAsyncEnumerable<string> GetKeysAsync(IEnumerable<string> filters, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);
    
    //
    NATS_EXTERN natsStatus 	kvStore_Keys (kvKeysList *list, kvStore *kv, kvWatchOptions *opts)
     	Returns all keys in the bucket.
     
    NATS_EXTERN void 	kvKeysList_Destroy (kvKeysList *list)
     	Destroys this list of KeyValue store key strings.
    // History will return all historical values for the key.
    History(key string, opts ...WatchOpt) ([]KeyValueEntry, error)
    /**
     * Get the history (list of KeyValueEntry) for a key
     * @param key the key
     * @return List of KvEntry
     * @throws IOException covers various communication issues with the NATS
     *         server such as timeout or interruption
     * @throws JetStreamApiException the request had an error related to the data
     * @throws InterruptedException if the thread is interrupted
     */
    List<KeyValueEntry> history(String key) throws IOException, JetStreamApiException, InterruptedException;
    async history(
        opts: { key?: string; headers_only?: boolean } = {},
      ): Promise<QueuedIterator<KvEntry>>
    // dotnet add package NATS.Net
    
    // Get the history of an entry by key
    IAsyncEnumerable<NatsKVEntry<T>> HistoryAsync<T>(string key, INatsDeserialize<T>? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);
    
    //
    NATS_EXTERN natsStatus 	kvStore_History (kvEntryList *list, kvStore *kv, const char *key, kvWatchOptions *opts)
     	Returns all historical entries for the key.
     
    NATS_EXTERN void 	kvEntryList_Destroy (kvEntryList *list)
     	Destroys this list of KeyValue store entries.
    // Watch for any updates to keys that match the keys argument which could include wildcards.
    // Watch will send a nil entry when it has received all initial values.
    Watch(keys string, opts ...WatchOpt) (KeyWatcher, error)
    // WatchAll will invoke the callback for all updates.
    WatchAll(opts ...WatchOpt) (KeyWatcher, error)
    /**
     * Watch updates for a specific key
     */
    NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException;
    
    /**
     * Watch updates for all keys
     */
    NatsKeyValueWatchSubscription watchAll(KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException;
      async watch(
        opts: {
          key?: string;
          headers_only?: boolean;
          initializedFn?: callbackFn;
        } = {},
      ): Promise<QueuedIterator<KvEntry>>
    // dotnet add package NATS.Net
    
    // Start a watcher for specific keys
    // Key to watch is subject-based and wildcards may be used
    IAsyncEnumerable<NatsKVEntry<T>> WatchAsync<T>(string key, INatsDeserialize<T>? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);
    
    // Start a watcher for specific keys
    // Key to watch are subject-based and wildcards may be used
    IAsyncEnumerable<NatsKVEntry<T>> WatchAsync<T>(IEnumerable<string> keys, INatsDeserialize<T>? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);
    
    // Start a watcher for all the keys in the bucket
    IAsyncEnumerable<NatsKVEntry<T>> WatchAsync<T>(INatsDeserialize<T>? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);
    
    //
    NATS_EXTERN natsStatus 	kvStore_Watch (kvWatcher **new_watcher, kvStore *kv, const char *keys, kvWatchOptions *opts)
     	Returns a watcher for any updates to keys that match the keys argument.
     
    NATS_EXTERN natsStatus 	kvStore_WatchAll (kvWatcher **new_watcher, kvStore *kv, kvWatchOptions *opts)
     	Returns a watcher for any updates to any keys of the KeyValue store bucket.

    Consumer Details

    Consumers are how client applications get the messages stored in streams. You can have many consumers on a single stream. Consumers are like a view on a stream: they can filter messages and have some state (maintained by the servers) associated with them.

    Consumers can be 'durable' or 'ephemeral'.

    hashtag
    Durable versus ephemeral consumers

    Durable consumers persist message delivery progress on the server side. A durable consumer can be retrieved by name and shared between client instances for load balancing. It can be made highly available through replicas.

    An ephemeral consumer does not persist delivery progress and will automatically be deleted when there are no more client instances connected.

    hashtag
    Durable consumers

    Durable consumers are meant to be used by multiple instances of an application, either to distribute and scale out the processing, or to persist the position of the consumer over the stream between runs of an application.

    Durable consumers, as the name implies, are meant to last 'forever'. They are typically created and deleted administratively rather than by the application code, which only needs to specify the durable's well-known name in order to use it.

    You create a durable consumer using the nats consumer add CLI tool command, or programmatically by passing a durable name option to the subscription creation call.
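    For example, in Go (a sketch; the stream 'ORDERS', subject 'orders.received' and durable name 'order-processor' are illustrative):

    // Define the durable (pull) consumer; typically done once, administratively.
    js.AddConsumer("ORDERS", &nats.ConsumerConfig{
        Durable:       "order-processor",
        AckPolicy:     nats.AckExplicitPolicy,
        FilterSubject: "orders.received",
    })

    // Application instances then attach to it by its well-known name.
    sub, _ := js.PullSubscribe("orders.received", "order-processor")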

    hashtag
    Ephemeral consumers

    Ephemeral consumers are meant to be used by a single instance of an application (e.g. to get its own replay of the messages in the stream).

    Ephemeral consumers are not meant to last 'forever'; they are defined automatically at subscription time by the client library and disappear after the application disconnects.

    You (automatically) create an ephemeral consumer when you call the js.Subscribe function without specifying the Durable or Bind subscription options. Calling Drain on that subscription automatically deletes the underlying ephemeral consumer. You can also explicitly create an ephemeral consumer by not passing a durable name option to the jsm.AddConsumer call.

    Ephemeral consumers otherwise offer the same control over message acknowledgement and re-delivery as durable consumers.
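    For example, in Go (a sketch, assuming a stream captures the illustrative subject 'orders.received'):

    // Subscribing without a Durable or Bind option creates an ephemeral
    // push consumer under the covers.
    sub, _ := js.Subscribe("orders.received", func(m *nats.Msg) {
        fmt.Printf("received: %s\n", m.Data)
        m.Ack()
    }, nats.ManualAck())

    // Draining the subscription deletes the underlying ephemeral consumer.
    defer sub.Drain()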

    hashtag
    Push and Pull consumers

    Clients can use one of two consumer implementations, identified as 'push' and 'pull'.

    hashtag
    Push consumers

    Push consumers receive messages on a specific subject, with message flow controlled by the server. Load balancing is supported through NATS core queue groups: the messages from the stream are automatically distributed between the clients subscribing to the push consumer.
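    In Go, such a load-balanced push consumer can be expressed as a queue subscription (a sketch; the subject and queue group names are illustrative):

    // Each application instance joins the same queue group; the server
    // delivers each message to only one member of the group.
    sub, _ := js.QueueSubscribe("orders.received", "order-workers", func(m *nats.Msg) {
        fmt.Printf("processing: %s\n", m.Data)
        m.Ack()
    }, nats.ManualAck())
    defer sub.Drain()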

    hashtag
    Pull consumers

    Pull consumers request messages explicitly from the server in batches, giving the client full control over dispatching, flow control, pending (unacknowledged) messages and load balancing. Pull-consuming clients make fetch() calls in a dispatch loop.


    We recommend using pull consumers for new projects, in particular when scalability, detailed flow control or error handling are a design focus. Most client APIs have been updated to provide convenient interfaces for consuming messages through callback handlers or iterators without the need to manage message retrieval.

    fetch() calls can be immediate or have a defined timeout, allowing for either controlled (1 by 1) consumption or realtime delivery with minimal polling overhead.

    Pull consumers create less CPU load on the NATS servers and therefore scale better (push consumers are still quite fast and scalable; you may only notice the difference between the two if you have sustained high message rates).

    A push consumer can also be used in some other use cases, such as without a queue group, or with no acknowledgement or cumulative acknowledgements.

    hashtag
    Ordered Consumers

    Ordered consumers are a convenient form of ephemeral push consumer for applications that want to efficiently consume a stream for data inspection or analysis.

    The consumer is guaranteed to receive messages in sequence and without gaps.

    • Always ephemeral - minimal overhead for the server

    • Single-threaded, in-sequence dispatching

    • Client checks message sequence and will prevent gaps in the delivery

    • Can recover from server node failure and reconnect

    • Does not recover from client failure as it is ephemeral
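    In Go, an ordered consumer is requested with a subscription option (a sketch; the subject 'orders.received' is illustrative):

    // OrderedConsumer() creates a flow-controlled ephemeral push consumer that
    // the library transparently recreates as needed to deliver the stream
    // in sequence and without gaps.
    sub, _ := js.Subscribe("orders.received", func(m *nats.Msg) {
        meta, _ := m.Metadata()
        fmt.Printf("stream sequence %d: %s\n", meta.Sequence.Stream, m.Data)
    }, nats.OrderedConsumer())
    defer sub.Drain()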

    hashtag
    Delivery reliability

    JetStream consumers can ensure not just reliable delivery of messages but also reliable processing of them, even in the face of client application or downstream failures. They do so by using message-level acknowledgements and message re-deliveries.

    Consumers have an Acknowledgement Policy specifying the level of reliability required. In increasing order of reliability, the available policies are: 'none' for no application-level acknowledgements, 'all' where acknowledging a specific message also implicitly acknowledges all previous messages in the stream, and 'explicit' where each message must be individually acknowledged.

    When the consumer is set to require explicit acknowledgements, client applications are able to use more than one kind of acknowledgement to indicate successful (or not) reception and processing of the messages being received from the consumer.

    Applications can:

    • Acknowledge the successful processing of a message (Ack()).

    • Acknowledge the successful processing of a message and request an acknowledgement of the reception of the acknowledgement by the consumer (AckSync()).

    • Indicate that the processing is still in progress and more time is needed (inProgress()).

    • Negatively acknowledge a message, indicating that the client application is currently (temporarily) unable to process the message and that the consumer should attempt to re-deliver it (Nak()).

    • Terminate a message (typically because there is a problem with the data inside the message such that the client application is never going to be able to process it), indicating that the consumer should not attempt to re-deliver the message (Term()).
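    In Go these map to methods on the received message; here is a hedged sketch of a processing loop (it reuses the pull subscription from the durable-consumer sketch above; process() and errRetryLater are hypothetical application code):

    var errRetryLater = errors.New("retry later") // hypothetical sentinel error

    msgs, _ := sub.Fetch(10)
    for _, m := range msgs {
        err := process(m) // process() is a hypothetical application handler
        switch {
        case err == nil:
            m.Ack() // or m.AckSync() to also confirm reception of the ack
        case errors.Is(err, errRetryLater):
            m.NakWithDelay(10 * time.Second) // or plain m.Nak()
        default:
            m.Term() // bad data: do not re-deliver
        }
        // For long-running work, call m.InProgress() periodically to keep
        // resetting the AckWait timer.
    }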

    After a message is sent from the consumer to a subscribing client application by the server, an 'AckWait' timer is started. This timer is deleted when either a positive (Ack()) or a termination (Term()) acknowledgement is received from the client application. The timer gets reset upon reception of an in-progress (inProgress()) acknowledgement.

    If no acknowledgement has been received from the client application by the end of that period, the server will attempt to re-deliver the message. If there is more than one client application instance subscribing to the consumer, there is no guarantee that the re-delivery goes to any particular client instance.

    You can control the timing of re-deliveries using either the single AckWait duration attribute of the consumer, or as a sequence of durations in the BackOff attribute (which overrides AckWait).

    You can also control the timing of re-deliveries when messages are negatively acknowledged with Nak() by passing a nakDelay() option (or using NakWithDelay()); otherwise the re-delivery attempt happens right after the server receives the Nak.
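    For example, in Go the re-delivery schedule is part of the consumer configuration (a sketch; all values are illustrative):

    js.AddConsumer("ORDERS", &nats.ConsumerConfig{
        Durable:   "order-processor",
        AckPolicy: nats.AckExplicitPolicy,
        // Either a single timer for every delivery attempt:
        //   AckWait: 30 * time.Second,
        // or an escalating sequence of delays, which overrides AckWait:
        BackOff:    []time.Duration{time.Second, 5 * time.Second, 30 * time.Second},
        MaxDeliver: 5,
    })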

    hashtag
    "Dead Letter Queues" type functionality

    You can set a maximum number of delivery attempts using the consumer's MaxDeliver setting.

    Whenever a message reaches its maximum number of delivery attempts an advisory message is published on the $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.<STREAM>.<CONSUMER> subject. The advisory message's payload (use nats schema info io.nats.jetstream.advisory.v1.max_deliver for specific information) contains a stream_seq field that contains the sequence number of the message in the stream.

    Similarly, whenever a client application terminates delivery attempts for the message using AckTerm an advisory message is published on the $JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED.<STREAM>.<CONSUMER> subject, and its payload (see nats schema info io.nats.jetstream.advisory.v1.terminated) contains a stream_seq field.

    You can leverage those advisory messages to implement "Dead Letter Queue" (DLQ) types of functionalities. For example:

    • If you only need to know each time a message is 'dead' (considered un-re-deliverable by the consumer), then listening to the advisories is enough.

    • If you also need to have access to the message in question then you can use the message's sequence number included in the advisory to retrieve that specific message by sequence number from the stream. If a message reaches its maximum level of delivery attempts, it will still stay in the stream until it is manually deleted or manually acknowledged.
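    A hedged Go sketch of this pattern (it assumes an existing nc connection and js JetStream context; the stream 'ORDERS' and consumer 'order-processor' are illustrative, and encoding/json is used to pull stream_seq out of the advisory payload):

    // Advisories are plain NATS messages, so a core NATS subscription suffices.
    subj := "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.ORDERS.order-processor"
    nc.Subscribe(subj, func(m *nats.Msg) {
        var adv struct {
            StreamSeq uint64 `json:"stream_seq"`
        }
        if err := json.Unmarshal(m.Data, &adv); err != nil {
            return
        }
        // The 'dead' message is still in the stream: fetch it by sequence
        // and hand it to whatever DLQ handling the application needs.
        if raw, err := js.GetMsg("ORDERS", adv.StreamSeq); err == nil {
            log.Printf("dead letter on %s: %s", raw.Subject, raw.Data)
        }
    })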

    func ExampleJetStream() {
        nc, err := nats.Connect("localhost")
        if err != nil {
            log.Fatal(err)
        }
    
    	// Use the JetStream context to produce and consume messages
    	// that have been persisted.
    	js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
    	if err != nil {
    		log.Fatal(err)
    	}
    
    	js.AddStream(&nats.StreamConfig{
    		Name:     "FOO",
    		Subjects: []string{"foo"},
    	})
    
    	js.Publish("foo", []byte("Hello JS!"))
    
    	// Publish messages asynchronously.
    	for i := 0; i < 500; i++ {
    		js.PublishAsync("foo", []byte("Hello JS Async!"))
    	}
    	select {
    	case <-js.PublishAsyncComplete():
    	case <-time.After(5 * time.Second):
    		fmt.Println("Did not resolve in time")
    	}
    
    	// Create Pull based consumer with maximum 128 inflight.
    	sub, _ := js.PullSubscribe("foo", "wq", nats.PullMaxWaiting(128))
    
    	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    	defer cancel()
    
    	for {
    		select {
    		case <-ctx.Done():
    			return
    		default:
    		}
    
    		// Fetch will return as soon as any message is available rather than
    		// wait until the full batch size is available; using a batch size of
    		// more than 1 allows for higher throughput when needed.
    		msgs, _ := sub.Fetch(10, nats.Context(ctx))
    		for _, msg := range msgs {
    			msg.Ack()
    		}
    	}
    }
    package io.nats.examples.jetstream.simple;
    
    import io.nats.client.*;
    import io.nats.client.api.ConsumerConfiguration;
    import io.nats.examples.jetstream.ResilientPublisher;
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicInteger;
    import static io.nats.examples.jetstream.NatsJsUtils.createOrReplaceStream;
    
    /**
    * This example will demonstrate simplified consume with a handler
    */
    public class MessageConsumerExample {
     private static final String STREAM = "consume-stream";
     private static final String SUBJECT = "consume-subject";
     private static final String CONSUMER_NAME = "consume-consumer";
     private static final String MESSAGE_PREFIX = "consume";
     private static final int STOP_COUNT = 500;
     private static final int REPORT_EVERY = 100;
     private static final String SERVER = "nats://localhost:4222";
    
     public static void main(String[] args) {
         Options options = Options.builder().server(SERVER).build();
         try (Connection nc = Nats.connect(options)) {
             JetStreamManagement jsm = nc.jetStreamManagement();
             createOrReplaceStream(jsm, STREAM, SUBJECT);
    
             //Utility for filling the stream with some messages
             System.out.println("Starting publish...");
             ResilientPublisher publisher = new ResilientPublisher(nc, jsm, STREAM, SUBJECT).basicDataPrefix(MESSAGE_PREFIX).jitter(10);
             Thread pubThread = new Thread(publisher);
             pubThread.start();
    
             // get stream context, create consumer and get the consumer context
             StreamContext streamContext;
             ConsumerContext consumerContext;
             CountDownLatch latch = new CountDownLatch(1);
             AtomicInteger atomicCount = new AtomicInteger();
             long start = System.nanoTime();
    
             streamContext = nc.getStreamContext(STREAM);
             streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build());
             consumerContext = streamContext.getConsumerContext(CONSUMER_NAME);
    
             MessageHandler handler = msg -> {
                 msg.ack();
                 int count = atomicCount.incrementAndGet();
                 if (count % REPORT_EVERY == 0) {
                	 System.out.println("Handler" + ": Received " + count + " messages in " + (System.nanoTime() - start) / 1_000_000 + "ms.");
                 }
                 if (count == STOP_COUNT) {
                     latch.countDown();
                 }
             };
    
         	 // create the consumer and install handler
         	 MessageConsumer consumer = consumerContext.consume(handler);
         	 //Waiting for the handler signalling us to stop
             latch.await();
             // When stop is called, no more pull requests will be made, but messages already requested
             // will still come across the wire to the client.
             System.out.println("Stopping the consumer...");
             consumer.stop();
             // wait until the consumer is finished processing backlog
             while (!consumer.isFinished()) {
                 Thread.sleep(10);
             }
             System.out.println("Final" + ": Received " + atomicCount.get() + " messages in " + (System.nanoTime() - start) / 1_000_000 + "ms.");
    
             publisher.stop(); // otherwise the ConsumerContext background thread will complain when the connection goes away
             pubThread.join();
         }
         catch (JetStreamApiException | IOException e) {
             // JetStreamApiException:
             //      1. the stream or consumer did not exist
             //      2. an api call under the covers could theoretically fail, but practically it won't
             // IOException:
             //      likely a connection problem
             System.err.println("Unexpected exception, exiting.");
             System.exit(-1);
         }
         }
         catch (Exception e) {
             System.err.println("Unexpected exception, exiting.");
             System.exit(-1);
         }
     }
    }
    import { AckPolicy, connect, nanos } from "../../src/mod.ts";
    import { nuid } from "../../nats-base-client/nuid.ts";
    
    const nc = await connect();
    
    const stream = nuid.next();
    const subj = nuid.next();
    const durable = nuid.next();
    
    const jsm = await nc.jetstreamManager();
    await jsm.streams.add({ name: stream, subjects: [subj] });
    
    const js = nc.jetstream();
    await js.publish(subj);
    await js.publish(subj);
    await js.publish(subj);
    await js.publish(subj);
    
    const psub = await js.pullSubscribe(subj, {
      mack: true,
      // artificially low ack_wait, to show some messages
      // not getting acked being redelivered
      config: {
        durable_name: durable,
        ack_policy: AckPolicy.Explicit,
        ack_wait: nanos(4000),
      },
    });
    
    (async () => {
      for await (const m of psub) {
        console.log(
          `[${m.seq}] ${
            m.redelivered ? `- redelivery ${m.info.redeliveryCount}` : ""
          }`
        );
        if (m.seq % 2 === 0) {
          m.ack();
        }
      }
    })();
    
    const fn = () => {
      console.log("[PULL]");
      psub.pull({ batch: 1000, expires: 10000 });
    };
    
    // do the initial pull
    fn();
    // and now schedule a pull every so often
const interval = setInterval(fn, 10000); // and repeat every 10s
    
    setTimeout(() => {
      clearInterval(interval);
      nc.drain();
    }, 20000);
    import asyncio
    
    import nats
    from nats.errors import TimeoutError
    
    async def main():
        nc = await nats.connect("localhost")
    
        # Create JetStream context.
        js = nc.jetstream()
    
    # Persist messages on the 'foo' subject.
        await js.add_stream(name="sample-stream", subjects=["foo"])
    
        for i in range(0, 10):
            ack = await js.publish("foo", f"hello world: {i}".encode())
            print(ack)
    
        # Create pull based consumer on 'foo'.
        psub = await js.pull_subscribe("foo", "psub")
    
    # Fetch and ack messages from the consumer.
        for i in range(0, 10):
            msgs = await psub.fetch(1)
            for msg in msgs:
                print(msg)
    
        await nc.close()
    
    if __name__ == '__main__':
        asyncio.run(main())
    // dotnet add package NATS.Net
    using NATS.Net;
    using NATS.Client.JetStream;
    using NATS.Client.JetStream.Models;
    
    await using var client = new NatsClient();
    
    INatsJSContext js = client.CreateJetStreamContext();
    
    // Create a stream
    var streamConfig = new StreamConfig(name: "FOO", subjects: ["foo"]);
    await js.CreateStreamAsync(streamConfig);
    
    // Publish a message
    {
        PubAckResponse ack = await js.PublishAsync("foo", "Hello, JetStream!");
        ack.EnsureSuccess();
    }
    
    // Publish messages concurrently
    List<NatsJSPublishConcurrentFuture> futures = new();
    for (var i = 0; i < 500; i++)
    {
        NatsJSPublishConcurrentFuture future
            = await js.PublishConcurrentAsync("foo", "Hello, JetStream 1!");
        futures.Add(future);
    }
    
    foreach (var future in futures)
    {
        await using (future)
        {
            PubAckResponse ack = await future.GetResponseAsync();
            ack.EnsureSuccess();
        }
    }
    
    
// Create a consumer, allowing at most 128 waiting pull requests
    INatsJSConsumer consumer = await js.CreateConsumerAsync("FOO", new ConsumerConfig(name: "foo")
    {
        MaxWaiting = 128,
    });
    
    using CancellationTokenSource cts = new(TimeSpan.FromSeconds(10));
    
    while (cts.IsCancellationRequested == false)
    {
        var opts = new NatsJSFetchOpts { MaxMsgs = 10 };
        await foreach (NatsJSMsg<string> msg in consumer.FetchAsync<string>(opts, cancellationToken: cts.Token))
        {
            await msg.AckAsync(cancellationToken: cts.Token);
        }
    }
    #include "examples.h"
    
    static const char *usage = ""\
    "-gd            use global message delivery thread pool\n" \
    "-sync          receive synchronously (default is asynchronous)\n" \
    "-pull          use pull subscription\n" \
    "-fc            enable flow control\n" \
    "-count         number of expected messages\n";
    
    static void
    onMsg(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
    {
        if (print)
            printf("Received msg: %s - %.*s\n",
                   natsMsg_GetSubject(msg),
                   natsMsg_GetDataLength(msg),
                   natsMsg_GetData(msg));
    
        if (start == 0)
            start = nats_Now();
    
        // We should be using a mutex to protect those variables since
        // they are used from the subscription's delivery and the main
        // threads. For demo purposes, this is fine.
        if (++count == total)
            elapsed = nats_Now() - start;
    
    // Since this is an auto-ack callback, we don't need to ack here.
        natsMsg_Destroy(msg);
    }
    
    static void
    asyncCb(natsConnection *nc, natsSubscription *sub, natsStatus err, void *closure)
    {
        printf("Async error: %u - %s\n", err, natsStatus_GetText(err));
    
        natsSubscription_GetDropped(sub, (int64_t*) &dropped);
    }
    
    int main(int argc, char **argv)
    {
        natsConnection      *conn  = NULL;
        natsStatistics      *stats = NULL;
        natsOptions         *opts  = NULL;
        natsSubscription    *sub   = NULL;
        natsMsg             *msg   = NULL;
        jsCtx               *js    = NULL;
        jsErrCode           jerr   = 0;
        jsOptions           jsOpts;
        jsSubOptions        so;
        natsStatus          s;
        bool                delStream = false;
    
        opts = parseArgs(argc, argv, usage);
    
        printf("Created %s subscription on '%s'.\n",
            (pull ? "pull" : (async ? "asynchronous" : "synchronous")), subj);
    
        s = natsOptions_SetErrorHandler(opts, asyncCb, NULL);
    
        if (s == NATS_OK)
            s = natsConnection_Connect(&conn, opts);
    
        if (s == NATS_OK)
            s = jsOptions_Init(&jsOpts);
    
        if (s == NATS_OK)
            s = jsSubOptions_Init(&so);
        if (s == NATS_OK)
        {
            so.Stream = stream;
            so.Consumer = durable;
            if (flowctrl)
            {
                so.Config.FlowControl = true;
                so.Config.Heartbeat = (int64_t)1E9;
            }
        }
    
        if (s == NATS_OK)
            s = natsConnection_JetStream(&js, conn, &jsOpts);
    
        if (s == NATS_OK)
        {
            jsStreamInfo    *si = NULL;
    
            // First check if the stream already exists.
            s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
            if (s == NATS_NOT_FOUND)
            {
                jsStreamConfig  cfg;
    
                // Since we are the one creating this stream, we can delete at the end.
                delStream = true;
    
                // Initialize the configuration structure.
                jsStreamConfig_Init(&cfg);
                cfg.Name = stream;
                // Set the subject
                cfg.Subjects = (const char*[1]){subj};
                cfg.SubjectsLen = 1;
                // Make it a memory stream.
                cfg.Storage = js_MemoryStorage;
            // Add the stream.
                s = js_AddStream(&si, js, &cfg, NULL, &jerr);
            }
            if (s == NATS_OK)
            {
                printf("Stream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
                    si->Config->Name, si->State.Msgs, si->State.Bytes);
    
                // Need to destroy the returned stream object.
                jsStreamInfo_Destroy(si);
            }
        }
    
        if (s == NATS_OK)
        {
            if (pull)
                s = js_PullSubscribe(&sub, js, subj, durable, &jsOpts, &so, &jerr);
            else if (async)
                s = js_Subscribe(&sub, js, subj, onMsg, NULL, &jsOpts, &so, &jerr);
            else
                s = js_SubscribeSync(&sub, js, subj, &jsOpts, &so, &jerr);
        }
        if (s == NATS_OK)
            s = natsSubscription_SetPendingLimits(sub, -1, -1);
    
        if (s == NATS_OK)
            s = natsStatistics_Create(&stats);
    
        if ((s == NATS_OK) && pull)
        {
            natsMsgList list;
            int         i;
    
            for (count = 0; (s == NATS_OK) && (count < total); )
            {
                s = natsSubscription_Fetch(&list, sub, 1024, 5000, &jerr);
                if (s != NATS_OK)
                    break;
    
                if (start == 0)
                    start = nats_Now();
    
                count += (int64_t) list.Count;
                for (i=0; (s == NATS_OK) && (i<list.Count); i++)
                    s = natsMsg_Ack(list.Msgs[i], &jsOpts);
    
                natsMsgList_Destroy(&list);
            }
        }
        else if ((s == NATS_OK) && async)
        {
            while (s == NATS_OK)
            {
                if (count + dropped == total)
                    break;
    
                nats_Sleep(1000);
            }
        }
        else if (s == NATS_OK)
        {
            for (count = 0; (s == NATS_OK) && (count < total); count++)
            {
                s = natsSubscription_NextMsg(&msg, sub, 5000);
                if (s != NATS_OK)
                    break;
    
                if (start == 0)
                    start = nats_Now();
    
                s = natsMsg_Ack(msg, &jsOpts);
                natsMsg_Destroy(msg);
            }
        }
    
        if (s == NATS_OK)
        {
            printStats(STATS_IN|STATS_COUNT, conn, sub, stats);
            printPerf("Received");
        }
        if (s == NATS_OK)
        {
            jsStreamInfo *si = NULL;
    
            // Let's report some stats after the run
            s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
            if (s == NATS_OK)
            {
                printf("\nStream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
                    si->Config->Name, si->State.Msgs, si->State.Bytes);
    
                jsStreamInfo_Destroy(si);
            }
            if (delStream)
            {
                printf("\nDeleting stream %s: ", stream);
                s = js_DeleteStream(js, stream, NULL, &jerr);
                if (s == NATS_OK)
                    printf("OK!");
                printf("\n");
            }
        }
        else
        {
            printf("Error: %u - %s - jerr=%u\n", s, natsStatus_GetText(s), jerr);
            nats_PrintLastErrorStack(stderr);
        }
    
    // Destroy all our objects to avoid reports of memory leaks
        jsCtx_Destroy(js);
        natsStatistics_Destroy(stats);
        natsSubscription_Destroy(sub);
        natsConnection_Destroy(conn);
        natsOptions_Destroy(opts);
    
    // To silence reports of memory still in use with valgrind
        nats_Close();
    
        return 0;
    }
    func ExampleJetStream() {
    	nc, err := nats.Connect("localhost")
    	if err != nil {
    		log.Fatal(err)
    	}
    
	// Use the JetStream context to produce and consume messages
	// that have been persisted.
    	js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
    	if err != nil {
    		log.Fatal(err)
    	}
    
    	js.AddStream(&nats.StreamConfig{
    		Name:     "FOO",
    		Subjects: []string{"foo"},
    	})
    
    	js.Publish("foo", []byte("Hello JS!"))
    
    	// Publish messages asynchronously.
    	for i := 0; i < 500; i++ {
    		js.PublishAsync("foo", []byte("Hello JS Async!"))
    	}
    	select {
    	case <-js.PublishAsyncComplete():
    	case <-time.After(5 * time.Second):
    		fmt.Println("Did not resolve in time")
    	}
    
    	// Create async consumer on subject 'foo'. Async subscribers
    	// ack a message once exiting the callback.
    	js.Subscribe("foo", func(msg *nats.Msg) {
    		meta, _ := msg.Metadata()
    		fmt.Printf("Stream Sequence  : %v\n", meta.Sequence.Stream)
    		fmt.Printf("Consumer Sequence: %v\n", meta.Sequence.Consumer)
    	})
    
    	// Async subscriber with manual acks.
    	js.Subscribe("foo", func(msg *nats.Msg) {
    		msg.Ack()
    	}, nats.ManualAck())
    
	// Async queue subscription where members load balance the
	// received messages together.
	// If no consumer name is specified, either with the nats.Bind()
	// or nats.Durable() options, the queue name is used as the
	// durable name (that is, as if you were passing the
	// nats.Durable(<queue group name>) option).
	// It is recommended to use nats.Bind() or nats.Durable()
	// and preferably create the JetStream consumer beforehand
	// (using js.AddConsumer) so that the JS consumer is not
	// deleted on an Unsubscribe() or Drain() when the member
	// that created the consumer goes away first; see the sketch
	// after this example.
	// Check Godoc for the QueueSubscribe() API for more details.
    	js.QueueSubscribe("foo", "group", func(msg *nats.Msg) {
    		msg.Ack()
    	}, nats.ManualAck())
    
    	// Subscriber to consume messages synchronously.
    	sub, _ := js.SubscribeSync("foo")
    	msg, _ := sub.NextMsg(2 * time.Second)
    	msg.Ack()
    
    	// We can add a member to the group, with this member using
    	// the synchronous version of the QueueSubscribe.
    	sub, _ = js.QueueSubscribeSync("foo", "group")
    	msg, _ = sub.NextMsg(2 * time.Second)
    	msg.Ack()
    
    	// ChanSubscribe
    	msgCh := make(chan *nats.Msg, 8192)
    	sub, _ = js.ChanSubscribe("foo", msgCh)
    
    	select {
    	case msg := <-msgCh:
    		fmt.Println("[Received]", msg)
    	case <-time.After(1 * time.Second):
    	}
    }
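As the comments in the example above recommend, you can pre-create the durable JetStream consumer and bind queue members to it, so the consumer is not deleted when the member that created it goes away. Below is a minimal sketch of that pattern; it assumes the same "FOO" stream as above, and the durable name and deliver subject are illustrative.

func queueSubscribeBound(js nats.JetStreamContext) error {
	// Pre-create a durable push consumer attached to the "group" deliver
	// group, so it outlives any individual queue member. (The consumer,
	// deliver subject, and group names here are illustrative.)
	_, err := js.AddConsumer("FOO", &nats.ConsumerConfig{
		Durable:        "group",
		DeliverSubject: "deliver.group",
		DeliverGroup:   "group",
		AckPolicy:      nats.AckExplicitPolicy,
	})
	if err != nil {
		return err
	}
	// Bind to the pre-created consumer; an Unsubscribe() or Drain() by a
	// member will no longer delete it. The queue name must match the
	// consumer's DeliverGroup.
	_, err = js.QueueSubscribe("foo", "group", func(msg *nats.Msg) {
		msg.Ack()
	}, nats.Bind("FOO", "group"), nats.ManualAck())
	return err
}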
    package io.nats.examples.jetstream;
    
    import io.nats.client.*;
    import io.nats.client.api.PublishAck;
    import io.nats.examples.ExampleArgs;
    import io.nats.examples.ExampleUtils;
    
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;
    
    import static io.nats.examples.jetstream.NatsJsUtils.createStreamExitWhenExists;
    
    /**
     * This example will demonstrate JetStream push subscribing using a durable consumer and a queue
     */
    public class NatsJsPushSubQueueDurable {
        static final String usageString =
            "\nUsage: java -cp <classpath> NatsJsPushSubQueueDurable [-s server] [-strm stream] [-sub subject] [-q queue] [-dur durable] [-mcnt msgCount] [-scnt subCount]"
                + "\n\nDefault Values:"
                + "\n   [-strm stream]   qdur-stream"
                + "\n   [-sub subject]   qdur-subject"
                + "\n   [-q queue]       qdur-queue"
                + "\n   [-dur durable]   qdur-durable"
                + "\n   [-mcnt msgCount] 100"
                + "\n   [-scnt subCount] 5"
                + "\n\nUse tls:// or opentls:// to require tls, via the Default SSLContext\n"
                + "\nSet the environment variable NATS_NKEY to use challenge response authentication by setting a file containing your private key.\n"
                + "\nSet the environment variable NATS_CREDS to use JWT/NKey authentication by setting a file containing your user creds.\n"
                + "\nUse the URL in the -s server parameter for user/pass/token authentication.\n";
    
        public static void main(String[] args) {
            ExampleArgs exArgs = ExampleArgs.builder("Push Subscribe, Durable Consumer, Queue", args, usageString)
                    .defaultStream("qdur-stream")
                    .defaultSubject("qdur-subject")
                    .defaultQueue("qdur-queue")
                    .defaultDurable("qdur-durable")
                    .defaultMsgCount(100)
                    .defaultSubCount(5)
                    .build();
    
            try (Connection nc = Nats.connect(ExampleUtils.createExampleOptions(exArgs.server, true))) {
    
                // Create a JetStreamManagement context.
                JetStreamManagement jsm = nc.jetStreamManagement();
    
                // Use the utility to create a stream stored in memory.
                createStreamExitWhenExists(jsm, exArgs.stream, exArgs.subject);
    
                // Create our JetStream context
                JetStream js = nc.jetStream();
    
                System.out.println();
    
            // Set up the subscribers
            // - the PushSubscribeOptions can be re-used since all the subscribers are the same
            // - use an atomic integer to track all the messages received
            // - keep a list of subscribers and threads so we can track them
                PushSubscribeOptions pso = PushSubscribeOptions.builder().durable(exArgs.durable).build();
                AtomicInteger allReceived = new AtomicInteger();
                List<JsQueueSubscriber> subscribers = new ArrayList<>();
                List<Thread> subThreads = new ArrayList<>();
                for (int id = 1; id <= exArgs.subCount; id++) {
                    // setup the subscription
                    JetStreamSubscription sub = js.subscribe(exArgs.subject, exArgs.queue, pso);
                    // create and track the runnable
                    JsQueueSubscriber qs = new JsQueueSubscriber(id, exArgs, js, sub, allReceived);
                    subscribers.add(qs);
                    // create, track and start the thread
                    Thread t = new Thread(qs);
                    subThreads.add(t);
                    t.start();
                }
            nc.flush(Duration.ofSeconds(1)); // flush outgoing communication to the server
    
                // create and start the publishing
                Thread pubThread = new Thread(new JsPublisher(js, exArgs));
                pubThread.start();
    
                // wait for all threads to finish
                pubThread.join();
                for (Thread t : subThreads) {
                    t.join();
                }
    
                // report
                for (JsQueueSubscriber qs : subscribers) {
                    qs.report();
                }
    
                System.out.println();
    
                // delete the stream since we are done with it.
                jsm.deleteStream(exArgs.stream);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        static class JsPublisher implements Runnable {
            JetStream js;
            ExampleArgs exArgs;
    
            public JsPublisher(JetStream js, ExampleArgs exArgs) {
                this.js = js;
                this.exArgs = exArgs;
            }
    
            @Override
            public void run() {
                for (int x = 1; x <= exArgs.msgCount; x++) {
                    try {
                        PublishAck pa = js.publish(exArgs.subject, ("Data # " + x).getBytes(StandardCharsets.US_ASCII));
                    } catch (IOException | JetStreamApiException e) {
                        // something pretty wrong here
                        e.printStackTrace();
                        System.exit(-1);
                    }
                }
            }
        }
    
        static class JsQueueSubscriber implements Runnable {
            int id;
            int thisReceived;
            List<String> datas;
    
            ExampleArgs exArgs;
            JetStream js;
            JetStreamSubscription sub;
            AtomicInteger allReceived;
    
            public JsQueueSubscriber(int id, ExampleArgs exArgs, JetStream js, JetStreamSubscription sub, AtomicInteger allReceived) {
                this.id = id;
                thisReceived = 0;
                datas = new ArrayList<>();
                this.exArgs = exArgs;
                this.js = js;
                this.sub = sub;
                this.allReceived = allReceived;
            }
    
            public void report() {
                System.out.printf("Sub # %d handled %d messages.\n", id, thisReceived);
            }
    
            @Override
            public void run() {
                while (allReceived.get() < exArgs.msgCount) {
                    try {
                        Message msg = sub.nextMessage(Duration.ofMillis(500));
                        while (msg != null) {
                            thisReceived++;
                            allReceived.incrementAndGet();
                            String data = new String(msg.getData(), StandardCharsets.US_ASCII);
                            datas.add(data);
                            System.out.printf("QS # %d message # %d %s\n", id, thisReceived, data);
                            msg.ack();
    
                            msg = sub.nextMessage(Duration.ofMillis(500));
                        }
                    } catch (InterruptedException e) {
                        // just try again
                    }
                }
                System.out.printf("QS # %d completed.\n", id);
            }
        }
    }
    import { AckPolicy, connect } from "../../src/mod.ts";
    import { nuid } from "../../nats-base-client/nuid.ts";
    
    const nc = await connect();
    
    // create a regular subscription - this is plain nats
    const sub = nc.subscribe("my.messages", { max: 5 });
    const done = (async () => {
      for await (const m of sub) {
        console.log(m.subject);
        m.respond();
      }
    })();
    
    const jsm = await nc.jetstreamManager();
    const stream = nuid.next();
    const subj = nuid.next();
    await jsm.streams.add({ name: stream, subjects: [`${subj}.>`] });
    
    // create a consumer that delivers to the subscription
    await jsm.consumers.add(stream, {
      ack_policy: AckPolicy.Explicit,
      deliver_subject: "my.messages",
    });
    
// publish some plain core NATS messages
    nc.publish(`${subj}.A`);
    nc.publish(`${subj}.B`);
    nc.publish(`${subj}.C`);
    nc.publish(`${subj}.D.A`);
    nc.publish(`${subj}.F.A.B`);
    
    await done;
    await nc.close();
    import asyncio
    
    import nats
    from nats.errors import TimeoutError
    
    
    async def main():
        nc = await nats.connect("localhost")
    
        # Create JetStream context.
        js = nc.jetstream()
    
    # Persist messages on the 'foo' subject.
        await js.add_stream(name="sample-stream", subjects=["foo"])
    
        for i in range(0, 10):
            ack = await js.publish("foo", f"hello world: {i}".encode())
            print(ack)
    
        # Create pull based consumer on 'foo'.
        psub = await js.pull_subscribe("foo", "psub")
    
    # Fetch and ack messages from the consumer.
        for i in range(0, 10):
            msgs = await psub.fetch(1)
            for msg in msgs:
                print(msg)
    
    # Create a single push-based subscriber that is durable across restarts.
        sub = await js.subscribe("foo", durable="myapp")
        msg = await sub.next_msg()
        await msg.ack()
    
    # Create a deliver group whose members load balance the received messages.
        async def qsub_a(msg):
            print("QSUB A:", msg)
            await msg.ack()
    
        async def qsub_b(msg):
            print("QSUB B:", msg)
            await msg.ack()
        await js.subscribe("foo", "workers", cb=qsub_a)
        await js.subscribe("foo", "workers", cb=qsub_b)
    
        for i in range(0, 10):
            ack = await js.publish("foo", f"hello world: {i}".encode())
            print("\t", ack)
    
        await nc.close()
    
    if __name__ == '__main__':
        asyncio.run(main())
// NATS .NET doesn't publicly support push consumers and treats all consumers
// as just consumers. The mechanics of consuming messages are abstracted
// away from the applications and are handled by the library.
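For illustration, here is a minimal sketch of that unified consume API; it assumes a stream "FOO" and a consumer named "foo" already exist (both names are illustrative).

// dotnet add package NATS.Net
using NATS.Net;
using NATS.Client.JetStream;

await using var client = new NatsClient();

INatsJSContext js = client.CreateJetStreamContext();

// Look up the existing consumer (assumed to have been created elsewhere)
// and process messages as they arrive; the library handles the underlying
// message delivery mechanics.
INatsJSConsumer consumer = await js.GetConsumerAsync("FOO", "foo");
await foreach (NatsJSMsg<string> msg in consumer.ConsumeAsync<string>())
{
    await msg.AckAsync();
}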
    #include "examples.h"
    
    static const char *usage = ""\
    "-gd            use global message delivery thread pool\n" \
    "-sync          receive synchronously (default is asynchronous)\n" \
    "-pull          use pull subscription\n" \
    "-fc            enable flow control\n" \
    "-count         number of expected messages\n";
    
    static void
    onMsg(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
    {
        if (print)
            printf("Received msg: %s - %.*s\n",
                   natsMsg_GetSubject(msg),
                   natsMsg_GetDataLength(msg),
                   natsMsg_GetData(msg));
    
        if (start == 0)
            start = nats_Now();
    
        // We should be using a mutex to protect those variables since
        // they are used from the subscription's delivery and the main
        // threads. For demo purposes, this is fine.
        if (++count == total)
            elapsed = nats_Now() - start;
    
    // Since this is an auto-ack callback, we don't need to ack here.
        natsMsg_Destroy(msg);
    }
    
    static void
    asyncCb(natsConnection *nc, natsSubscription *sub, natsStatus err, void *closure)
    {
        printf("Async error: %u - %s\n", err, natsStatus_GetText(err));
    
        natsSubscription_GetDropped(sub, (int64_t*) &dropped);
    }
    
    int main(int argc, char **argv)
    {
        natsConnection      *conn  = NULL;
        natsStatistics      *stats = NULL;
        natsOptions         *opts  = NULL;
        natsSubscription    *sub   = NULL;
        natsMsg             *msg   = NULL;
        jsCtx               *js    = NULL;
        jsErrCode           jerr   = 0;
        jsOptions           jsOpts;
        jsSubOptions        so;
        natsStatus          s;
        bool                delStream = false;
    
        opts = parseArgs(argc, argv, usage);
    
        printf("Created %s subscription on '%s'.\n",
            (pull ? "pull" : (async ? "asynchronous" : "synchronous")), subj);
    
        s = natsOptions_SetErrorHandler(opts, asyncCb, NULL);
    
        if (s == NATS_OK)
            s = natsConnection_Connect(&conn, opts);
    
        if (s == NATS_OK)
            s = jsOptions_Init(&jsOpts);
    
        if (s == NATS_OK)
            s = jsSubOptions_Init(&so);
        if (s == NATS_OK)
        {
            so.Stream = stream;
            so.Consumer = durable;
            if (flowctrl)
            {
                so.Config.FlowControl = true;
                so.Config.Heartbeat = (int64_t)1E9;
            }
        }
    
        if (s == NATS_OK)
            s = natsConnection_JetStream(&js, conn, &jsOpts);
    
        if (s == NATS_OK)
        {
            jsStreamInfo    *si = NULL;
    
            // First check if the stream already exists.
            s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
            if (s == NATS_NOT_FOUND)
            {
                jsStreamConfig  cfg;
    
                // Since we are the one creating this stream, we can delete at the end.
                delStream = true;
    
                // Initialize the configuration structure.
                jsStreamConfig_Init(&cfg);
                cfg.Name = stream;
                // Set the subject
                cfg.Subjects = (const char*[1]){subj};
                cfg.SubjectsLen = 1;
                // Make it a memory stream.
                cfg.Storage = js_MemoryStorage;
            // Add the stream.
                s = js_AddStream(&si, js, &cfg, NULL, &jerr);
            }
            if (s == NATS_OK)
            {
                printf("Stream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
                    si->Config->Name, si->State.Msgs, si->State.Bytes);
    
                // Need to destroy the returned stream object.
                jsStreamInfo_Destroy(si);
            }
        }
    
        if (s == NATS_OK)
        {
            if (pull)
                s = js_PullSubscribe(&sub, js, subj, durable, &jsOpts, &so, &jerr);
            else if (async)
                s = js_Subscribe(&sub, js, subj, onMsg, NULL, &jsOpts, &so, &jerr);
            else
                s = js_SubscribeSync(&sub, js, subj, &jsOpts, &so, &jerr);
        }
        if (s == NATS_OK)
            s = natsSubscription_SetPendingLimits(sub, -1, -1);
    
        if (s == NATS_OK)
            s = natsStatistics_Create(&stats);
    
        if ((s == NATS_OK) && pull)
        {
            natsMsgList list;
            int         i;
    
            for (count = 0; (s == NATS_OK) && (count < total); )
            {
                s = natsSubscription_Fetch(&list, sub, 1024, 5000, &jerr);
                if (s != NATS_OK)
                    break;
    
                if (start == 0)
                    start = nats_Now();
    
                count += (int64_t) list.Count;
                for (i=0; (s == NATS_OK) && (i<list.Count); i++)
                    s = natsMsg_Ack(list.Msgs[i], &jsOpts);
    
                natsMsgList_Destroy(&list);
            }
        }
        else if ((s == NATS_OK) && async)
        {
            while (s == NATS_OK)
            {
                if (count + dropped == total)
                    break;
    
                nats_Sleep(1000);
            }
        }
        else if (s == NATS_OK)
        {
            for (count = 0; (s == NATS_OK) && (count < total); count++)
            {
                s = natsSubscription_NextMsg(&msg, sub, 5000);
                if (s != NATS_OK)
                    break;
    
                if (start == 0)
                    start = nats_Now();
    
                s = natsMsg_Ack(msg, &jsOpts);
                natsMsg_Destroy(msg);
            }
        }
    
        if (s == NATS_OK)
        {
            printStats(STATS_IN|STATS_COUNT, conn, sub, stats);
            printPerf("Received");
        }
        if (s == NATS_OK)
        {
            jsStreamInfo *si = NULL;
    
            // Let's report some stats after the run
            s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
            if (s == NATS_OK)
            {
                printf("\nStream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
                    si->Config->Name, si->State.Msgs, si->State.Bytes);
    
                jsStreamInfo_Destroy(si);
            }
            if (delStream)
            {
                printf("\nDeleting stream %s: ", stream);
                s = js_DeleteStream(js, stream, NULL, &jerr);
                if (s == NATS_OK)
                    printf("OK!");
                printf("\n");
            }
        }
        else
        {
            printf("Error: %u - %s - jerr=%u\n", s, natsStatus_GetText(s), jerr);
            nats_PrintLastErrorStack(stderr);
        }
    
    // Destroy all our objects to avoid reports of memory leaks
        jsCtx_Destroy(js);
        natsStatistics_Destroy(stats);
        natsSubscription_Destroy(sub);
        natsConnection_Destroy(conn);
        natsOptions_Destroy(opts);
    
    // To silence reports of memory still in use with valgrind
    nats_Close();
    
        return 0;
    }
    func ExampleJetStream() {
    	nc, err := nats.Connect("localhost")
    	if err != nil {
    		log.Fatal(err)
    	}
    
	// Use the JetStream context to produce and consume messages
	// that have been persisted.
    	js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
    	if err != nil {
    		log.Fatal(err)
    	}
    
    	js.AddStream(&nats.StreamConfig{
    		Name:     "FOO",
    		Subjects: []string{"foo"},
    	})
    
    	js.Publish("foo", []byte("Hello JS!"))
    
    	// ordered push consumer
    	js.Subscribe("foo", func(msg *nats.Msg) {
    		meta, _ := msg.Metadata()
    		fmt.Printf("Stream Sequence  : %v\n", meta.Sequence.Stream)
    		fmt.Printf("Consumer Sequence: %v\n", meta.Sequence.Consumer)
    	}, nats.OrderedConsumer())
    }
    package io.nats.examples.jetstream;
    
    import io.nats.client.*;
    import io.nats.client.api.PublishAck;
    import io.nats.client.impl.NatsMessage;
    import io.nats.examples.ExampleArgs;
    import io.nats.examples.ExampleUtils;
    
    import java.nio.charset.StandardCharsets;
    import java.time.Duration;
    import java.time.temporal.TemporalUnit;
    
public class MyExample {
     public static void main(String[] args) {
      final String subject = "foo";
    
      try (Connection nc = Nats.connect(ExampleUtils.createExampleOptions("localhost"))) {
    
       // Create a JetStream context.  This hangs off the original connection
       // allowing us to produce data to streams and consume data from
       // JetStream consumers.
       JetStream js = nc.jetStream();
    
       // This example assumes there is a stream already created on subject "foo" and some messages already stored in that stream
    
       // create our message handler.
       MessageHandler handler = msg -> {
    
        System.out.println("\nMessage Received:");
    
        if (msg.hasHeaders()) {
         System.out.println("  Headers:");
         for (String key : msg.getHeaders().keySet()) {
          for (String value : msg.getHeaders().get(key)) {
           System.out.printf("    %s: %s\n", key, value);
          }
         }
        }
    
        System.out.printf("  Subject: %s\n  Data: %s\n",
                msg.getSubject(), new String(msg.getData(), StandardCharsets.UTF_8));
        System.out.println("  " + msg.metaData());
       };
    
       Dispatcher dispatcher = nc.createDispatcher();
       PushSubscribeOptions pso = PushSubscribeOptions.builder().ordered(true).build();
       JetStreamSubscription sub = js.subscribe(subject, dispatcher, handler, false, pso);
    
       Thread.sleep(100);
    
       sub.drain(Duration.ofMillis(100));
    
       nc.drain(Duration.ofMillis(100));
      }
      catch(Exception e)
      {
       e.printStackTrace();
      }
     }
    }
    import { connect, consumerOpts } from "../../src/mod.ts";
    
    const nc = await connect();
    const js = nc.jetstream();
    
// note the consumer is not durable - so after the
// subscription ends, the server will auto-destroy the
// consumer
    const opts = consumerOpts();
    opts.manualAck();
    opts.maxMessages(2);
    opts.deliverTo("xxx");
    const sub = await js.subscribe("a.>", opts);
    await (async () => {
      for await (const m of sub) {
        console.log(m.seq, m.subject);
        m.ack();
      }
    })();
    
    await nc.close();
    import asyncio
    
    import nats
    from nats.errors import TimeoutError
    
    
    async def main():
        nc = await nats.connect("localhost")
    
        # Create JetStream context.
        js = nc.jetstream()
    
    # Create an ordered consumer with flow control and heartbeats
    # that auto-resumes on failures.
        osub = await js.subscribe("foo", ordered_consumer=True)
        data = bytearray()
    
        while True:
            try:
                msg = await osub.next_msg()
                data.extend(msg.data)
            except TimeoutError:
                break
        print("All data in stream:", len(data))
    
        await nc.close()
    
    if __name__ == '__main__':
        asyncio.run(main())
    // dotnet add package NATS.Net
    using NATS.Net;
    using NATS.Client.JetStream;
    using NATS.Client.JetStream.Models;
    
    await using var client = new NatsClient();
    
    INatsJSContext js = client.CreateJetStreamContext();
    
    var streamConfig = new StreamConfig(name: "FOO", subjects: ["foo"]);
    await js.CreateStreamAsync(streamConfig);
    
    PubAckResponse ack = await js.PublishAsync("foo", "Hello, JetStream!");
    ack.EnsureSuccess();
    
    INatsJSConsumer orderedConsumer = await js.CreateOrderedConsumerAsync("FOO");
    
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
    
    await foreach (NatsJSMsg<string> msg in orderedConsumer.ConsumeAsync<string>(cancellationToken: cts.Token))
    {
        NatsJSMsgMetadata? meta = msg.Metadata;
        Console.WriteLine($"Stream Sequence  : {meta?.Sequence.Stream}");
        Console.WriteLine($"Consumer Sequence: {meta?.Sequence.Consumer}");
    }