Deciding to use streaming and higher qualities of service
In modern systems, applications can expose services or produce and consume data streams. A basic aspect of publish-subscribe messaging is temporal coupling: subscribers need to be up and running to receive a message when it is published. At a high level, if observability is required, or if applications need to consume messages in the future, consume at their own pace, or receive all messages, then JetStream's streaming functionality provides the temporal decoupling between publishers and consumers.
Using streaming and its associated higher qualities of service is the facet of messaging with the highest cost in terms of compute and storage.
When to use streaming
Streaming is ideal when:
Data producers and consumers are highly decoupled. They may be online at different times and consumers must receive messages.
A historical record of the data in the stream is required. This is when a replay of data is required by a consumer.
The last message on a stream is required for initialization and the producer may be offline.
Note that no assumptions should ever be made of who will receive and process data in the future, or for what purpose.
When to use Core NATS
Using core NATS is ideal as the fast request path for scalable services where there is tolerance for message loss or when applications themselves handle message delivery guarantees.
These include:
Service patterns where there is a tightly coupled request-reply
A request is made, and the application handles error cases upon timeout (resends, errors, etc.). __Relying on a messaging system to resend here is considered an anti-pattern.__
JetStream functionality overview
Streams
You can use 'Add Stream' to idempotently define streams and their attributes (i.e. source subjects, retention and storage policies, limits)
You can use 'Purge' to purge the messages in a stream
You can use 'Delete' to delete a stream
Publish to a stream
Core NATS and JetStream interoperate because streams listen to Core NATS messages. However, the NATS client libraries' JetStream APIs include their own 'Publish' calls, so you may wonder what the difference is between a 'Core NATS Publish' and a 'JetStream Publish'.
When a Core NATS application publishes a message on a stream's subject, that message is indeed stored in the stream, but that is not really the intent, because you are then publishing with the lower quality of service provided by Core NATS. So, while using the Core NATS Publish call to publish to a stream works, treat it as a convenience that eases the migration of your applications to streaming rather than as the desired end state or ideal design.
Instead, applications should use the JetStream Publish calls when publishing to a stream (Core NATS subscribers not using streams still receive these messages like any other publication), because:
JetStream publish calls are acknowledged by the JetStream-enabled servers, which allows for the following higher qualities of service:
If the publisher receives the acknowledgement from the server, it can safely discard any state it holds for that publication: the message has not only been received correctly by the server, it has also been successfully persisted.
Whether you use the synchronous or the asynchronous JetStream publish calls, there is an implied flow control between the publisher and the JetStream infrastructure.
Create a consumer
Consumers are 'views' into a stream, each with its own cursor. They are how client applications get messages from a stream (i.e. have them 'replayed') for processing or consumption. They can filter messages in the stream according to a 'filtering subject' and define which part of the stream is replayed according to a 'replay policy'.
You can create push or pull consumers:
Push consumers (specifically ordered push consumers) are the best way for an application to receive its own complete copy of the selected messages in the stream.
Pull consumers are the best way to scale horizontally the processing (or consuming) of the selected messages in the stream using multiple client applications sharing the same pull consumer, and allow for the processing of messages in batches.
Consumers can be ephemeral or durable, and support different acknowledgement policies: none, this sequence number only, or this sequence number and all before it.
Replay policy
You select which of the messages in the stream you want to have delivered to your consumer
all
from a sequence number
from a point in time
And you can select the replay speed to be instant or to match the initial publication rate into the stream
Subscribe from a consumer
Client applications 'subscribe' from consumers using JetStream's Subscribe, QueueSubscribe or PullSubscribe (and variations) calls. Note that since the initial release of JetStream, the client libraries have developed a more ergonomic API for processing messages.
Acknowledging messages
Some consumers require the client application code to acknowledge the processing or consumption of the message, but there is more than one way to acknowledge (or not) a message:
Ack Acknowledges that a message was completely handled
Nak Signals that the message will not be processed now and that processing can move on to the next message; the NAK'd message will be retried
InProgress
A-priori knowledge of consumers is not available, but consumers must receive messages. This is often a false assumption.
The data in messages being sent have a lifespan beyond that of the intended application lifespan.
Applications need to consume data at their own pace.
You want decoupled flow control between the publishers and the consumers of the stream
You need 'exactly once' quality of service with de-duplication of publications and double-acknowledged consumption
Where only the last message received is important and new messages will be received frequently enough for applications to tolerate a lost message. This might be a stock ticker stream, frequent exchange of messages in a service control plane, or device telemetry.
Message TTL is low, where the value of the data being transmitted degrades or expires quickly.
The expected consumer set for a message is available a-priori and consumers are expected to be live. The request-reply pattern works well here or consumers can send an application level acknowledgement.
Control plane messages.
You can have 'exactly-once' quality of service by the JetStream publishing application inserting a unique publication ID in a header field of the message.
the last message
the last message(s) for all the subject(s) in the stream
When sent before the AckWait period expires, indicates that work is ongoing and the period should be extended by another interval equal to AckWait
Term Instructs the server to stop redelivery of a message without acknowledging it as successfully processed
Streams store data on disk, but we cannot store all data forever, so we need ways to control their size automatically.
There are 3 features that come into play when Streams decide how long they store data.
The Retention Policy describes the criteria by which a Stream evicts messages from its storage:
Retention Policy
Description
In all Retention Policies the basic limits apply as upper bounds: MaxMsgs for how many messages are kept in total, MaxBytes for how big the Stream can be in total, and MaxAge for the oldest message that will be kept. These are the only limits in play with LimitsPolicy retention.
One can then define additional ways a message may be removed from the Stream earlier than these limits. In WorkQueuePolicy, a message is removed as soon as the Consumer receives an acknowledgement for it. In InterestPolicy, a message is removed as soon as all Consumers of the stream for that subject have received an acknowledgement for it.
In both WorkQueuePolicy and InterestPolicy the age, size and count limits will still apply as upper bounds.
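The difference between the three retention policies can be sketched with a small in-memory model. This is only an illustration of the rules described above, not JetStream's implementation: the `ToyStream` class and its fields are hypothetical, subject filtering is ignored, and the MaxMsgs, MaxBytes and MaxAge limits are left out.

```python
from dataclasses import dataclass, field


@dataclass
class ToyStream:
    """Toy model of stream retention (hypothetical, for illustration only)."""

    retention: str                                   # "limits", "workqueue" or "interest"
    consumers: set = field(default_factory=set)      # names of the defined consumers
    messages: dict = field(default_factory=dict)     # stream seq -> consumers that acked
    last_seq: int = 0

    def publish(self) -> int:
        """Store a new message and return its stream sequence."""
        self.last_seq += 1
        self.messages[self.last_seq] = set()
        return self.last_seq

    def ack(self, consumer: str, seq: int) -> None:
        """Record an acknowledgement and apply the retention rule."""
        if seq not in self.messages:
            return  # the message was already removed
        self.messages[seq].add(consumer)
        if self.retention == "workqueue":
            # A single acknowledgement is enough to remove the message.
            del self.messages[seq]
        elif self.retention == "interest" and self.consumers <= self.messages[seq]:
            # Removed only once every defined consumer has acknowledged it.
            del self.messages[seq]
        # With "limits" retention, acknowledgements never remove messages.
```

With two consumers on an "interest" stream, the message survives the first acknowledgement and disappears after the second, while a "limits" stream keeps it regardless.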
A final control is the maximum size any single message may have. NATS has its own limit for maximum message size (1 MiB by default), but you can restrict a Stream further, for example to accept only messages up to 1024 bytes, using MaxMsgSize.
The Discard Policy sets how messages are discarded when limits set by LimitsPolicy are reached. The DiscardOld option removes old messages making space for new, while DiscardNew refuses any new messages.
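As a rough illustration of the two discard options, the following sketch models a stream with only a message-count limit. The `LimitedStream` class is hypothetical and deliberately ignores MaxBytes and MaxAge; it is meant to show the behavioural difference, not how the server stores data.

```python
from collections import deque


class LimitedStream:
    """Toy stream that enforces only a MaxMsgs-style limit (hypothetical)."""

    def __init__(self, max_msgs: int, discard: str = "old"):
        if max_msgs < 1:
            raise ValueError("max_msgs must be at least 1")
        if discard not in ("old", "new"):
            raise ValueError("discard must be 'old' or 'new'")
        self.max_msgs = max_msgs
        self.discard = discard
        self.messages = deque()

    def publish(self, payload: bytes) -> bool:
        """Return True if the message was stored, False if it was refused."""
        if len(self.messages) >= self.max_msgs:
            if self.discard == "new":
                return False             # DiscardNew: refuse the new message
            self.messages.popleft()      # DiscardOld: drop the oldest message
        self.messages.append(payload)
        return True
```

With a limit of two messages, a third publish succeeds under DiscardOld (evicting the first message) and is refused under DiscardNew.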
The WorkQueuePolicy mode is a specialized mode where a message, once consumed and acknowledged, is removed from the Stream.
Message Deduplication
JetStream supports idempotent message writes by ignoring duplicate messages as indicated by the Nats-Msg-Id header.
Here we set a Nats-Msg-Id:1 header which tells JetStream to ensure we do not have duplicates of this message - we only consult the message ID not the body.
and in the output you can see that the duplicate publications were detected and only one message (the first one) is actually stored in the stream
The default window for tracking duplicates is 2 minutes. This can be set on the command line using --dupe-window when creating a stream, though we would caution against large windows.
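The idea of a duplicate-tracking window can be sketched as follows. This is a simplified model rather than the server's algorithm: the `DedupWindow` class is hypothetical, time is passed in explicitly to keep the example deterministic, and the exact expiry boundary is an assumption. Only the message ID is consulted, never the body.

```python
class DedupWindow:
    """Toy duplicate tracker keyed by message ID (hypothetical)."""

    def __init__(self, window_seconds: float = 120.0):
        self.window = window_seconds   # 2 minutes, as in the default above
        self._seen = {}                # message ID -> time it was first stored

    def accept(self, msg_id: str, now: float) -> bool:
        """Return True if the message should be stored, False if it is a duplicate."""
        # Forget IDs that have aged out of the window.
        self._seen = {m: t for m, t in self._seen.items() if now - t < self.window}
        if msg_id in self._seen:
            return False
        self._seen[msg_id] = now
        return True
```

Publishing ID "1" three times at 0, 60 and 121 seconds stores the first, rejects the second, and stores the third because the first has left the window. A larger window means more IDs to remember, which is one reason to be cautious about large values.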
Acknowledgement Models
Streams support acknowledging the receipt of a message: if you send a Request() to a subject covered by the Stream's configuration, the service replies once it has stored the message. If you just publish, it will not. A Stream can be set to disable acknowledgements by setting NoAck to true in its configuration.
Consumers have 3 acknowledgement modes:
Mode
Description
To understand how Consumers track messages we will start with a clean ORDERS Stream and DISPATCH Consumer.
The Stream is entirely empty
The Consumer has no messages outstanding and has never had any (Consumer sequence is 1).
We publish one message to the Stream and see that the Stream received it:
As the Consumer is pull-based, we can fetch the message, ack it, and check the Consumer state:
The message was delivered and acknowledged: the Acknowledgement floor is 1 and 1, and the sequence of the Consumer is 2, which means it has had only the one message through and that message was acked. Since it was acked, nothing is pending or redelivering.
We'll publish another message, fetch it but not Ack it this time and see the status:
Get the next message from the consumer (but do not acknowledge it)
Show the consumer info
Now we can see the Consumer has processed 2 messages (the Consumer sequence is 3, so the next message will be 3) but the Ack floor is still 1, so 1 message is pending acknowledgement. This is confirmed in the Pending messages.
If I fetch it again and again do not ack it:
Show the consumer info again
The Consumer sequence increases - each delivery attempt increases the sequence - and our redelivered count also goes up.
Finally, if I then fetch it again and ack it this time:
Show the consumer info
Having now Acked the message there are no more pending.
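The bookkeeping in this walkthrough can be mimicked with a toy model. The `ToyPullConsumer` class below is hypothetical and heavily simplified: it retries an unacknowledged message on the very next fetch instead of waiting for AckWait, it tracks a single ack floor (the stream sequence), and it counts redelivery attempts rather than redelivered messages.

```python
class ToyPullConsumer:
    """Toy model of consumer bookkeeping; not JetStream's real algorithm."""

    def __init__(self):
        self.consumer_seq = 1   # sequence the next delivery attempt will use
        self.ack_floor = 0      # highest stream sequence acknowledged so far
        self.pending = {}       # stream sequence -> delivery attempts so far
        self.redeliveries = 0   # delivery attempts beyond the first

    def fetch(self, stream_last_seq: int):
        """Deliver one message and return its stream sequence, or None."""
        if self.pending:
            # Simplification: retry the unacked message immediately
            # instead of waiting for AckWait to expire.
            seq = min(self.pending)
            self.pending[seq] += 1
            self.redeliveries += 1
        else:
            seq = self.ack_floor + 1
            if seq > stream_last_seq:
                return None  # nothing new in the stream
            self.pending[seq] = 1
        self.consumer_seq += 1  # every delivery attempt uses up a sequence
        return seq

    def ack(self, seq: int) -> None:
        """Acknowledge a delivered message and advance the ack floor."""
        if self.pending.pop(seq, None) is not None:
            self.ack_floor = seq
```

Replaying the steps above (fetch and ack message 1, then fetch message 2 three times and ack it on the last attempt) moves the consumer sequence from 1 to 5 while the ack floor only moves from 0 to 1 to 2, which is the pattern shown in the consumer info output.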
Additionally, there are a few types of acknowledgements:
Type
Bytes
Description
So far all of the examples were the AckAck type of acknowledgement, by replying to the Ack with the body as indicated in Bytes you can pick what mode of acknowledgement you want. Note that this description is documenting the internal JetStream protocol. Client libraries offer APIs for performing all the above acknowledgments using specific APIs where you don't worry about the internal protocol payloads.
All of these acknowledgement modes, except AckNext, support double acknowledgement - if you set a reply subject when acknowledging the server will in turn acknowledge having received your ACK.
The +NXT acknowledgement can have a few formats: +NXT 10 requests 10 messages, and +NXT {"no_wait": true} carries the same data that can be sent in a Pull Request.
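To make the payload formats concrete, here is a small sketch that builds the acknowledgement bodies named in this section. The `ACK_PAYLOADS` mapping and the `next_payload` helper are hypothetical names introduced for illustration; client libraries already wrap these payloads, so application code normally never builds them by hand.

```python
import json

# Payload bytes for each acknowledgement type, as listed in this section.
# AckAck may also be sent as an empty (nil) body.
ACK_PAYLOADS = {
    "AckAck": b"+ACK",
    "AckNak": b"-NAK",
    "AckProgress": b"+WPI",
    "AckTerm": b"+TERM",
}


def next_payload(batch=None, **options) -> bytes:
    """Build a +NXT body: bare, with a batch size, or with JSON options."""
    if batch is not None and options:
        raise ValueError("pass either a batch size or options, not both")
    if batch is not None:
        return f"+NXT {batch}".encode()
    if options:
        return b"+NXT " + json.dumps(options).encode()
    return b"+NXT"
```

For example, `next_payload(10)` yields `+NXT 10`, and `next_payload(no_wait=True)` yields `+NXT {"no_wait": true}`.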
Exactly Once Semantics
JetStream supports Exactly Once publication and consumption by combining Message Deduplication and double acks.
On the publishing side you can avoid duplicate message ingestion using the Message Deduplication feature.
Consumers can be 100% sure a message was correctly processed by requesting that the server acknowledge having received their acknowledgement (sometimes referred to as double-acking). To do so, call the message's AckSync() (rather than Ack()) function, which sets a reply subject on the Ack and waits for a response from the server confirming the reception and processing of the acknowledgement. If the response indicates success, you can be sure that the message will never be re-delivered by the consumer (due to a loss of your acknowledgement).
Consumer Starting Position
When setting up a Consumer you can decide where to start. The system supports the following values for the DeliverPolicy:
Policy
Description
Regardless of what mode you set, this is only the starting point. Once started it will always give you what you have not seen or acknowledged. So this is merely how it picks the very first message.
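How each policy picks that very first message can be sketched with a pure function over an in-memory list. The `first_sequence` function and its argument names are hypothetical, and the model assumes contiguous stream sequences; it is meant only to illustrate the selection rule.

```python
def first_sequence(policy, messages, opt_start_seq=None, opt_start_time=None):
    """Return the stream sequence a new consumer would start from.

    messages is a list of (stream_seq, timestamp) pairs in stream order.
    When nothing stored qualifies, the sequence of the next message to be
    published is returned (assuming contiguous sequences).
    """
    next_seq = messages[-1][0] + 1 if messages else 1
    if policy == "all":
        return messages[0][0] if messages else next_seq
    if policy == "last":
        return messages[-1][0] if messages else next_seq
    if policy == "new":
        return next_seq
    if policy == "by_start_sequence":
        if opt_start_seq is None:
            raise ValueError("by_start_sequence requires opt_start_seq")
        return next((s for s, _ in messages if s >= opt_start_seq), next_seq)
    if policy == "by_start_time":
        if opt_start_time is None:
            raise ValueError("by_start_time requires opt_start_time")
        return next((s for s, t in messages if t >= opt_start_time), next_seq)
    raise ValueError(f"unknown policy: {policy}")
```

For three messages stored a minute apart, "all" starts at sequence 1, "last" at 3, "new" at 4, and a start time 50 seconds after the first message starts at sequence 2.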
Let's look at each of these. First we make a new Stream ORDERS and add 100 messages to it.
Now create a DeliverAll pull-based Consumer:
Now create a DeliverLast pull-based Consumer:
Now create a by_start_sequence (MsgSetSeq) pull-based Consumer:
And finally a time-based Consumer. Let's add some messages a minute apart:
Then create a Consumer that starts 2 minutes ago:
Ephemeral Consumers
So far, all the Consumers you have seen were Durable, meaning they exist even after you disconnect from JetStream. In our Orders scenario, though, the MONITOR Consumer could very well be a short-lived thing that exists only while an operator is debugging the system. There is no need to remember the last seen position if all you want is to observe the real-time state.
In this case, we can make an Ephemeral Consumer by first subscribing to the delivery subject, then creating a Consumer without giving it a durable name. An Ephemeral Consumer exists as long as any subscription is active on its delivery subject. It is automatically removed, after a short grace period to handle restarts, when there are no subscribers.
Terminal 1:
Terminal 2:
The --ephemeral switch tells the system to make an Ephemeral Consumer.
Consumer Message Rates
Typically, when a new Consumer is created you want the selected messages delivered as quickly as possible. However, you might want to replay messages at the rate they arrived: if messages first arrived 1 minute apart and you make a new Consumer, it will get the messages a minute apart.
This is useful in load testing scenarios, for example. This is called the ReplayPolicy and has the values ReplayInstant and ReplayOriginal.
You can only set ReplayPolicy on push-based Consumers.
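The two replay speeds amount to a simple scheduling rule, sketched below. The `delivery_times` function is a hypothetical helper that works on plain numbers (seconds); it illustrates the timing only and says nothing about how the server paces delivery internally.

```python
def delivery_times(received_at, policy, start):
    """Return when each stored message would be delivered to a new consumer.

    received_at: times (in seconds) at which the messages entered the stream.
    policy: "instant" or "original".
    start: time at which the consumer starts receiving.
    """
    if policy == "instant":
        # Everything is delivered as fast as possible.
        return [start for _ in received_at]
    if policy == "original":
        if not received_at:
            return []
        first = received_at[0]
        # Preserve the original gaps between messages.
        return [start + (t - first) for t in received_at]
    raise ValueError(f"unknown replay policy: {policy}")
```

Messages that arrived at 0, 10 and 20 seconds are delivered at 100, 110 and 120 seconds under "original" when the consumer starts at 100, and all at 100 under "instant".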
Now let's publish messages into the Stream 10 seconds apart:
And when we consume them they will come to us 10 seconds apart:
Ack Sampling
In the earlier sections we saw that samples are being sent to a monitoring system. Let's look at that in depth; how the monitoring system works and what it contains.
As messages pass through a Consumer you'd be interested in knowing how many are being redelivered and how many times but also how long it takes for messages to be acknowledged.
Consumers can sample Ack'ed messages for you and publish samples so your monitoring system can observe the health of a Consumer. We will add support for this to .
Configuration
You can configure a Consumer for sampling by passing the --sample 80 option to nats consumer add, which tells the system to sample 80% of Acknowledgements.
When viewing info of a Consumer you can tell if it's sampled or not:
Output contains
Storage Overhead
JetStream file storage is very efficient, storing as little extra information about the message as possible.
We do store some message data with each message, namely:
Message headers
The subject it was received on
The time it was received
Without any headers the size is:
A 5 byte hello message without headers will take 39 bytes.
With headers:
So if you are publishing many small messages the overhead will be, relatively speaking, quite large, but for larger messages the overhead is very small. If you publish many small messages it's worth trying to optimize the subject length.
| Type | Bytes | Description |
| --- | --- | --- |
| AckTerm | +TERM | Instructs the server to stop redelivery of a message without acknowledging it as successfully processed |
The message payload
A hash of the message
The message sequence
A few other bits like the length of the subject and the length of headers
| Retention Policy | Description |
| --- | --- |
| LimitsPolicy | Limits are set for how many messages are kept, how big the storage may be and how old messages may be. |
| WorkQueuePolicy | Messages are kept until they are consumed, meaning delivered to a subscribing application by the consumer filtering on the message's subject and explicitly acknowledged by that application. In this mode of operation you cannot have overlapping consumers defined on the Stream: each subject captured by the stream can have only one consumer at a time. |
| InterestPolicy | Messages are kept as long as there are Consumers on the stream (matching the message's subject if they are filtered consumers) for which the message has not yet been ACKed. Once all currently defined consumers have received an explicit acknowledgement from a subscribing application for the message, it is removed from the stream. |
| Mode | Description |
| --- | --- |
| AckExplicit | This requires every message to be specifically acknowledged; it's the only supported option for pull-based Consumers |
| AckAll | In this mode if you acknowledge message 100 it will also acknowledge messages 1-99; this is good for processing batches and to reduce ack overhead |
| AckNone | No acknowledgements are supported |
| Type | Bytes | Description |
| --- | --- | --- |
| AckAck | nil, +ACK | Acknowledges a message was completely handled |
| AckNak | -NAK | Signals that the message will not be processed now and that processing can move on to the next message; the NAK'd message will be retried |
| AckProgress | +WPI | When sent before the AckWait period expires, indicates that work is ongoing and the period should be extended by another interval equal to AckWait |
| AckNext | +NXT | |
| Policy | Description |
| --- | --- |
| all | Delivers all messages that are available |
| last | Delivers the latest message, like a tail -n 1 -f |
| new | Delivers only new messages that arrive after subscribe time |
| by_start_time | Delivers from a specific time onward. Requires OptStartTime to be set |
| by_start_sequence | Delivers from a specific stream sequence. Requires OptStartSeq to be set |
nats consumer add ORDERS REPLAY --target out.original --filter ORDERS.processed --ack none --deliver all --sample 100 --replay original
...
Replay Policy: original
...
for i in 1 2 3
do
  nats pub ORDERS.processed "order ${i}"
  sleep 10
done
Published [ORDERS.processed] : 'order 1'
Published [ORDERS.processed] : 'order 2'
Published [ORDERS.processed] : 'order 3'
nats sub -t out.original
Listening on [out.original]
2020/01/03 15:17:26 [#1] Received on [ORDERS.processed]: 'order 1'
2020/01/03 15:17:36 [#2] Received on [ORDERS.processed]: 'order 2'
2020/01/03 15:17:46 [#3] Received on [ORDERS.processed]: 'order 3'
^C
nats consumer info ORDERS NEW
...
Sampling Rate: 100
...
Without headers: length of the message record (4 bytes) + seq(8) + ts(8) + subj_len(2) + subj + msg + hash(8)
With headers: length of the message record (4 bytes) + seq(8) + ts(8) + subj_len(2) + subj + hdr_len(4) + hdr + msg + hash(8)
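The two record layouts above translate directly into a small size calculator. The `record_size` function is a hypothetical helper that only adds up the listed fields; the subject "demo" used in the usage note is an assumed example, chosen because a 4-byte subject with a 5-byte payload gives the 39 bytes quoted earlier.

```python
def record_size(subject: str, payload: bytes, headers: bytes = b"") -> int:
    """Estimate the stored size of one message from the field list above."""
    # record length (4) + seq (8) + ts (8) + subj_len (2) + hash (8)
    fixed = 4 + 8 + 8 + 2 + 8
    size = fixed + len(subject.encode()) + len(payload)
    if headers:
        # hdr_len (4) + the header bytes themselves
        size += 4 + len(headers)
    return size
```

For instance, `record_size("demo", b"hello")` adds 30 fixed bytes, 4 subject bytes and 5 payload bytes. Because the subject is stored with every message, shortening it saves the same number of bytes on each record.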
Using the Object Store
The Object Store allows you to store data of any size, including large objects, by implementing a chunking mechanism. For example, you can store and retrieve files (the objects) of any size by associating them with a path and a file name (the key). You obtain an ObjectStoreManager object from your JetStream context.
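The chunking idea can be illustrated with a few lines of standard-library code. This is a sketch under stated assumptions, not the Object Store's actual format: the function names, the metadata keys, the default chunk size and the choice of SHA-256 for the digest are all hypothetical.

```python
import hashlib


def split_into_chunks(data: bytes, chunk_size: int) -> list:
    """Split data into consecutive chunks of at most chunk_size bytes."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]


def put_object(data: bytes, chunk_size: int = 128 * 1024):
    """Return (metadata, chunks) for an object; chunk_size is an assumed default."""
    chunks = split_into_chunks(data, chunk_size)
    meta = {
        "size": len(data),
        "chunks": len(chunks),
        "digest": hashlib.sha256(data).hexdigest(),
    }
    return meta, chunks


def get_object(meta: dict, chunks: list) -> bytes:
    """Reassemble the chunks and verify them against the stored digest."""
    data = b"".join(chunks)
    if hashlib.sha256(data).hexdigest() != meta["digest"]:
        raise ValueError("digest mismatch: object is corrupt or incomplete")
    return data
```

Storing the metadata separately from the chunks lets a reader check the size and digest after reassembly, so a missing or altered chunk is detected instead of silently returned.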
// ObjectStoreManager is used to manage object stores. It provides methods
// for CRUD operations on object stores.
type ObjectStoreManager interface
/**
* Object Store Management context for creation and access to key value buckets.
*/
public interface ObjectStore {
/**
* Get the name of the object store's bucket.
* @return the name
*/
String getBucketName();
/**
* Place the contents of the input stream into a new object.
*/
ObjectInfo put(ObjectMeta meta, InputStream inputStream) throws IOException, JetStreamApiException, NoSuchAlgorithmException;
/**
* Place the contents of the input stream into a new object.
*/
ObjectInfo put(String objectName, InputStream inputStream) throws IOException, JetStreamApiException, NoSuchAlgorithmException;
/**
* Place the bytes into a new object.
*/
ObjectInfo put(String objectName, byte[] input) throws IOException, JetStreamApiException, NoSuchAlgorithmException;
/**
* Place the contents of the file into a new object using the file name as the object name.
*/
ObjectInfo put(File file) throws IOException, JetStreamApiException, NoSuchAlgorithmException;
/**
* Get an object by name from the store, reading it into the output stream, if the object exists.
*/
ObjectInfo get(String objectName, OutputStream outputStream) throws IOException, JetStreamApiException, InterruptedException, NoSuchAlgorithmException;
/**
* Get the info for an object if the object exists / is not deleted.
*/
ObjectInfo getInfo(String objectName) throws IOException, JetStreamApiException;
/**
* Get the info for an object if the object exists, optionally including deleted.
*/
ObjectInfo getInfo(String objectName, boolean includingDeleted) throws IOException, JetStreamApiException;
/**
* Update the metadata of name, description or headers. All other changes are ignored.
*/
ObjectInfo updateMeta(String objectName, ObjectMeta meta) throws IOException, JetStreamApiException;
/**
* Delete the object by name. A No-op if the object is already deleted.
*/
ObjectInfo delete(String objectName) throws IOException, JetStreamApiException;
/**
* Add a link to another object. A link cannot be for another link.
*/
ObjectInfo addLink(String objectName, ObjectInfo toInfo) throws IOException, JetStreamApiException;
/**
* Add a link to another object store (bucket).
*/
ObjectInfo addBucketLink(String objectName, ObjectStore toStore) throws IOException, JetStreamApiException;
/**
* Close (seal) the bucket to changes. The store (bucket) will be read only.
*/
ObjectStoreStatus seal() throws IOException, JetStreamApiException;
/**
* Get a list of all object [infos] in the store.
*/
List<ObjectInfo> getList() throws IOException, JetStreamApiException, InterruptedException;
/**
* Create a watch on the store (bucket).
*/
NatsObjectStoreWatchSubscription watch(ObjectStoreWatcher watcher, ObjectStoreWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException;
/**
* Get the ObjectStoreStatus object.
*/
ObjectStoreStatus getStatus() throws IOException, JetStreamApiException;
async def object_store(self, bucket: str) -> ObjectStore:
    ...

async def create_object_store(
    self,
    bucket: str = None,
    config: Optional[api.ObjectStoreConfig] = None,
    **params,
) -> ObjectStore:
    """
    create_object_store takes an api.ObjectStoreConfig and creates a OBJ in JetStream.
    """

async def delete_object_store(self, bucket: str) -> bool:
    """
    delete_object_store will delete the underlying stream for the named object.
    """
// dotnet add package NATS.Net
/// <summary>
/// NATS Object Store context.
/// </summary>
public interface INatsObjContext
{
/// <summary>
/// Provides access to the JetStream context associated with the Object Store operations.
/// </summary>
INatsJSContext JetStreamContext { get; }
/// <summary>
/// Create a new object store.
/// </summary>
/// <param name="bucket">Bucket name.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Object store object.</returns>
ValueTask<INatsObjStore> CreateObjectStoreAsync(string bucket, CancellationToken cancellationToken = default);
/// <summary>
/// Create a new object store.
/// </summary>
/// <param name="config">Object store configuration.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Object store object.</returns>
ValueTask<INatsObjStore> CreateObjectStoreAsync(NatsObjConfig config, CancellationToken cancellationToken = default);
/// <summary>
/// Get an existing object store.
/// </summary>
/// <param name="bucket">Bucket name</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>The Object Store object</returns>
ValueTask<INatsObjStore> GetObjectStoreAsync(string bucket, CancellationToken cancellationToken = default);
/// <summary>
/// Delete an object store.
/// </summary>
/// <param name="bucket">Name of the bucket.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Whether delete was successful or not.</returns>
ValueTask<bool> DeleteObjectStore(string bucket, CancellationToken cancellationToken);
}
/// <summary>
/// NATS Object Store.
/// </summary>
public interface INatsObjStore
{
/// <summary>
/// Provides access to the JetStream context associated with the Object Store operations.
/// </summary>
INatsJSContext JetStreamContext { get; }
/// <summary>
/// Object store bucket name.
/// </summary>
string Bucket { get; }
/// <summary>
/// Get object by key.
/// </summary>
/// <param name="key">Object key.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Object value as a byte array.</returns>
ValueTask<byte[]> GetBytesAsync(string key, CancellationToken cancellationToken = default);
/// <summary>
/// Get object by key.
/// </summary>
/// <param name="key">Object key.</param>
/// <param name="stream">Stream to write the object value to.</param>
/// <param name="leaveOpen"><c>true</c> to not close the underlying stream when async method returns; otherwise, <c>false</c></param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Object metadata.</returns>
/// <exception cref="NatsObjException">Metadata didn't match the value retrieved e.g. the SHA digest.</exception>
ValueTask<ObjectMetadata> GetAsync(string key, Stream stream, bool leaveOpen = false, CancellationToken cancellationToken = default);
/// <summary>
/// Put an object by key.
/// </summary>
/// <param name="key">Object key.</param>
/// <param name="value">Object value as a byte array.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Object metadata.</returns>
ValueTask<ObjectMetadata> PutAsync(string key, byte[] value, CancellationToken cancellationToken = default);
/// <summary>
/// Put an object by key.
/// </summary>
/// <param name="key">Object key.</param>
/// <param name="stream">Stream to read the value from.</param>
/// <param name="leaveOpen"><c>true</c> to not close the underlying stream when async method returns; otherwise, <c>false</c></param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Object metadata.</returns>
/// <exception cref="NatsObjException">There was an error calculating SHA digest.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
ValueTask<ObjectMetadata> PutAsync(string key, Stream stream, bool leaveOpen = false, CancellationToken cancellationToken = default);
/// <summary>
/// Put an object by key.
/// </summary>
/// <param name="meta">Object metadata.</param>
/// <param name="stream">Stream to read the value from.</param>
/// <param name="leaveOpen"><c>true</c> to not close the underlying stream when async method returns; otherwise, <c>false</c></param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Object metadata.</returns>
/// <exception cref="NatsObjException">There was an error calculating SHA digest.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
ValueTask<ObjectMetadata> PutAsync(ObjectMetadata meta, Stream stream, bool leaveOpen = false, CancellationToken cancellationToken = default);
/// <summary>
/// Update object metadata
/// </summary>
/// <param name="key">Object key</param>
/// <param name="meta">Object metadata</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Object metadata</returns>
/// <exception cref="NatsObjException">There is already an object with the same name</exception>
ValueTask<ObjectMetadata> UpdateMetaAsync(string key, ObjectMetadata meta, CancellationToken cancellationToken = default);
/// <summary>
/// Add a link to another object
/// </summary>
/// <param name="link">Link name</param>
/// <param name="target">Target object's name</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Metadata of the new link object</returns>
ValueTask<ObjectMetadata> AddLinkAsync(string link, string target, CancellationToken cancellationToken = default);
/// <summary>
/// Add a link to another object
/// </summary>
/// <param name="link">Link name</param>
/// <param name="target">Target object's metadata</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Metadata of the new link object</returns>
ValueTask<ObjectMetadata> AddLinkAsync(string link, ObjectMetadata target, CancellationToken cancellationToken = default);
/// <summary>
/// Add a link to another object store
/// </summary>
/// <param name="link">Object's name to be linked</param>
/// <param name="target">Target object store</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Metadata of the new link object</returns>
/// <exception cref="NatsObjException">Object with the same name already exists</exception>
ValueTask<ObjectMetadata> AddBucketLinkAsync(string link, INatsObjStore target, CancellationToken cancellationToken = default);
/// <summary>
/// Seal the object store. No further modifications will be allowed.
/// </summary>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <exception cref="NatsObjException">Update operation failed</exception>
ValueTask SealAsync(CancellationToken cancellationToken = default);
/// <summary>
/// Get object metadata by key.
/// </summary>
/// <param name="key">Object key.</param>
/// <param name="showDeleted">Also retrieve deleted objects.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Object metadata.</returns>
/// <exception cref="NatsObjException">Object was not found.</exception>
ValueTask<ObjectMetadata> GetInfoAsync(string key, bool showDeleted = false, CancellationToken cancellationToken = default);
/// <summary>
/// List all the objects in this store.
/// </summary>
/// <param name="opts">List options</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>An async enumerable object metadata to be used in an <c>await foreach</c></returns>
IAsyncEnumerable<ObjectMetadata> ListAsync(NatsObjListOpts? opts = default, CancellationToken cancellationToken = default);
/// <summary>
/// Retrieves run-time status about the backing store of the bucket.
/// </summary>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Object store status</returns>
ValueTask<NatsObjStatus> GetStatusAsync(CancellationToken cancellationToken = default);
/// <summary>
/// Watch for changes in the underlying store and receive meta information updates.
/// </summary>
/// <param name="opts">Watch options</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>An async enumerable object metadata to be used in an <c>await foreach</c></returns>
IAsyncEnumerable<ObjectMetadata> WatchAsync(NatsObjWatchOpts? opts = default, CancellationToken cancellationToken = default);
/// <summary>
/// Delete an object by key.
/// </summary>
/// <param name="key">Object key.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <exception cref="NatsObjException">Object metadata was invalid or chunks can't be purged.</exception>
ValueTask DeleteAsync(string key, CancellationToken cancellationToken = default);
}
{
// ObjectStore will look up and bind to an existing object store
// instance.
//
// If the object store with given name does not exist, ErrBucketNotFound
func ExampleJetStream() {
	nc, err := nats.Connect("localhost")
	if err != nil {
		log.Fatal(err)
	}

	// Use the JetStream context to produce and consumer messages
	// that have been persisted.
	js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
	if err != nil {
		log.Fatal(err)
	}

	js.AddStream(&nats.StreamConfig{
		Name:     "example-stream",
		Subjects: []string{"example-subject"},
	})

	js.Publish("example-subject", []byte("Hello JS!"))

	// Publish messages asynchronously.
	for i := 0; i < 500; i++ {
		js.PublishAsync("example-subject", []byte("Hello JS Async!"))
	}
	select {
	case <-js.PublishAsyncComplete():
	case <-time.After(5 * time.Second):
		fmt.Println("Did not resolve in time")
	}
}
try (Connection nc = Nats.connect("localhost")) {
JetStreamManagement jsm = nc.jetStreamManagement();
jsm.addStream(StreamConfiguration.builder()
.name("example-stream")
.subjects("example-subject")
.build());
JetStream js = jsm.jetStream();
// Publish Synchronously
PublishAck pa = js.publish("example-subject", "Hello JS Sync!".getBytes());
System.out.println("Publish Sequence: " + pa.getSeqno());
// Publish Asynchronously
CompletableFuture<PublishAck> future =
js.publishAsync("example-subject", "Hello JS Async!".getBytes());
try {
pa = future.get(1, TimeUnit.SECONDS);
System.out.println("Publish Sequence: " + pa.getSeqno());
}
catch (ExecutionException e) {
// Might have been a problem with the publish,
// such as a failed expectation (advanced feature)
// Also could be that the publish ack did not return in time
// from the internal request timeout
}
catch (TimeoutException e) {
// The future timed out, meaning its timeout was shorter than
// the publish async's request timeout
}
catch (InterruptedException e) {
// The future.get() thread was interrupted.
}
}
import { connect, Empty } from "../../src/mod.ts";
const nc = await connect();
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "example-stream", subjects: ["example-subject"] });
const js = await nc.jetstream();
// the jetstream client provides a publish that returns
// a confirmation that the message was received and stored
// by the server. You can associate various expectations
// when publishing a message to prevent duplicates.
// If the expectations are not met, the message is rejected.
let pa = await js.publish("example-subject", Empty, {
msgID: "a",
expect: { streamName: "example-stream" },
});
console.log(`${pa.stream}[${pa.seq}]: duplicate? ${pa.duplicate}`);
pa = await js.publish("example-subject", Empty, {
msgID: "a",
expect: { lastSequence: 1 },
});
console.log(`${pa.stream}[${pa.seq}]: duplicate? ${pa.duplicate}`);
await jsm.streams.delete("example-stream");
await nc.drain();
import asyncio
import nats
from nats.errors import TimeoutError


async def main():
    nc = await nats.connect("localhost")

    # Create JetStream context.
    js = nc.jetstream()

    # Persist messages on 'example-subject'.
    await js.add_stream(name="example-stream", subjects=["example-subject"])

    for i in range(0, 10):
        ack = await js.publish("example-subject", f"hello world: {i}".encode())
        print(ack)

    await nc.close()


if __name__ == '__main__':
    asyncio.run(main())
// dotnet add package NATS.Net
using NATS.Net;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
await using var client = new NatsClient();
INatsJSContext js = client.CreateJetStreamContext();
// Create a stream
var streamConfig = new StreamConfig(name: "example-stream", subjects: ["example-subject"]);
await js.CreateStreamAsync(streamConfig);
// Publish a message
{
PubAckResponse ack = await js.PublishAsync("example-subject", "Hello, JetStream!");
ack.EnsureSuccess();
}
// Publish messages concurrently
List<NatsJSPublishConcurrentFuture> futures = new();
for (var i = 0; i < 500; i++)
{
NatsJSPublishConcurrentFuture future
= await js.PublishConcurrentAsync("example-subject", "Hello, JetStream 1!");
futures.Add(future);
}
foreach (var future in futures)
{
await using (future)
{
PubAckResponse ack = await future.GetResponseAsync();
ack.EnsureSuccess();
}
}
#include "examples.h"
static const char *usage = ""\
"-stream stream name (default is 'foo')\n" \
"-txt text to send (default is 'hello')\n" \
"-count number of messages to send\n" \
"-sync publish synchronously (default is async)\n";
static void
_jsPubErr(jsCtx *js, jsPubAckErr *pae, void *closure)
{
int *errors = (int*) closure;
printf("Error: %u - Code: %u - Text: %s\n", pae->Err, pae->ErrCode, pae->ErrText);
printf("Original message: %.*s\n", natsMsg_GetDataLength(pae->Msg), natsMsg_GetData(pae->Msg));
*errors = (*errors + 1);
// If we wanted to resend the original message, we would do something like that:
//
// js_PublishMsgAsync(js, &(pae->Msg), NULL);
//
// Note that we use `&(pae->Msg)` so that the library set it to NULL if it takes
// ownership, and the library will not destroy the message when this callback returns.
// No need to destroy anything, everything is handled by the library.
}
int main(int argc, char **argv)
{
natsConnection *conn = NULL;
natsStatistics *stats = NULL;
natsOptions *opts = NULL;
jsCtx *js = NULL;
jsOptions jsOpts;
jsErrCode jerr = 0;
natsStatus s;
int dataLen=0;
volatile int errors = 0;
bool delStream = false;
opts = parseArgs(argc, argv, usage);
dataLen = (int) strlen(payload);
s = natsConnection_Connect(&conn, opts);
if (s == NATS_OK)
s = jsOptions_Init(&jsOpts);
if (s == NATS_OK)
{
if (async)
{
jsOpts.PublishAsync.ErrHandler = _jsPubErr;
jsOpts.PublishAsync.ErrHandlerClosure = (void*) &errors;
}
s = natsConnection_JetStream(&js, conn, &jsOpts);
}
if (s == NATS_OK)
{
jsStreamInfo *si = NULL;
// First check if the stream already exists.
s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
if (s == NATS_NOT_FOUND)
{
jsStreamConfig cfg;
// Since we are the one creating this stream, we can delete at the end.
delStream = true;
// Initialize the configuration structure.
jsStreamConfig_Init(&cfg);
cfg.Name = stream;
// Set the subject
cfg.Subjects = (const char*[1]){subj};
cfg.SubjectsLen = 1;
// Make it a memory stream.
cfg.Storage = js_MemoryStorage;
// Add the stream,
s = js_AddStream(&si, js, &cfg, NULL, &jerr);
}
if (s == NATS_OK)
{
printf("Stream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
si->Config->Name, si->State.Msgs, si->State.Bytes);
// Need to destroy the returned stream object.
jsStreamInfo_Destroy(si);
}
}
if (s == NATS_OK)
s = natsStatistics_Create(&stats);
if (s == NATS_OK)
{
printf("\nSending %" PRId64 " messages to subject '%s'\n", total, stream);
start = nats_Now();
}
for (count = 0; (s == NATS_OK) && (count < total); count++)
{
if (async)
s = js_PublishAsync(js, subj, (const void*) payload, dataLen, NULL);
else
{
jsPubAck *pa = NULL;
s = js_Publish(&pa, js, subj, (const void*) payload, dataLen, NULL, &jerr);
if (s == NATS_OK)
{
if (pa->Duplicate)
printf("Got a duplicate message! Sequence=%" PRIu64 "\n", pa->Sequence);
jsPubAck_Destroy(pa);
}
}
}
if ((s == NATS_OK) && async)
{
jsPubOptions jsPubOpts;
jsPubOptions_Init(&jsPubOpts);
// Let's set it to 30 seconds, if getting "Timeout" errors,
// this may need to be increased based on the number of messages
// being sent.
jsPubOpts.MaxWait = 30000;
s = js_PublishAsyncComplete(js, &jsPubOpts);
if (s == NATS_TIMEOUT)
{
// Let's get the list of pending messages. We could resend,
// etc, but for now, just destroy them.
natsMsgList list;
js_PublishAsyncGetPendingList(&list, js);
natsMsgList_Destroy(&list);
}
}
if (s == NATS_OK)
{
jsStreamInfo *si = NULL;
elapsed = nats_Now() - start;
printStats(STATS_OUT, conn, NULL, stats);
printPerf("Sent");
if (errors != 0)
printf("There were %d asynchronous errors\n", errors);
// Let's report some stats after the run
s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
if (s == NATS_OK)
{
printf("\nStream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
si->Config->Name, si->State.Msgs, si->State.Bytes);
jsStreamInfo_Destroy(si);
}
}
if (delStream && (js != NULL))
{
printf("\nDeleting stream %s: ", stream);
s = js_DeleteStream(js, stream, NULL, &jerr);
if (s == NATS_OK)
printf("OK!");
printf("\n");
}
if (s != NATS_OK)
{
printf("Error: %u - %s - jerr=%u\n", s, natsStatus_GetText(s), jerr);
nats_PrintLastErrorStack(stderr);
}
// Destroy all our objects to avoid report of memory leak
jsCtx_Destroy(js);
natsStatistics_Destroy(stats);
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);
// To silence reports of memory still in used with valgrind
nats_Close();
return 0;
}
Managing Streams and consumers
Streams and durable consumers can be defined administratively outside the application (typically using the NATS CLI tool), in which case the application only needs to know the well-known names of the durable consumers it wants to use. You can also manage streams and consumers programmatically.
Common stream management operations are:
Add a stream. Adding a stream is an idempotent function, which means that if a stream does not exist, it will be created, and if a stream already exists, then the add operation will succeed only if the existing stream matches exactly the attributes specified in the 'add' call.
Delete a stream.
Purge a stream (delete all the messages stored in the stream)
Get or remove a specific message from a stream by sequence number
Add or update (or delete) a consumer
Get information and statistics on streams, consumers, and the account. Get, remove, or get information on individual messages stored in a stream.
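The idempotent behaviour of the 'add' operation described in the list above can be illustrated with a small in-memory model. This is only a sketch: `StreamRegistry`, `add_stream`, and the keyword-style configuration are hypothetical names invented for illustration. The code does not talk to a NATS server and does not mirror any client library's API.

```python
class StreamRegistry:
    """Toy in-memory stand-in for a server's stream table (hypothetical)."""

    def __init__(self):
        self._streams = {}  # stream name -> configuration dict

    def add_stream(self, name, **config):
        """Create the stream, or succeed only if an identical one exists."""
        existing = self._streams.get(name)
        if existing is None:
            self._streams[name] = dict(config)
            return "created"
        if existing == config:
            # Same attributes as before: the call is a harmless no-op.
            return "unchanged"
        raise ValueError(
            f"stream {name!r} already exists with a different configuration"
        )


registry = StreamRegistry()
first = registry.add_stream(
    "example-stream", subjects=["example-subject"], max_bytes=1024
)
second = registry.add_stream(
    "example-stream", subjects=["example-subject"], max_bytes=1024
)
try:
    registry.add_stream(
        "example-stream", subjects=["example-subject"], max_bytes=2048
    )
    third = "accepted"
except ValueError:
    third = "rejected"
print(first, second, third)
```

Changing an attribute of an existing stream is a separate 'update' operation, which is why the third call in the sketch is rejected rather than silently applied.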
func ExampleJetStreamManager() {
	nc, _ := nats.Connect("localhost")

	js, _ := nc.JetStream()

	// Create a stream
	js.AddStream(&nats.StreamConfig{
		Name:     "example-stream",
		Subjects: []string{"example-subject"},
		MaxBytes: 1024,
	})

	// Update a stream
	js.UpdateStream(&nats.StreamConfig{
		Name:     "example-stream",
		MaxBytes: 2048,
	})

	// Create a durable consumer
	js.AddConsumer("example-stream", &nats.ConsumerConfig{
		Durable: "example-consumer-name",
	})

	// Get information about all streams (with Context JSOpt)
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	for info := range js.StreamsInfo(nats.Context(ctx)) {
		fmt.Println("stream name: ", info.Config.Name)
	}

	// Get information about all consumers (with MaxWait JSOpt)
	for info := range js.ConsumersInfo("example-stream", nats.MaxWait(10*time.Second)) {
		fmt.Println("consumer name: ", info.Name)
	}

	// Delete a consumer
	js.DeleteConsumer("example-stream", "example-consumer-name")

	// Delete a stream
	js.DeleteStream("example-stream")
}
Using the Key/Value Store
Because the Key/Value Store is built on top of the JetStream persistence layer, you obtain a KeyValueManager object from your JetStream context.
The key must be in the same format as a NATS subject, i.e. it can be a dot separated list of tokens (which means that you can then use wildcards to match hierarchies of keys when watching a bucket), and can only contain valid characters. The value can be any byte array.
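Because keys follow the NATS subject format, wildcard patterns can select whole hierarchies of keys. The sketch below models that matching in plain Python, assuming the usual NATS subject wildcards (`*` for exactly one token, `>` for one or more trailing tokens). The function name `key_matches` is hypothetical, and the sketch does not validate which characters a key may contain.

```python
def key_matches(pattern: str, key: str) -> bool:
    """Return True if a dot-separated key matches a NATS-style wildcard pattern."""
    p_tokens = pattern.split(".")
    k_tokens = key.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must be the last pattern token and needs at least
            # one remaining key token to match.
            return i == len(p_tokens) - 1 and len(k_tokens) > i
        if i >= len(k_tokens):
            return False  # the key ran out of tokens before the pattern did
        if p != "*" and p != k_tokens[i]:
            return False  # literal token mismatch
    # Without a trailing '>', both sides must have the same number of tokens.
    return len(p_tokens) == len(k_tokens)


keys = ["sensors.kitchen.temp", "sensors.kitchen.humidity", "sensors.hall.temp"]
temps = [k for k in keys if key_matches("sensors.*.temp", k)]
kitchen = [k for k in keys if key_matches("sensors.kitchen.>", k)]
print(temps)
print(kitchen)
```

Organizing keys as token hierarchies up front makes this kind of selective watching possible later without renaming keys.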
Creating and deleting KV buckets
You can create as many independent key/value store instances, called 'buckets', as you need. Buckets are typically created, purged or deleted administratively (e.g. using the nats CLI tool), but this can also be done using one of the following KeyValueManager calls:
Getting
You can get the current value of a key, or ask for a specific revision of the value.
Putting
The key is always a string. You can use Put to store a byte array, or the convenience PutString to store a string. For 'compare and set' functionality you can use Create and Update.
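The 'compare and set' behaviour of Create and Update can be modelled with a few lines of plain Python. This is an in-memory sketch, not NATS client code: `ToyBucket` and its methods are hypothetical, revisions are assumed to come from a single bucket-wide counter, and deleted keys are ignored for brevity.

```python
class ToyBucket:
    """In-memory model of put/create/update (hypothetical, not a NATS client)."""

    def __init__(self):
        self._entries = {}  # key -> (revision, value)
        self._sequence = 0  # assumed bucket-wide revision counter

    def _store(self, key, value):
        self._sequence += 1
        self._entries[key] = (self._sequence, value)
        return self._sequence

    def put(self, key, value):
        """Unconditional write; always succeeds and returns the new revision."""
        return self._store(key, value)

    def create(self, key, value):
        """Write only if the key does not exist yet."""
        if key in self._entries:
            raise KeyError(f"key {key!r} already exists")
        return self._store(key, value)

    def update(self, key, value, last):
        """Write only if the caller's revision is still the latest one."""
        current = self._entries.get(key)
        if current is None or current[0] != last:
            raise ValueError(f"revision mismatch for key {key!r}")
        return self._store(key, value)


bucket = ToyBucket()
rev1 = bucket.create("color", b"blue")
rev2 = bucket.update("color", b"green", last=rev1)
try:
    bucket.update("color", b"red", last=rev1)  # stale revision
    stale_update = "accepted"
except ValueError:
    stale_update = "rejected"
print(rev1, rev2, stale_update)
```

The rejected call is the point of the pattern: a writer that read an older revision learns that someone else changed the key and can re-read before retrying.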
Deleting
You can delete a specific key, or purge the whole key/value bucket.
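The client interfaces listed later in this section distinguish between deleting a key (a delete marker is placed and earlier revisions remain) and purging a key (a marker is placed and earlier revisions are removed). The following sketch models that difference for a single key. `ToyKey` and `DELETE_MARKER` are hypothetical names for illustration only and are not part of any NATS library.

```python
DELETE_MARKER = object()  # sentinel standing in for a delete/purge marker


class ToyKey:
    """Revision list for one key (hypothetical in-memory model)."""

    def __init__(self):
        self.revisions = []

    def put(self, value):
        self.revisions.append(value)

    def delete(self):
        # Soft delete: add a marker, keep every earlier revision.
        self.revisions.append(DELETE_MARKER)

    def purge(self):
        # Purge: drop earlier revisions, leaving only the marker.
        self.revisions = [DELETE_MARKER]

    def get(self):
        """Return the current value, or None if the key is absent or deleted."""
        if not self.revisions or self.revisions[-1] is DELETE_MARKER:
            return None
        return self.revisions[-1]


deleted = ToyKey()
deleted.put(b"v1")
deleted.put(b"v2")
deleted.delete()

purged = ToyKey()
purged.put(b"v1")
purged.put(b"v2")
purged.purge()

print(deleted.get(), len(deleted.revisions))
print(purged.get(), len(purged.revisions))
```

In both cases a read of the current value finds nothing. The difference is only in how much history remains available afterwards.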
Getting all the keys
You can get the list of all the keys that currently have a value associated with them using Keys().
Getting the history for a key
The JetStream key/value store has a feature you don't usually find in key/value stores: the ability to keep a history of the values associated with a key (rather than just the current value). The depth of the history is specified when the key/value bucket is created, and the default is a history depth of 1 (i.e. no history). The maximum history size is 64. If you need more, your use case will be better implemented using the Stream functionality directly (where you can set the maximum number of messages per subject to any value you want) rather than the KV abstraction.
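A bounded history behaves like a fixed-size queue per key: once the configured depth is reached, each new value pushes out the oldest one. The sketch below models this with `collections.deque`. The `KeyHistory` class is hypothetical, and only the default depth of 1 and the maximum of 64 are taken from the text above.

```python
from collections import deque

MAX_HISTORY = 64  # maximum history size stated in the text


class KeyHistory:
    """Keeps the most recent `depth` values of one key (hypothetical model)."""

    def __init__(self, depth=1):
        if not 1 <= depth <= MAX_HISTORY:
            raise ValueError(f"history depth must be between 1 and {MAX_HISTORY}")
        # A deque with maxlen discards the oldest item automatically.
        self._values = deque(maxlen=depth)

    def put(self, value):
        self._values.append(value)

    def history(self):
        """Return the retained values, oldest first."""
        return list(self._values)


default_depth = KeyHistory()
deeper = KeyHistory(depth=3)
for i in range(1, 6):
    default_depth.put(f"v{i}")
    deeper.put(f"v{i}")

print(default_depth.history())
print(deeper.history())
```

With the default depth only the latest value survives, which is why the text describes a depth of 1 as "no history".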
Watching for changes
Watching a key/value bucket is like subscribing to updates: you provide a callback and you can watch all of the keys in the bucket or specify which specific key(s) you want to be kept updated about.
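The callback style of watching can be sketched without a server. In this in-memory model a watcher either receives every update or only updates for the keys it names. `WatchableBucket` and its `watch` and `put` methods are hypothetical and are not a client library's API. Real watchers also support wildcard patterns, which are left out here.

```python
class WatchableBucket:
    """Toy bucket that notifies registered callbacks on every put (hypothetical)."""

    def __init__(self):
        self._data = {}
        self._watchers = []  # list of (set of keys or None, callback)

    def watch(self, callback, keys=None):
        """Register a callback for all keys (keys=None) or for specific keys."""
        wanted = None if keys is None else set(keys)
        self._watchers.append((wanted, callback))

    def put(self, key, value):
        self._data[key] = value
        for wanted, callback in self._watchers:
            if wanted is None or key in wanted:
                callback(key, value)


bucket = WatchableBucket()
all_updates = []
color_updates = []
bucket.watch(lambda k, v: all_updates.append((k, v)))
bucket.watch(lambda k, v: color_updates.append((k, v)), keys=["color"])

bucket.put("color", b"blue")
bucket.put("size", b"large")

print(all_updates)
print(color_updates)
```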
try (Connection nc = Nats.connect("localhost")) {
JetStreamManagement jsm = nc.jetStreamManagement();
// Create a stream
StreamInfo si = jsm.addStream(StreamConfiguration.builder()
.name("example-stream")
.subjects("example-subject")
.maxBytes(1024)
.build());
StreamConfiguration config = si.getConfiguration();
System.out.println("stream name: " + config.getName() + ", max_bytes: " + config.getMaxBytes());
// Update a stream
si = jsm.updateStream(StreamConfiguration.builder()
.name("example-stream")
.maxBytes(2048)
.build());
config = si.getConfiguration();
System.out.println("stream name: " + config.getName() + ", max_bytes: " + config.getMaxBytes());
// Create a durable consumer
jsm.createConsumer("example-stream", ConsumerConfiguration.builder()
.durable("example-consumer-name")
.build());
// Get information about all streams
List<StreamInfo> streams = jsm.getStreams();
for (StreamInfo info : streams) {
System.out.println("stream name: " + info.getConfiguration().getName());
}
// Get information about all consumers
List<ConsumerInfo> consumers = jsm.getConsumers("example-stream");
for (ConsumerInfo ci : consumers) {
System.out.println("consumer name: " + ci.getName());
}
// Delete a consumer
jsm.deleteConsumer("example-stream", "example-consumer-name");
// Delete a stream
jsm.deleteStream("example-stream");
}
import { AckPolicy, connect, Empty } from "../../src/mod.ts";
const nc = await connect();
const jsm = await nc.jetstreamManager();
// list all the streams, the `next()` function
// retrieves a paged result.
const streams = await jsm.streams.list().next();
streams.forEach((si) => {
console.log(si);
});
// add a stream
const stream = "mystream";
const subj = `mystream.*`;
await jsm.streams.add({ name: stream, subjects: [subj] });
// publish a regular nats message directly to the stream
for (let i = 0; i < 10; i++) {
  nc.publish(`${stream}.a`, Empty);
}
// find a stream that stores a specific subject:
const name = await jsm.streams.find("mystream.A");
// retrieve info about the stream by its name
const si = await jsm.streams.info(name);
// update a stream configuration
si.config.subjects?.push("a.b");
await jsm.streams.update(name, si.config);
// get a particular stored message in the stream by sequence
// this is not associated with a consumer
const sm = await jsm.streams.getMessage(stream, { seq: 1 });
console.log(sm.seq);
// delete the 5th message in the stream, securely erasing it
await jsm.streams.deleteMessage(stream, 5);
// purge all messages in the stream, the stream itself
// remains.
await jsm.streams.purge(stream);
// purge all messages with a specific subject (filter can be a wildcard)
await jsm.streams.purge(stream, { filter: "a.b" });
// purge messages with a specific subject keeping some messages
await jsm.streams.purge(stream, { filter: "a.c", keep: 5 });
// purge all messages up to (not including) seq
await jsm.streams.purge(stream, { seq: 100 });
// purge all messages up to sequence that have a matching subject
await jsm.streams.purge(stream, { filter: "a.d", seq: 100 });
// list all consumers for a stream:
const consumers = await jsm.consumers.list(stream).next();
consumers.forEach((ci) => {
console.log(ci);
});
// add a new durable pull consumer
await jsm.consumers.add(stream, {
durable_name: "me",
ack_policy: AckPolicy.Explicit,
});
// retrieve a consumer's configuration
const ci = await jsm.consumers.info(stream, "me");
console.log(ci);
// delete a particular consumer
await jsm.consumers.delete(stream, "me");
// dotnet add package NATS.Net
using NATS.Net;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
await using var client = new NatsClient();
INatsJSContext js = client.CreateJetStreamContext();
// Create a stream
var streamConfig = new StreamConfig(name: "example-stream", subjects: ["example-subject"])
{
MaxBytes = 1024,
};
await js.CreateStreamAsync(streamConfig);
// Update the stream
var streamConfigUpdated = streamConfig with { MaxBytes = 2048 };
await js.UpdateStreamAsync(streamConfigUpdated);
// Create a durable consumer
await js.CreateConsumerAsync("example-stream", new ConsumerConfig("example-consumer-name"));
// Get information about all streams
await foreach (var stream in js.ListStreamsAsync())
{
Console.WriteLine($"stream name: {stream.Info.Config.Name}");
}
// Get information about all consumers in a stream
await foreach (var consumer in js.ListConsumersAsync("example-stream"))
{
Console.WriteLine($"consumer name: {consumer.Info.Config.Name}");
}
// Delete a consumer
await js.DeleteConsumerAsync("example-stream", "example-consumer-name");
// Delete a stream
await js.DeleteStreamAsync("example-stream");
// Output:
// stream name: example-stream
// consumer name: example-consumer-name
// KeyValue will lookup and bind to an existing KeyValue store.
KeyValue(bucket string) (KeyValue, error)
// CreateKeyValue will create a KeyValue store with the following configuration.
CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error)
// DeleteKeyValue will delete this KeyValue store (JetStream stream).
DeleteKeyValue(bucket string) error
/**
 * Create a key value store.
 * @param config the key value configuration
 * @return bucket info
 * @throws IOException covers various communication issues with the NATS
 * server such as timeout or interruption
 * @throws JetStreamApiException the request had an error related to the data
 * @throws IllegalArgumentException the server is not JetStream enabled
 */
KeyValueStatus create(KeyValueConfiguration config) throws IOException, JetStreamApiException;
/**
 * Get the list of bucket names.
 * @return list of bucket names
 * @throws IOException covers various communication issues with the NATS
 * server such as timeout or interruption
 * @throws JetStreamApiException the request had an error related to the data
 * @throws InterruptedException if the thread is interrupted
 */
List<String> getBucketNames() throws IOException, JetStreamApiException, InterruptedException;
/**
 * Gets the info for an existing bucket.
 * @param bucketName the bucket name to use
 * @throws IOException covers various communication issues with the NATS
 * server such as timeout or interruption
 * @throws JetStreamApiException the request had an error related to the data
 * @return the bucket status object
 */
KeyValueStatus getBucketInfo(String bucketName) throws IOException, JetStreamApiException;
/**
 * Deletes an existing bucket. Will throw a JetStreamApiException if the delete fails.
 * @param bucketName the stream name to use.
 * @throws IOException covers various communication issues with the NATS
 * server such as timeout or interruption
 * @throws JetStreamApiException the request had an error related to the data
 */
void delete(String bucketName) throws IOException, JetStreamApiException;
// Get returns the latest value for the key.
Get(key string) (entry KeyValueEntry, err error)
// GetRevision returns a specific revision value for the key.
GetRevision(key string, revision uint64) (entry KeyValueEntry, err error)
/**
 * Get the entry for a key
 * @param key the key
 * @return the KvEntry object or null if not found.
 * @throws IOException covers various communication issues with the NATS
 * server such as timeout or interruption
 * @throws JetStreamApiException the request had an error related to the data
 * @throws IllegalArgumentException the server is not JetStream enabled
 */
KeyValueEntry get(String key) throws IOException, JetStreamApiException;
/**
 * Get the specific revision of an entry for a key.
 * @param key the key
 * @param revision the revision
 * @return the KvEntry object or null if not found.
 * @throws IOException covers various communication issues with the NATS
 * server such as timeout or interruption
 * @throws JetStreamApiException the request had an error related to the data
 * @throws IllegalArgumentException the server is not JetStream enabled
 */
KeyValueEntry get(String key, long revision) throws IOException, JetStreamApiException;
# from the JetStreamContext
async def key_value(self, bucket: str) -> KeyValue:
async def create_key_value(
self,
config: Optional[api.KeyValueConfig] = None,
**params,
) -> KeyValue:
"""
create_key_value takes an api.KeyValueConfig and creates a KV in JetStream.
"""
async def delete_key_value(self, bucket: str) -> bool:
"""
delete_key_value deletes a JetStream KeyValue store by destroying
the associated stream.
"""
// dotnet add package NATS.Net
// Create a new Key Value Store or get an existing one
ValueTask<INatsKVStore> CreateStoreAsync(string bucket, CancellationToken cancellationToken = default);
// Get a list of bucket names
IAsyncEnumerable<string> GetBucketNamesAsync(CancellationToken cancellationToken = default);
// Gets the status for all buckets
IAsyncEnumerable<NatsKVStatus> GetStatusesAsync(CancellationToken cancellationToken = default);
// Delete a Key Value Store
ValueTask<bool> DeleteStoreAsync(string bucket, CancellationToken cancellationToken = default);
//
NATS_EXTERN natsStatus kvConfig_Init (kvConfig *cfg)
Initializes a KeyValue configuration structure.
NATS_EXTERN natsStatus js_CreateKeyValue (kvStore **new_kv, jsCtx *js, kvConfig *cfg)
Creates a KeyValue store with a given configuration.
NATS_EXTERN natsStatus js_KeyValue (kvStore **new_kv, jsCtx *js, const char *bucket)
Looks-up and binds to an existing KeyValue store.
NATS_EXTERN natsStatus js_DeleteKeyValue (jsCtx *js, const char *bucket)
Deletes a KeyValue store.
NATS_EXTERN void kvStore_Destroy (kvStore *kv)
Destroys a KeyValue store object.
async get(k: string): Promise<KvEntry | null>
async def get(self, key: str) -> Entry:
"""
get returns the latest value for the key.
"""
// dotnet add package NATS.Net
// Get an entry from the bucket using the key
ValueTask<NatsKVEntry<T>> GetEntryAsync<T>(string key, ulong revision = default, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default);
//
NATS_EXTERN natsStatus kvStore_Get (kvEntry **new_entry, kvStore *kv, const char *key)
Returns the latest entry for the key.
NATS_EXTERN natsStatus kvStore_GetRevision (kvEntry **new_entry, kvStore *kv, const char *key, uint64_t revision)
Returns the entry at the specific revision for the key.
Put(key string, value []byte) (revision uint64, err error)
// PutString will place the string for the key into the store.
PutString(key string, value string) (revision uint64, err error)
// Create will add the key/value pair if it does not exist.
Create(key string, value []byte) (revision uint64, err error)
// Update will update the value if the latest revision matches.
Update(key string, value []byte, last uint64) (revision uint64, err error)
/**
* Put a byte[] as the value for a key
* @param key the key
* @param value the bytes of the value
* @return the revision number for the key
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws IllegalArgumentException the server is not JetStream enabled
*/
long put(String key, byte[] value) throws IOException, JetStreamApiException;
/**
* Put a string as the value for a key
* @param key the key
* @param value the UTF-8 string
* @return the revision number for the key
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws IllegalArgumentException the server is not JetStream enabled
*/
long put(String key, String value) throws IOException, JetStreamApiException;
/**
* Put a long as the value for a key
* @param key the key
* @param value the number
* @return the revision number for the key
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws IllegalArgumentException the server is not JetStream enabled
*/
long put(String key, Number value) throws IOException, JetStreamApiException;
/**
* Put as the value for a key iff the key does not exist (there is no history)
* or is deleted (history shows the key is deleted)
* @param key the key
* @param value the bytes of the value
* @return the revision number for the key
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws IllegalArgumentException the server is not JetStream enabled
*/
long create(String key, byte[] value) throws IOException, JetStreamApiException;
/**
* Put as the value for a key iff the key exists and its last revision matches the expected
* @param key the key
* @param value the bytes of the value
* @param expectedRevision the expected last revision
* @return the revision number for the key
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws IllegalArgumentException the server is not JetStream enabled
*/
long update(String key, byte[] value, long expectedRevision) throws IOException, JetStreamApiException;
async def put(self, key: str, value: bytes) -> int:
"""
put will place the new value for the key into the store
and return the revision number.
"""
async def update(self, key: str, value: bytes, last: int) -> int:
"""
update will update the value iff the latest revision matches.
"""
// dotnet add package NATS.Net
// Put a value into the bucket using the key
// returns revision number
ValueTask<ulong> PutAsync<T>(string key, T value, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default);
//
NATS_EXTERN natsStatus kvStore_Put (uint64_t *rev, kvStore *kv, const char *key, const void *data, int len)
Places the new value for the key into the store.
NATS_EXTERN natsStatus kvStore_PutString (uint64_t *rev, kvStore *kv, const char *key, const char *data)
Places the new value (as a string) for the key into the store.
NATS_EXTERN natsStatus kvStore_Create (uint64_t *rev, kvStore *kv, const char *key, const void *data, int len)
Places the value for the key into the store if and only if the key does not exist.
NATS_EXTERN natsStatus kvStore_CreateString (uint64_t *rev, kvStore *kv, const char *key, const char *data)
Places the value (as a string) for the key into the store if and only if the key does not exist.
NATS_EXTERN natsStatus kvStore_Update (uint64_t *rev, kvStore *kv, const char *key, const void *data, int len, uint64_t last)
Updates the value for the key into the store if and only if the latest revision matches.
NATS_EXTERN natsStatus kvStore_UpdateString (uint64_t *rev, kvStore *kv, const char *key, const char *data, uint64_t last)
Updates the value (as a string) for the key into the store if and only if the latest revision matches.
// Delete will place a delete marker and leave all revisions.
Delete(key string) error
// Purge will place a delete marker and remove all previous revisions.
Purge(key string) error
/**
* Soft deletes the key by placing a delete marker.
* @param key the key
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
void delete(String key) throws IOException, JetStreamApiException;
/**
* Purge all values/history from the specific key
* @param key the key
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
void purge(String key) throws IOException, JetStreamApiException;
async def delete(self, key: str) -> bool:
    """
    delete will place a delete marker and leave all previous revisions.
    """
async def purge(self, key: str) -> bool:
    """
    purge will remove the key and all revisions.
    """
// dotnet add package NATS.Net
// Delete an entry from the bucket
ValueTask DeleteAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default);
// Purge an entry from the bucket
ValueTask PurgeAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default);
//
NATS_EXTERN natsStatus kvStore_Delete (kvStore *kv, const char *key)
Deletes a key by placing a delete marker and leaving all revisions.
NATS_EXTERN natsStatus kvStore_Purge (kvStore *kv, const char *key, kvPurgeOptions *opts)
Deletes a key by placing a purge marker and removing all revisions.
NATS_EXTERN natsStatus kvStore_PurgeDeletes (kvStore *kv, kvPurgeOptions *opts)
Purges and removes delete markers.
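The difference between deleting and purging a key is easiest to see on the key's history. Below is a small Python model of that difference, assuming the semantics stated in the descriptions above (delete appends a marker and keeps revisions, purge leaves only a marker). `KeyHistory` is a hypothetical name and the operation labels are illustrative; this is not client library code.

```python
class KeyHistory:
    """Toy model of one key's revision history in a KV bucket."""

    def __init__(self):
        self.entries = []  # list of (operation, value), oldest first

    def put(self, value):
        self.entries.append(("PUT", value))

    def delete(self):
        # Soft delete: append a marker, keep every earlier revision.
        self.entries.append(("DEL", None))

    def purge(self):
        # Purge: drop all earlier revisions, leave only a purge marker.
        self.entries = [("PURGE", None)]

    def get(self):
        """Return the current value, or None if the key is deleted or purged."""
        if not self.entries or self.entries[-1][0] != "PUT":
            return None
        return self.entries[-1][1]


soft = KeyHistory()
soft.put(b"v1")
soft.put(b"v2")
soft.delete()

hard = KeyHistory()
hard.put(b"v1")
hard.put(b"v2")
hard.purge()
```

Both keys read as absent afterwards, but only the soft-deleted key still has its earlier values available through its history.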
// Keys will return all keys.
Keys(opts ...WatchOpt) ([]string, error)
/**
* Get a list of the keys in a bucket.
* @return List of keys
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws InterruptedException if the thread is interrupted
*/
List<String> keys() throws IOException, JetStreamApiException, InterruptedException;
// dotnet add package NATS.Net
// Get all the keys in the bucket
IAsyncEnumerable<string> GetKeysAsync(NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);
// Get a filtered set of keys in the bucket
IAsyncEnumerable<string> GetKeysAsync(IEnumerable<string> filters, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);
//
NATS_EXTERN natsStatus kvStore_Keys (kvKeysList *list, kvStore *kv, kvWatchOptions *opts)
Returns all keys in the bucket.
NATS_EXTERN void kvKeysList_Destroy (kvKeysList *list)
Destroys this list of KeyValue store key strings.
// History will return all historical values for the key.
History(key string, opts ...WatchOpt) ([]KeyValueEntry, error)
/**
* Get the history (list of KeyValueEntry) for a key
* @param key the key
* @return List of KvEntry
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws InterruptedException if the thread is interrupted
*/
List<KeyValueEntry> history(String key) throws IOException, JetStreamApiException, InterruptedException;
// dotnet add package NATS.Net
// Get the history of an entry by key
IAsyncEnumerable<NatsKVEntry<T>> HistoryAsync<T>(string key, INatsDeserialize<T>? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);
//
NATS_EXTERN natsStatus kvStore_History (kvEntryList *list, kvStore *kv, const char *key, kvWatchOptions *opts)
Returns all historical entries for the key.
NATS_EXTERN void kvEntryList_Destroy (kvEntryList *list)
Destroys this list of KeyValue store entries.
// Watch for any updates to keys that match the keys argument which could include wildcards.
// Watch will send a nil entry when it has received all initial values.
Watch(keys string, opts ...WatchOpt) (KeyWatcher, error)
// WatchAll will invoke the callback for all updates.
WatchAll(opts ...WatchOpt) (KeyWatcher, error)
/**
* Watch updates for a specific key
*/
NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException;
/**
* Watch updates for all keys
*/
NatsKeyValueWatchSubscription watchAll(KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException;
// dotnet add package NATS.Net
// Start a watcher for specific keys
// Key to watch is subject-based and wildcards may be used
IAsyncEnumerable<NatsKVEntry<T>> WatchAsync<T>(string key, INatsDeserialize<T>? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);
// Start a watcher for specific keys
// Keys to watch are subject-based and wildcards may be used
IAsyncEnumerable<NatsKVEntry<T>> WatchAsync<T>(IEnumerable<string> keys, INatsDeserialize<T>? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);
// Start a watcher for all the keys in the bucket
IAsyncEnumerable<NatsKVEntry<T>> WatchAsync<T>(INatsDeserialize<T>? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);
//
NATS_EXTERN natsStatus kvStore_Watch (kvWatcher **new_watcher, kvStore *kv, const char *keys, kvWatchOptions *opts)
Returns a watcher for any updates to keys that match the keys argument.
NATS_EXTERN natsStatus kvStore_WatchAll (kvWatcher **new_watcher, kvStore *kv, kvWatchOptions *opts)
Returns a watcher for any updates to any keys of the KeyValue store bucket.
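Because watched keys are subject-based, a key filter follows NATS subject wildcard rules: `*` matches exactly one dot-separated token and `>` matches one or more trailing tokens. The function below is a standalone Python sketch of that matching rule, useful for reasoning about which keys a watcher would see. `key_matches` is a hypothetical helper, not part of any client library.

```python
def key_matches(pattern: str, key: str) -> bool:
    """Return True if `key` matches `pattern` using NATS-style wildcards.

    `*` matches exactly one dot-separated token; `>` matches one or more
    trailing tokens and is only valid as the last token of the pattern.
    """
    p_tokens = pattern.split(".")
    k_tokens = key.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # Must be last in the pattern and consume at least one token.
            return i == len(p_tokens) - 1 and len(k_tokens) > i
        if i >= len(k_tokens):
            return False
        if p != "*" and p != k_tokens[i]:
            return False
    return len(p_tokens) == len(k_tokens)
```

For example, a watcher on `sensors.*.temp` would see `sensors.a.temp` but not `sensors.a.b.temp`, while `sensors.>` would see both.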
Consumer Details
Consumers are how client applications get the messages stored in the streams. You can have many consumers on a single stream. A consumer is like a view on a stream: it can filter messages and has some state (maintained by the servers) associated with it.
Consumers can be 'durable' or 'ephemeral'.
Durable versus ephemeral consumers
Durable consumers persist message delivery progress on the server side. A durable consumer can be retrieved by name and shared between client instances for load balancing. It can be made highly available through replicas.
An ephemeral consumer does not persist delivery progress and will automatically be deleted when there are no more client instances connected.
Durable consumers
Durable consumers are meant to be used by multiple instances of an application, either to distribute and scale out the processing, or to persist the position of the consumer over the stream between runs of an application.
Durable consumers, as the name implies, are meant to last 'forever' and are typically created and deleted administratively rather than by the application code, which only needs to specify the durable's well-known name to use it.
You create a durable consumer using the nats consumer add CLI tool command, or programmatically by passing a durable name option to the subscription creation call.
Ephemeral consumers
Ephemeral consumers are meant to be used by a single instance of an application (e.g. to get its own replay of the messages in the stream).
Ephemeral consumers are not meant to last 'forever'. They are defined automatically at subscription time by the client library and disappear after the application disconnects.
You (automatically) create an ephemeral consumer when you call the js.Subscribe function without specifying the Durable or Bind subscription options. Calling Drain on that subscription automatically deletes the underlying ephemeral consumer. You can also explicitly create an ephemeral consumer by not passing a durable name option to the jsm.AddConsumer call.
Ephemeral consumers otherwise have the same control over message acknowledgement and re-delivery as durable consumers.
Push and Pull consumers
Client libraries implement two kinds of consumers, identified as 'push' or 'pull'.
Push consumers
Push consumers receive messages on a specific subject where message flow is controlled by the server. Load balancing is supported through Core NATS queue groups. The messages from the stream are distributed automatically among the clients subscribed to the push consumer.
Pull consumers
Pull consumers request messages explicitly from the server in batches, giving the client full control over dispatching, flow control, pending (unacknowledged) messages and load balancing. Pull-consuming clients make fetch() calls in a dispatch loop.
We recommend using pull consumers for new projects, in particular when scalability, detailed flow control or error handling are a design focus. Most client APIs have been updated to provide convenient interfaces for consuming messages through callback handlers or iterators without the need to manage message retrieval.
fetch() calls can be immediate or have a defined timeout, allowing for either controlled (1 by 1) consumption or realtime delivery with minimal polling overhead.
Pull consumers create less CPU load on the NATS servers and therefore scale better (note that the push consumers are still quite fast and scalable, you may only notice the difference between the two if you have sustained high message rates).
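The fetch-based dispatch loop can be sketched without a server. The Python model below only illustrates the control flow and the effect of the batch size on the number of requests. `FakePullConsumer` and `drain` are hypothetical names, and the stand-in returns an empty list when nothing is left, whereas real clients typically signal an empty fetch with a timeout error.

```python
from collections import deque


class FakePullConsumer:
    """In-memory stand-in for a pull consumer (not a NATS API)."""

    def __init__(self, messages):
        self._pending = deque(messages)
        self.fetch_calls = 0

    def fetch(self, batch):
        """Return up to `batch` messages; an empty list means nothing is left."""
        self.fetch_calls += 1
        out = []
        while self._pending and len(out) < batch:
            out.append(self._pending.popleft())
        return out


def drain(consumer, batch):
    """Dispatch loop: keep fetching until a fetch comes back empty."""
    processed = []
    while True:
        msgs = consumer.fetch(batch)
        if not msgs:
            break
        for msg in msgs:
            processed.append(msg)  # process, then acknowledge, each message
    return processed


one_by_one = FakePullConsumer(range(10))
batched = FakePullConsumer(range(10))
same_result = drain(one_by_one, 1) == drain(batched, 4)
```

Both loops process the same ten messages in the same order. The one-by-one loop gives the tightest control over pending messages, while the batched loop needs far fewer round trips.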
Push consumers remain useful in some other cases, such as consuming without a queue group, or with no acknowledgements or cumulative acknowledgements.
Ordered Consumers
Ordered consumers are a convenient form of ephemeral push consumer for applications that want to efficiently consume a stream for data inspection or analysis.
The API consumer is guaranteed delivery of messages in sequence and without gaps.
Always ephemeral - minimal overhead for the server
Single-threaded, in-sequence dispatching
Client checks message sequence and will prevent gaps in the delivery
Can recover from server node failure and reconnect
Does not recover from client failure as it is ephemeral
Delivery reliability
JetStream consumers can ensure not just the reliability of message delivery but also the reliability of the processing of the messages, even in the face of client application or downstream failures. They do so by using message-level acknowledgements and message re-deliveries.
Consumers have an acknowledgement policy specifying the level of reliability required. In increasing order of reliability, the available policies are: 'none' for no application level acknowledgements, 'all' where acknowledging a specific message also implicitly acknowledges all previous messages in the stream, and 'explicit' where each message must be individually acknowledged.
When the consumer is set to require explicit acknowledgements, client applications can use more than one kind of acknowledgement to indicate successful (or unsuccessful) reception and processing of the messages received from the consumer.
Applications can:
Acknowledge the successful processing of a message (Ack()).
Acknowledge the successful processing of a message and request an acknowledgement of the reception of the acknowledgement by the consumer (AckSync()).
Indicate that the processing is still in progress and more time is needed (inProgress()).
Negatively acknowledge a message, indicating that the client application is currently (temporarily) unable to process the message and that the consumer should attempt to re-deliver it (Nak()).
Terminate a message (typically, because there is a problem with the data inside the message such that the client application is never going to be able to process it), indicating that the consumer should not attempt to re-deliver the message (Term()).
After the server sends a message from the consumer to a subscribing client application, an 'AckWait' timer is started. This timer is deleted when either a positive (Ack()) or a termination (Term()) acknowledgement is received from the client application. The timer is reset upon reception of an in-progress (inProgress()) acknowledgement.
If no acknowledgement has been received from the client application when the timer expires, the server will attempt to re-deliver the message. If there is more than one client application instance subscribing to the consumer, there is no guarantee that the re-delivery will go to any particular client instance.
You can control the timing of re-deliveries using either the single AckWait duration attribute of the consumer, or as a sequence of durations in the BackOff attribute (which overrides AckWait).
You can also control the timing of re-deliveries when messages are negatively acknowledged with Nak(), by passing a nakDelay() option (or using NakWithDelay()), otherwise the re-delivery attempt will happen right after the reception of the Nak by the server.
"Dead Letter Queues" type functionality
You can set a maximum number of delivery attempts using the consumer's MaxDeliver setting.
Whenever a message reaches its maximum number of delivery attempts an advisory message is published on the $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.<STREAM>.<CONSUMER> subject. The advisory message's payload (use nats schema info io.nats.jetstream.advisory.v1.max_deliver for specific information) contains a stream_seq field that contains the sequence number of the message in the stream.
Similarly, whenever a client application terminates delivery attempts for the message using AckTerm an advisory message is published on the $JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED.<STREAM>.<CONSUMER> subject, and its payload (see nats schema info io.nats.jetstream.advisory.v1.terminated) contains a stream_seq field.
You can leverage those advisory messages to implement "Dead Letter Queue" (DLQ) functionality. For example:
If you only need to know about each time a message is 'dead' (considered un-re-deliverable by the consumer), then listening to the advisories is enough.
If you also need to have access to the message in question then you can use the message's sequence number included in the advisory to retrieve that specific message by sequence number from the stream. If a message reaches its maximum number of delivery attempts, it will still stay in the stream until it is manually deleted or manually acknowledged.
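The interaction between AckWait, BackOff and MaxDeliver, and the use of the advisory's stream_seq field, can be sketched in a few lines of Python. This is a model under stated assumptions rather than client library code: `redelivery_delays` and `dead_letter_sequence` are hypothetical helpers, the sketch assumes the last BackOff entry is reused once the list is exhausted, and the sample payload is a placeholder in which only the `stream_seq` field name comes from the text above.

```python
import json


def redelivery_delays(max_deliver, ack_wait, backoff=None):
    """Return the wait (in seconds) before each re-delivery attempt.

    With `max_deliver` total attempts there are `max_deliver - 1`
    re-deliveries. A non-empty `backoff` overrides `ack_wait`; this sketch
    assumes the last backoff entry is reused once the list is exhausted.
    """
    delays = []
    for attempt in range(max_deliver - 1):
        if backoff:
            delays.append(backoff[min(attempt, len(backoff) - 1)])
        else:
            delays.append(ack_wait)
    return delays


def dead_letter_sequence(advisory_payload):
    """Extract the stream sequence of the 'dead' message from an advisory."""
    return json.loads(advisory_payload)["stream_seq"]


# Illustrative payload: only `stream_seq` is taken from the text above;
# the other fields and all values are placeholders.
sample_advisory = json.dumps(
    {"stream": "ORDERS", "consumer": "worker", "stream_seq": 42, "deliveries": 4}
)

fixed = redelivery_delays(max_deliver=4, ack_wait=30)
stepped = redelivery_delays(max_deliver=5, ack_wait=30, backoff=[1, 5, 10])
dead_seq = dead_letter_sequence(sample_advisory)
```

A DLQ handler built on this idea would subscribe to the advisory subject, extract the sequence number as above, and then fetch that message from the stream by sequence for inspection or re-routing.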
func ExampleJetStream() {
nc, err := nats.Connect("localhost")
if err != nil {
log.Fatal(err)
}
// Use the JetStream context to produce and consume messages
// that have been persisted.
js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
if err != nil {
log.Fatal(err)
}
js.AddStream(&nats.StreamConfig{
Name: "FOO",
Subjects: []string{"foo"},
})
js.Publish("foo", []byte("Hello JS!"))
// Publish messages asynchronously.
for i := 0; i < 500; i++ {
js.PublishAsync("foo", []byte("Hello JS Async!"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
fmt.Println("Did not resolve in time")
}
// Create Pull based consumer with maximum 128 inflight.
sub, _ := js.PullSubscribe("foo", "wq", nats.PullMaxWaiting(128))
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for {
select {
case <-ctx.Done():
return
default:
}
// Fetch returns as soon as any message is available rather than waiting for the full batch.
// Using a batch size of more than 1 allows for higher throughput when needed.
msgs, _ := sub.Fetch(10, nats.Context(ctx))
for _, msg := range msgs {
msg.Ack()
}
}
}
package io.nats.examples.jetstream.simple;
import io.nats.client.*;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.examples.jetstream.ResilientPublisher;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import static io.nats.examples.jetstream.NatsJsUtils.createOrReplaceStream;
/**
* This example will demonstrate simplified consume with a handler
*/
public class MessageConsumerExample {
private static final String STREAM = "consume-stream";
private static final String SUBJECT = "consume-subject";
private static final String CONSUMER_NAME = "consume-consumer";
private static final String MESSAGE_PREFIX = "consume";
private static final int STOP_COUNT = 500;
private static final int REPORT_EVERY = 100;
private static final String SERVER = "nats://localhost:4222";
public static void main(String[] args) {
Options options = Options.builder().server(SERVER).build();
try (Connection nc = Nats.connect(options)) {
JetStreamManagement jsm = nc.jetStreamManagement();
createOrReplaceStream(jsm, STREAM, SUBJECT);
//Utility for filling the stream with some messages
System.out.println("Starting publish...");
ResilientPublisher publisher = new ResilientPublisher(nc, jsm, STREAM, SUBJECT).basicDataPrefix(MESSAGE_PREFIX).jitter(10);
Thread pubThread = new Thread(publisher);
pubThread.start();
// get stream context, create consumer and get the consumer context
StreamContext streamContext;
ConsumerContext consumerContext;
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger atomicCount = new AtomicInteger();
long start = System.nanoTime();
streamContext = nc.getStreamContext(STREAM);
streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build());
consumerContext = streamContext.getConsumerContext(CONSUMER_NAME);
MessageHandler handler = msg -> {
msg.ack();
int count = atomicCount.incrementAndGet();
if (count % REPORT_EVERY == 0) {
System.out.println("Handler" + ": Received " + count + " messages in " + (System.nanoTime() - start) / 1_000_000 + "ms.");
}
if (count == STOP_COUNT) {
latch.countDown();
}
};
// create the consumer and install handler
MessageConsumer consumer = consumerContext.consume(handler);
//Waiting for the handler signalling us to stop
latch.await();
// When stop is called, no more pull requests will be made, but messages already requested
// will still come across the wire to the client.
System.out.println("Stopping the consumer...");
consumer.stop();
// wait until the consumer is finished processing backlog
while (!consumer.isFinished()) {
Thread.sleep(10);
}
System.out.println("Final" + ": Received " + atomicCount.get() + " messages in " + (System.nanoTime() - start) / 1_000_000 + "ms.");
publisher.stop(); // otherwise the ConsumerContext background thread will complain when the connection goes away
pubThread.join();
}
catch (JetStreamApiException | IOException e) {
// JetStreamApiException:
// 1. the stream or consumer did not exist
// 2. API calls under the covers could theoretically fail, but in practice they won't.
// IOException:
// likely a connection problem
System.err.println("Exception not handled, exiting.");
System.exit(-1);
}
catch (Exception e) {
System.err.println("Exception not handled, exiting.");
System.exit(-1);
}
}
}
import { AckPolicy, connect, nanos } from "../../src/mod.ts";
import { nuid } from "../../nats-base-client/nuid.ts";
const nc = await connect();
const stream = nuid.next();
const subj = nuid.next();
const durable = nuid.next();
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: stream, subjects: [subj] });
const js = nc.jetstream();
await js.publish(subj);
await js.publish(subj);
await js.publish(subj);
await js.publish(subj);
const psub = await js.pullSubscribe(subj, {
mack: true,
// artificially low ack_wait, to show some messages
// not getting acked being redelivered
config: {
durable_name: durable,
ack_policy: AckPolicy.Explicit,
ack_wait: nanos(4000),
},
});
(async () => {
for await (const m of psub) {
console.log(
`[${m.seq}] ${
m.redelivered ? `- redelivery ${m.info.redeliveryCount}` : ""
}`
);
if (m.seq % 2 === 0) {
m.ack();
}
}
})();
const fn = () => {
console.log("[PULL]");
psub.pull({ batch: 1000, expires: 10000 });
};
// do the initial pull
fn();
// and now schedule a pull every so often
const interval = setInterval(fn, 10000); // and repeat every 10s
setTimeout(() => {
clearInterval(interval);
nc.drain();
}, 20000);
import asyncio
import nats
from nats.errors import TimeoutError
async def main():
    nc = await nats.connect("localhost")
    # Create JetStream context.
    js = nc.jetstream()
    # Persist messages on the 'foo' subject.
    await js.add_stream(name="sample-stream", subjects=["foo"])
    for i in range(0, 10):
        ack = await js.publish("foo", f"hello world: {i}".encode())
        print(ack)
    # Create pull based consumer on 'foo'.
    psub = await js.pull_subscribe("foo", "psub")
    # Fetch and ack messages from consumer.
    for i in range(0, 10):
        msgs = await psub.fetch(1)
        for msg in msgs:
            await msg.ack()
            print(msg)
    await nc.close()
if __name__ == '__main__':
    asyncio.run(main())
// dotnet add package NATS.Net
using NATS.Net;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
await using var client = new NatsClient();
INatsJSContext js = client.CreateJetStreamContext();
// Create a stream
var streamConfig = new StreamConfig(name: "FOO", subjects: ["foo"]);
await js.CreateStreamAsync(streamConfig);
// Publish a message
{
PubAckResponse ack = await js.PublishAsync("foo", "Hello, JetStream!");
ack.EnsureSuccess();
}
// Publish messages concurrently
List<NatsJSPublishConcurrentFuture> futures = new();
for (var i = 0; i < 500; i++)
{
NatsJSPublishConcurrentFuture future
= await js.PublishConcurrentAsync("foo", "Hello, JetStream 1!");
futures.Add(future);
}
foreach (var future in futures)
{
await using (future)
{
PubAckResponse ack = await future.GetResponseAsync();
ack.EnsureSuccess();
}
}
// Create a consumer with a maximum 128 inflight messages
INatsJSConsumer consumer = await js.CreateConsumerAsync("FOO", new ConsumerConfig(name: "foo")
{
MaxWaiting = 128,
});
using CancellationTokenSource cts = new(TimeSpan.FromSeconds(10));
while (cts.IsCancellationRequested == false)
{
var opts = new NatsJSFetchOpts { MaxMsgs = 10 };
await foreach (NatsJSMsg<string> msg in consumer.FetchAsync<string>(opts, cancellationToken: cts.Token))
{
await msg.AckAsync(cancellationToken: cts.Token);
}
}
#include "examples.h"
static const char *usage = ""\
"-gd use global message delivery thread pool\n" \
"-sync receive synchronously (default is asynchronous)\n" \
"-pull use pull subscription\n" \
"-fc enable flow control\n" \
"-count number of expected messages\n";
static void
onMsg(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
{
if (print)
printf("Received msg: %s - %.*s\n",
natsMsg_GetSubject(msg),
natsMsg_GetDataLength(msg),
natsMsg_GetData(msg));
if (start == 0)
start = nats_Now();
// We should be using a mutex to protect those variables since
// they are used from the subscription's delivery and the main
// threads. For demo purposes, this is fine.
if (++count == total)
elapsed = nats_Now() - start;
// Since this is auto-ack callback, we don't need to ack here.
natsMsg_Destroy(msg);
}
static void
asyncCb(natsConnection *nc, natsSubscription *sub, natsStatus err, void *closure)
{
printf("Async error: %u - %s\n", err, natsStatus_GetText(err));
natsSubscription_GetDropped(sub, (int64_t*) &dropped);
}
int main(int argc, char **argv)
{
natsConnection *conn = NULL;
natsStatistics *stats = NULL;
natsOptions *opts = NULL;
natsSubscription *sub = NULL;
natsMsg *msg = NULL;
jsCtx *js = NULL;
jsErrCode jerr = 0;
jsOptions jsOpts;
jsSubOptions so;
natsStatus s;
bool delStream = false;
opts = parseArgs(argc, argv, usage);
printf("Created %s subscription on '%s'.\n",
(pull ? "pull" : (async ? "asynchronous" : "synchronous")), subj);
s = natsOptions_SetErrorHandler(opts, asyncCb, NULL);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
if (s == NATS_OK)
s = jsOptions_Init(&jsOpts);
if (s == NATS_OK)
s = jsSubOptions_Init(&so);
if (s == NATS_OK)
{
so.Stream = stream;
so.Consumer = durable;
if (flowctrl)
{
so.Config.FlowControl = true;
so.Config.Heartbeat = (int64_t)1E9;
}
}
if (s == NATS_OK)
s = natsConnection_JetStream(&js, conn, &jsOpts);
if (s == NATS_OK)
{
jsStreamInfo *si = NULL;
// First check if the stream already exists.
s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
if (s == NATS_NOT_FOUND)
{
jsStreamConfig cfg;
// Since we are the one creating this stream, we can delete at the end.
delStream = true;
// Initialize the configuration structure.
jsStreamConfig_Init(&cfg);
cfg.Name = stream;
// Set the subject
cfg.Subjects = (const char*[1]){subj};
cfg.SubjectsLen = 1;
// Make it a memory stream.
cfg.Storage = js_MemoryStorage;
// Add the stream,
s = js_AddStream(&si, js, &cfg, NULL, &jerr);
}
if (s == NATS_OK)
{
printf("Stream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
si->Config->Name, si->State.Msgs, si->State.Bytes);
// Need to destroy the returned stream object.
jsStreamInfo_Destroy(si);
}
}
if (s == NATS_OK)
{
if (pull)
s = js_PullSubscribe(&sub, js, subj, durable, &jsOpts, &so, &jerr);
else if (async)
s = js_Subscribe(&sub, js, subj, onMsg, NULL, &jsOpts, &so, &jerr);
else
s = js_SubscribeSync(&sub, js, subj, &jsOpts, &so, &jerr);
}
if (s == NATS_OK)
s = natsSubscription_SetPendingLimits(sub, -1, -1);
if (s == NATS_OK)
s = natsStatistics_Create(&stats);
if ((s == NATS_OK) && pull)
{
natsMsgList list;
int i;
for (count = 0; (s == NATS_OK) && (count < total); )
{
s = natsSubscription_Fetch(&list, sub, 1024, 5000, &jerr);
if (s != NATS_OK)
break;
if (start == 0)
start = nats_Now();
count += (int64_t) list.Count;
for (i=0; (s == NATS_OK) && (i<list.Count); i++)
s = natsMsg_Ack(list.Msgs[i], &jsOpts);
natsMsgList_Destroy(&list);
}
}
else if ((s == NATS_OK) && async)
{
while (s == NATS_OK)
{
if (count + dropped == total)
break;
nats_Sleep(1000);
}
}
else if (s == NATS_OK)
{
for (count = 0; (s == NATS_OK) && (count < total); count++)
{
s = natsSubscription_NextMsg(&msg, sub, 5000);
if (s != NATS_OK)
break;
if (start == 0)
start = nats_Now();
s = natsMsg_Ack(msg, &jsOpts);
natsMsg_Destroy(msg);
}
}
if (s == NATS_OK)
{
printStats(STATS_IN|STATS_COUNT, conn, sub, stats);
printPerf("Received");
}
if (s == NATS_OK)
{
jsStreamInfo *si = NULL;
// Let's report some stats after the run
s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
if (s == NATS_OK)
{
printf("\nStream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
si->Config->Name, si->State.Msgs, si->State.Bytes);
jsStreamInfo_Destroy(si);
}
if (delStream)
{
printf("\nDeleting stream %s: ", stream);
s = js_DeleteStream(js, stream, NULL, &jerr);
if (s == NATS_OK)
printf("OK!");
printf("\n");
}
}
else
{
printf("Error: %u - %s - jerr=%u\n", s, natsStatus_GetText(s), jerr);
nats_PrintLastErrorStack(stderr);
}
// Destroy all our objects to avoid report of memory leak
jsCtx_Destroy(js);
natsStatistics_Destroy(stats);
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);
// To silence reports of memory still in use with valgrind
nats_Close();
return 0;
}
func ExampleJetStream() {
nc, err := nats.Connect("localhost")
if err != nil {
log.Fatal(err)
}
// Use the JetStream context to produce and consume messages
// that have been persisted.
js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
if err != nil {
log.Fatal(err)
}
js.AddStream(&nats.StreamConfig{
Name: "FOO",
Subjects: []string{"foo"},
})
js.Publish("foo", []byte("Hello JS!"))
// Publish messages asynchronously.
for i := 0; i < 500; i++ {
js.PublishAsync("foo", []byte("Hello JS Async!"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
fmt.Println("Did not resolve in time")
}
// Create async consumer on subject 'foo'. Async subscribers
// ack a message once exiting the callback.
js.Subscribe("foo", func(msg *nats.Msg) {
meta, _ := msg.Metadata()
fmt.Printf("Stream Sequence : %v\n", meta.Sequence.Stream)
fmt.Printf("Consumer Sequence: %v\n", meta.Sequence.Consumer)
})
// Async subscriber with manual acks.
js.Subscribe("foo", func(msg *nats.Msg) {
msg.Ack()
}, nats.ManualAck())
// Async queue subscription where members load balance the
// received messages together.
// If no consumer name is specified, either with nats.Bind()
// or nats.Durable() options, the queue name is used as the
// durable name (that is, as if you were passing the
// nats.Durable(<queue group name>) option).
// It is recommended to use nats.Bind() or nats.Durable()
// and preferably create the JetStream consumer beforehand
// (using js.AddConsumer) so that the JS consumer is not
// deleted on an Unsubscribe() or Drain() when the member
// that created the consumer goes away first.
// Check Godoc for the QueueSubscribe() API for more details.
js.QueueSubscribe("foo", "group", func(msg *nats.Msg) {
msg.Ack()
}, nats.ManualAck())
// Subscriber to consume messages synchronously.
sub, _ := js.SubscribeSync("foo")
msg, _ := sub.NextMsg(2 * time.Second)
msg.Ack()
// We can add a member to the group, with this member using
// the synchronous version of the QueueSubscribe.
sub, _ = js.QueueSubscribeSync("foo", "group")
msg, _ = sub.NextMsg(2 * time.Second)
msg.Ack()
// ChanSubscribe
msgCh := make(chan *nats.Msg, 8192)
sub, _ = js.ChanSubscribe("foo", msgCh)
select {
case msg := <-msgCh:
fmt.Println("[Received]", msg)
case <-time.After(1 * time.Second):
}
}
package io.nats.examples.jetstream;
import io.nats.client.*;
import io.nats.client.api.PublishAck;
import io.nats.examples.ExampleArgs;
import io.nats.examples.ExampleUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static io.nats.examples.jetstream.NatsJsUtils.createStreamExitWhenExists;
/**
* This example will demonstrate JetStream push subscribing using a durable consumer and a queue
*/
public class NatsJsPushSubQueueDurable {
static final String usageString =
"\nUsage: java -cp <classpath> NatsJsPushSubQueueDurable [-s server] [-strm stream] [-sub subject] [-q queue] [-dur durable] [-mcnt msgCount] [-scnt subCount]"
+ "\n\nDefault Values:"
+ "\n [-strm stream] qdur-stream"
+ "\n [-sub subject] qdur-subject"
+ "\n [-q queue] qdur-queue"
+ "\n [-dur durable] qdur-durable"
+ "\n [-mcnt msgCount] 100"
+ "\n [-scnt subCount] 5"
+ "\n\nUse tls:// or opentls:// to require tls, via the Default SSLContext\n"
+ "\nSet the environment variable NATS_NKEY to use challenge response authentication by setting a file containing your private key.\n"
+ "\nSet the environment variable NATS_CREDS to use JWT/NKey authentication by setting a file containing your user creds.\n"
+ "\nUse the URL in the -s server parameter for user/pass/token authentication.\n";
public static void main(String[] args) {
ExampleArgs exArgs = ExampleArgs.builder("Push Subscribe, Durable Consumer, Queue", args, usageString)
.defaultStream("qdur-stream")
.defaultSubject("qdur-subject")
.defaultQueue("qdur-queue")
.defaultDurable("qdur-durable")
.defaultMsgCount(100)
.defaultSubCount(5)
.build();
try (Connection nc = Nats.connect(ExampleUtils.createExampleOptions(exArgs.server, true))) {
// Create a JetStreamManagement context.
JetStreamManagement jsm = nc.jetStreamManagement();
// Use the utility to create a stream stored in memory.
createStreamExitWhenExists(jsm, exArgs.stream, exArgs.subject);
// Create our JetStream context
JetStream js = nc.jetStream();
System.out.println();
// Setup the subscribers
// - the PushSubscribeOptions can be re-used since all the subscribers are the same
// - use a concurrent integer to track all the messages received
// - have a list of subscribers and threads so I can track them
PushSubscribeOptions pso = PushSubscribeOptions.builder().durable(exArgs.durable).build();
AtomicInteger allReceived = new AtomicInteger();
List<JsQueueSubscriber> subscribers = new ArrayList<>();
List<Thread> subThreads = new ArrayList<>();
for (int id = 1; id <= exArgs.subCount; id++) {
// setup the subscription
JetStreamSubscription sub = js.subscribe(exArgs.subject, exArgs.queue, pso);
// create and track the runnable
JsQueueSubscriber qs = new JsQueueSubscriber(id, exArgs, js, sub, allReceived);
subscribers.add(qs);
// create, track and start the thread
Thread t = new Thread(qs);
subThreads.add(t);
t.start();
}
nc.flush(Duration.ofSeconds(1)); // flush outgoing communication with/to the server
// create and start the publishing
Thread pubThread = new Thread(new JsPublisher(js, exArgs));
pubThread.start();
// wait for all threads to finish
pubThread.join();
for (Thread t : subThreads) {
t.join();
}
// report
for (JsQueueSubscriber qs : subscribers) {
qs.report();
}
System.out.println();
// delete the stream since we are done with it.
jsm.deleteStream(exArgs.stream);
}
catch (Exception e) {
e.printStackTrace();
}
}
static class JsPublisher implements Runnable {
JetStream js;
ExampleArgs exArgs;
public JsPublisher(JetStream js, ExampleArgs exArgs) {
this.js = js;
this.exArgs = exArgs;
}
@Override
public void run() {
for (int x = 1; x <= exArgs.msgCount; x++) {
try {
PublishAck pa = js.publish(exArgs.subject, ("Data # " + x).getBytes(StandardCharsets.US_ASCII));
} catch (IOException | JetStreamApiException e) {
// something pretty wrong here
e.printStackTrace();
System.exit(-1);
}
}
}
}
static class JsQueueSubscriber implements Runnable {
int id;
int thisReceived;
List<String> datas;
ExampleArgs exArgs;
JetStream js;
JetStreamSubscription sub;
AtomicInteger allReceived;
public JsQueueSubscriber(int id, ExampleArgs exArgs, JetStream js, JetStreamSubscription sub, AtomicInteger allReceived) {
this.id = id;
thisReceived = 0;
datas = new ArrayList<>();
this.exArgs = exArgs;
this.js = js;
this.sub = sub;
this.allReceived = allReceived;
}
public void report() {
System.out.printf("Sub # %d handled %d messages.\n", id, thisReceived);
}
@Override
public void run() {
while (allReceived.get() < exArgs.msgCount) {
try {
Message msg = sub.nextMessage(Duration.ofMillis(500));
while (msg != null) {
thisReceived++;
allReceived.incrementAndGet();
String data = new String(msg.getData(), StandardCharsets.US_ASCII);
datas.add(data);
System.out.printf("QS # %d message # %d %s\n", id, thisReceived, data);
msg.ack();
msg = sub.nextMessage(Duration.ofMillis(500));
}
} catch (InterruptedException e) {
// just try again
}
}
System.out.printf("QS # %d completed.\n", id);
}
}
}
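The queue example above depends on one property: each message is delivered to exactly one member of the queue group, so the per-subscriber counts add up to the published total. The following is a minimal sketch of that property in Python. It does not use NATS at all; an in-memory `asyncio.Queue` stands in for the server, and the names `worker` and `run_queue_group` are hypothetical.

```python
import asyncio


async def worker(worker_id, queue, handled):
    # Every worker pulls from the same shared queue, so a message
    # is handled by exactly one worker, as in a queue group.
    while True:
        data = await queue.get()
        if data is None:  # sentinel: no more messages for this worker
            queue.task_done()
            return
        handled[worker_id].append(data)
        queue.task_done()
        # Yield so that other workers get a chance to take the next message.
        await asyncio.sleep(0)


async def run_queue_group(msg_count=100, sub_count=5):
    # Assumption: the queue is a stand-in for the server-side consumer.
    queue = asyncio.Queue()
    handled = {i: [] for i in range(1, sub_count + 1)}
    workers = [asyncio.create_task(worker(i, queue, handled)) for i in handled]
    for x in range(1, msg_count + 1):
        queue.put_nowait(f"Data # {x}")
    for _ in workers:
        queue.put_nowait(None)  # one sentinel per worker
    await asyncio.gather(*workers)
    return handled


if __name__ == "__main__":
    result = asyncio.run(run_queue_group())
    for worker_id, msgs in result.items():
        print(f"Sub # {worker_id} handled {len(msgs)} messages.")
```

How the messages are split between workers is not guaranteed, here or in a real queue group; only the total is.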
import { AckPolicy, connect } from "../../src/mod.ts";
import { nuid } from "../../nats-base-client/nuid.ts";
const nc = await connect();
// create a regular subscription - this is plain nats
const sub = nc.subscribe("my.messages", { max: 5 });
const done = (async () => {
for await (const m of sub) {
console.log(m.subject);
m.respond();
}
})();
const jsm = await nc.jetstreamManager();
const stream = nuid.next();
const subj = nuid.next();
await jsm.streams.add({ name: stream, subjects: [`${subj}.>`] });
// create a consumer that delivers to the subscription
await jsm.consumers.add(stream, {
ack_policy: AckPolicy.Explicit,
deliver_subject: "my.messages",
});
// publish some old nats messages
nc.publish(`${subj}.A`);
nc.publish(`${subj}.B`);
nc.publish(`${subj}.C`);
nc.publish(`${subj}.D.A`);
nc.publish(`${subj}.F.A.B`);
await done;
await nc.close();
import asyncio
import nats
from nats.errors import TimeoutError
async def main():
    nc = await nats.connect("localhost")
    # Create JetStream context.
    js = nc.jetstream()
    # Persist messages on the 'foo' subject.
    await js.add_stream(name="sample-stream", subjects=["foo"])
    for i in range(0, 10):
        ack = await js.publish("foo", f"hello world: {i}".encode())
        print(ack)
    # Create pull based consumer on 'foo'.
    psub = await js.pull_subscribe("foo", "psub")
    # Fetch and ack messages from consumer.
    for i in range(0, 10):
        msgs = await psub.fetch(1)
        for msg in msgs:
            await msg.ack()
            print(msg)
    # Create single push based subscriber that is durable across restarts.
    sub = await js.subscribe("foo", durable="myapp")
    msg = await sub.next_msg()
    await msg.ack()
    # Create a deliver group whose members receive load-balanced messages.
    async def qsub_a(msg):
        print("QSUB A:", msg)
        await msg.ack()
    async def qsub_b(msg):
        print("QSUB B:", msg)
        await msg.ack()
    await js.subscribe("foo", "workers", cb=qsub_a)
    await js.subscribe("foo", "workers", cb=qsub_b)
    for i in range(0, 10):
        ack = await js.publish("foo", f"hello world: {i}".encode())
        print("\t", ack)
    await nc.close()
if __name__ == '__main__':
    asyncio.run(main())
// NATS .NET doesn't publicly support push consumers and treats all consumers
// as just consumers. The mechanics of consuming messages are abstracted
// away from applications and are handled by the library.
#include "examples.h"
static const char *usage = ""\
"-gd use global message delivery thread pool\n" \
"-sync receive synchronously (default is asynchronous)\n" \
"-pull use pull subscription\n" \
"-fc enable flow control\n" \
"-count number of expected messages\n";
static void
onMsg(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
{
if (print)
printf("Received msg: %s - %.*s\n",
natsMsg_GetSubject(msg),
natsMsg_GetDataLength(msg),
natsMsg_GetData(msg));
if (start == 0)
start = nats_Now();
// We should be using a mutex to protect those variables since
// they are used from the subscription's delivery and the main
// threads. For demo purposes, this is fine.
if (++count == total)
elapsed = nats_Now() - start;
// Since this is auto-ack callback, we don't need to ack here.
natsMsg_Destroy(msg);
}
static void
asyncCb(natsConnection *nc, natsSubscription *sub, natsStatus err, void *closure)
{
printf("Async error: %u - %s\n", err, natsStatus_GetText(err));
natsSubscription_GetDropped(sub, (int64_t*) &dropped);
}
int main(int argc, char **argv)
{
natsConnection *conn = NULL;
natsStatistics *stats = NULL;
natsOptions *opts = NULL;
natsSubscription *sub = NULL;
natsMsg *msg = NULL;
jsCtx *js = NULL;
jsErrCode jerr = 0;
jsOptions jsOpts;
jsSubOptions so;
natsStatus s;
bool delStream = false;
opts = parseArgs(argc, argv, usage);
printf("Created %s subscription on '%s'.\n",
(pull ? "pull" : (async ? "asynchronous" : "synchronous")), subj);
s = natsOptions_SetErrorHandler(opts, asyncCb, NULL);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
if (s == NATS_OK)
s = jsOptions_Init(&jsOpts);
if (s == NATS_OK)
s = jsSubOptions_Init(&so);
if (s == NATS_OK)
{
so.Stream = stream;
so.Consumer = durable;
if (flowctrl)
{
so.Config.FlowControl = true;
so.Config.Heartbeat = (int64_t)1E9;
}
}
if (s == NATS_OK)
s = natsConnection_JetStream(&js, conn, &jsOpts);
if (s == NATS_OK)
{
jsStreamInfo *si = NULL;
// First check if the stream already exists.
s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
if (s == NATS_NOT_FOUND)
{
jsStreamConfig cfg;
// Since we are the one creating this stream, we can delete at the end.
delStream = true;
// Initialize the configuration structure.
jsStreamConfig_Init(&cfg);
cfg.Name = stream;
// Set the subject
cfg.Subjects = (const char*[1]){subj};
cfg.SubjectsLen = 1;
// Make it a memory stream.
cfg.Storage = js_MemoryStorage;
// Add the stream.
s = js_AddStream(&si, js, &cfg, NULL, &jerr);
}
if (s == NATS_OK)
{
printf("Stream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
si->Config->Name, si->State.Msgs, si->State.Bytes);
// Need to destroy the returned stream object.
jsStreamInfo_Destroy(si);
}
}
if (s == NATS_OK)
{
if (pull)
s = js_PullSubscribe(&sub, js, subj, durable, &jsOpts, &so, &jerr);
else if (async)
s = js_Subscribe(&sub, js, subj, onMsg, NULL, &jsOpts, &so, &jerr);
else
s = js_SubscribeSync(&sub, js, subj, &jsOpts, &so, &jerr);
}
if (s == NATS_OK)
s = natsSubscription_SetPendingLimits(sub, -1, -1);
if (s == NATS_OK)
s = natsStatistics_Create(&stats);
if ((s == NATS_OK) && pull)
{
natsMsgList list;
int i;
for (count = 0; (s == NATS_OK) && (count < total); )
{
s = natsSubscription_Fetch(&list, sub, 1024, 5000, &jerr);
if (s != NATS_OK)
break;
if (start == 0)
start = nats_Now();
count += (int64_t) list.Count;
for (i=0; (s == NATS_OK) && (i<list.Count); i++)
s = natsMsg_Ack(list.Msgs[i], &jsOpts);
natsMsgList_Destroy(&list);
}
}
else if ((s == NATS_OK) && async)
{
while (s == NATS_OK)
{
if (count + dropped == total)
break;
nats_Sleep(1000);
}
}
else if (s == NATS_OK)
{
for (count = 0; (s == NATS_OK) && (count < total); count++)
{
s = natsSubscription_NextMsg(&msg, sub, 5000);
if (s != NATS_OK)
break;
if (start == 0)
start = nats_Now();
s = natsMsg_Ack(msg, &jsOpts);
natsMsg_Destroy(msg);
}
}
if (s == NATS_OK)
{
printStats(STATS_IN|STATS_COUNT, conn, sub, stats);
printPerf("Received");
}
if (s == NATS_OK)
{
jsStreamInfo *si = NULL;
// Let's report some stats after the run
s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
if (s == NATS_OK)
{
printf("\nStream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
si->Config->Name, si->State.Msgs, si->State.Bytes);
jsStreamInfo_Destroy(si);
}
if (delStream)
{
printf("\nDeleting stream %s: ", stream);
s = js_DeleteStream(js, stream, NULL, &jerr);
if (s == NATS_OK)
printf("OK!");
printf("\n");
}
}
else
{
printf("Error: %u - %s - jerr=%u\n", s, natsStatus_GetText(s), jerr);
nats_PrintLastErrorStack(stderr);
}
// Destroy all our objects to avoid report of memory leak
jsCtx_Destroy(js);
natsStatistics_Destroy(stats);
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);
// To silence reports of memory still in use with valgrind
nats_Close();
return 0;
}
func ExampleJetStream() {
nc, err := nats.Connect("localhost")
if err != nil {
log.Fatal(err)
}
// Use the JetStream context to produce and consume messages
// that have been persisted.
js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
if err != nil {
log.Fatal(err)
}
js.AddStream(&nats.StreamConfig{
Name: "FOO",
Subjects: []string{"foo"},
})
js.Publish("foo", []byte("Hello JS!"))
// ordered push consumer
js.Subscribe("foo", func(msg *nats.Msg) {
meta, _ := msg.Metadata()
fmt.Printf("Stream Sequence : %v\n", meta.Sequence.Stream)
fmt.Printf("Consumer Sequence: %v\n", meta.Sequence.Consumer)
}, nats.OrderedConsumer())
}
package io.nats.examples.jetstream;
import io.nats.client.*;
import io.nats.client.api.PublishAck;
import io.nats.client.impl.NatsMessage;
import io.nats.examples.ExampleArgs;
import io.nats.examples.ExampleUtils;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.temporal.TemporalUnit;
public class myExample {
public static void main(String[] args) {
final String subject = "foo";
try (Connection nc = Nats.connect(ExampleUtils.createExampleOptions("localhost"))) {
// Create a JetStream context. This hangs off the original connection
// allowing us to produce data to streams and consume data from
// JetStream consumers.
JetStream js = nc.jetStream();
// This example assumes there is a stream already created on subject "foo" and some messages already stored in that stream
// create our message handler.
MessageHandler handler = msg -> {
System.out.println("\nMessage Received:");
if (msg.hasHeaders()) {
System.out.println(" Headers:");
for (String key : msg.getHeaders().keySet()) {
for (String value : msg.getHeaders().get(key)) {
System.out.printf(" %s: %s\n", key, value);
}
}
}
System.out.printf(" Subject: %s\n Data: %s\n",
msg.getSubject(), new String(msg.getData(), StandardCharsets.UTF_8));
System.out.println(" " + msg.metaData());
};
Dispatcher dispatcher = nc.createDispatcher();
PushSubscribeOptions pso = PushSubscribeOptions.builder().ordered(true).build();
JetStreamSubscription sub = js.subscribe(subject, dispatcher, handler, false, pso);
Thread.sleep(100);
sub.drain(Duration.ofMillis(100));
nc.drain(Duration.ofMillis(100));
}
catch(Exception e)
{
e.printStackTrace();
}
}
}
import { connect, consumerOpts } from "../../src/mod.ts";
const nc = await connect();
const js = nc.jetstream();
// note the consumer is not durable - so after the
// subscription ends, the server will automatically destroy
// the consumer
const opts = consumerOpts();
opts.manualAck();
opts.maxMessages(2);
opts.deliverTo("xxx");
const sub = await js.subscribe("a.>", opts);
await (async () => {
for await (const m of sub) {
console.log(m.seq, m.subject);
m.ack();
}
})();
await nc.close();
import asyncio
import nats
from nats.errors import TimeoutError
async def main():
    nc = await nats.connect("localhost")
    # Create JetStream context.
    js = nc.jetstream()
    # Create ordered consumer with flow control and heartbeats
    # that auto resumes on failures.
    osub = await js.subscribe("foo", ordered_consumer=True)
    data = bytearray()
    while True:
        try:
            msg = await osub.next_msg()
            data.extend(msg.data)
        except TimeoutError:
            break
    print("All data in stream:", len(data))
    await nc.close()
if __name__ == '__main__':
    asyncio.run(main())
// dotnet add package NATS.Net
using NATS.Net;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
await using var client = new NatsClient();
INatsJSContext js = client.CreateJetStreamContext();
var streamConfig = new StreamConfig(name: "FOO", subjects: ["foo"]);
await js.CreateStreamAsync(streamConfig);
PubAckResponse ack = await js.PublishAsync("foo", "Hello, JetStream!");
ack.EnsureSuccess();
INatsJSConsumer orderedConsumer = await js.CreateOrderedConsumerAsync("FOO");
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
await foreach (NatsJSMsg<string> msg in orderedConsumer.ConsumeAsync<string>(cancellationToken: cts.Token))
{
NatsJSMsgMetadata? meta = msg.Metadata;
Console.WriteLine($"Stream Sequence : {meta?.Sequence.Stream}");
Console.WriteLine($"Consumer Sequence: {meta?.Sequence.Consumer}");
}
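The ordered consumer examples above leave it to the client library to keep delivery in order and to resume after a failure. As a rough illustration only, the sketch below shows one way such a check could work: track the next expected consumer sequence and, when a gap appears, compute the stream sequence to resume from. It is a simplification under stated assumptions, not the implementation used by any NATS client; `OrderedTracker` and `observe` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class OrderedTracker:
    # Next consumer sequence we expect to see (assumed to start at 1).
    expected_consumer_seq: int = 1
    # Stream sequence of the last message accepted in order.
    last_stream_seq: int = 0

    def observe(self, stream_seq: int, consumer_seq: int):
        """Return None if the message is in order, otherwise the
        stream sequence from which delivery should restart."""
        if consumer_seq != self.expected_consumer_seq:
            # Gap detected. Assumption: a replacement consumer would
            # restart its own numbering at 1.
            self.expected_consumer_seq = 1
            return self.last_stream_seq + 1
        self.expected_consumer_seq += 1
        self.last_stream_seq = stream_seq
        return None


if __name__ == "__main__":
    tracker = OrderedTracker()
    # (stream_seq, consumer_seq) pairs; the third delivery skips a message.
    for stream_seq, consumer_seq in [(1, 1), (2, 2), (4, 4)]:
        restart = tracker.observe(stream_seq, consumer_seq)
        if restart is None:
            print(f"in order: stream seq {stream_seq}")
        else:
            print(f"gap detected, resume from stream seq {restart}")
```

The application code in the examples never sees this bookkeeping; it only reads the stream and consumer sequences from the message metadata.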