Streams are 'message stores'. Each stream defines how messages are stored and what the limits (duration, size, interest) of the retention are. Streams consume normal NATS subjects, and any message published on those subjects will be captured in the defined storage system. You can do a normal publish to the subject for unacknowledged delivery, though it is better to use the JetStream publish calls instead, as the JetStream server will reply with an acknowledgement that the message was successfully stored.
In the diagram above we show the concept of storing all ORDERS.* in the Stream even though there are many types of order-related messages. We'll show how you can selectively consume subsets of messages later. Relatively speaking, the Stream is the most resource-consuming component, so being able to combine related data in this manner is important to consider.
Streams can consume many subjects. Here we have ORDERS.* but we could also consume SHIPPING.state into the same Stream should that make sense (not shown here).
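To make the subject capture described above concrete, here is a minimal sketch of token-based subject matching. It is a toy model for illustration only, not the server's implementation; the names subject_matches, captured and STREAM_SUBJECTS are hypothetical. It assumes the usual NATS wildcard rules: * matches exactly one token and > matches one or more trailing tokens.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if `subject` matches the NATS-style `pattern` (toy model)."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must be the last pattern token and needs at least one token left.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False  # subject ran out of tokens
        if p != "*" and p != s_tokens[i]:
            return False  # literal token mismatch
    # Without '>', pattern and subject must have the same number of tokens.
    return len(p_tokens) == len(s_tokens)


# Hypothetical stream bound to the two subjects mentioned in the text.
STREAM_SUBJECTS = ["ORDERS.*", "SHIPPING.state"]


def captured(subject: str) -> bool:
    """Would a message on `subject` be stored by the stream?"""
    return any(subject_matches(p, subject) for p in STREAM_SUBJECTS)
```

In this model, captured("ORDERS.new") is true while captured("ORDERS.eu.new") is false, because * covers only a single token.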
Streams support various retention policies, which define when messages in the stream can be automatically deleted. With limits-based retention, messages are deleted when stream limits are hit (such as max count, size or age), provided the discard policy is set to 'discard old'. More novel options apply on top of the limits: 'interest' (a message is automatically deleted after all consumers have received acknowledgement of its delivery to client applications) and 'work queue' (a message is automatically deleted from the stream when the consumer receives acknowledgement of its consumption from the client application).
Below is the set of stream configuration options that can be defined. The Version column indicates the version of the server in which the option was introduced. The Editable column indicates whether the option can be edited after the stream is created. See client-specific examples here.
The storage types include:
File (default) - Uses file-based storage for stream data.
Memory - Uses memory-based storage for stream data.
The retention options include:
LimitsPolicy (default) - Retention based on the various limits that are set, including MaxMsgs, MaxBytes, MaxAge, and MaxMsgsPerSubject. If any of these limits are set, whichever limit is hit first will cause the automatic deletion of the respective message(s). See a full code example.
InterestPolicy - Retention based on the consumer interest in the stream and messages. The base case is that there are zero consumers defined for a stream. If messages are published to the stream in that state, they will be immediately deleted because there is no interest. This implies that consumers need to be bound to the stream ahead of messages being published to the stream. Once a given message is ack'ed by all consumers, the message is deleted. See a full code example.
WorkQueuePolicy - Retention with the typical behavior of a FIFO queue. Each message can be consumed only once. This is enforced by only allowing one consumer to be created for a work-queue stream. Once a given message is ack'ed, it will be deleted from the stream. See a full code example.
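The interest-based behavior above can be sketched as a small in-memory model. This is a toy illustration under stated assumptions, not the server's logic: the class InterestStream and its methods are hypothetical names, and details such as redelivery and limits are ignored.

```python
class InterestStream:
    """Toy model of InterestPolicy retention (hypothetical, in-memory only)."""

    def __init__(self):
        self.consumers = set()
        self.pending = {}  # sequence -> consumers that have not yet ack'ed
        self.next_seq = 1

    def add_consumer(self, name):
        self.consumers.add(name)

    def publish(self):
        seq = self.next_seq
        self.next_seq += 1
        if self.consumers:
            # Keep the message until every current consumer has ack'ed it.
            self.pending[seq] = set(self.consumers)
        # With zero consumers there is no interest, so nothing is retained.
        return seq

    def ack(self, consumer, seq):
        waiting = self.pending.get(seq)
        if waiting is None:
            return  # already deleted or never stored
        waiting.discard(consumer)
        if not waiting:
            del self.pending[seq]  # ack'ed by all consumers: delete

    def stored(self):
        """Sequences currently retained in the stream."""
        return sorted(self.pending)
```

A work-queue stream behaves like this model with exactly one consumer: the single ack removes the message.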
The discard behavior applies only for streams that have at least one limit defined. The options include:
DiscardOld (default) - This policy will delete the oldest messages in order to maintain the limit. For example, if MaxAge is set to one minute, the server will automatically delete messages older than one minute with this policy.
DiscardNew - This policy will reject new messages from being appended to the stream if it would exceed one of the limits. An extension to this policy is DiscardNewPerSubject, which will apply this policy on a per-subject basis within the stream.
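The difference between the two discard policies can be illustrated with a minimal sketch that enforces a single message-count limit. LimitedStream is a hypothetical toy class, not part of any NATS client; the real server also evaluates byte and age limits.

```python
from collections import deque


class LimitedStream:
    """Toy model of a stream with a message-count limit (illustrative only)."""

    def __init__(self, max_msgs, discard="old"):
        if max_msgs < 1:
            raise ValueError("max_msgs must be at least 1")
        if discard not in ("old", "new"):
            raise ValueError("discard must be 'old' or 'new'")
        self.max_msgs = max_msgs
        self.discard = discard
        self.msgs = deque()

    def publish(self, msg):
        """Return True if the message was stored, False if it was rejected."""
        if len(self.msgs) >= self.max_msgs:
            if self.discard == "new":
                return False  # DiscardNew: reject the incoming message
            self.msgs.popleft()  # DiscardOld: drop the oldest message
        self.msgs.append(msg)
        return True
```

With a limit of two messages, publishing a, b, c leaves b and c under 'old', while under 'new' the third publish is rejected and a and b remain.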
Placement refers to the location of the stream assets (data) within a NATS deployment, be it a single cluster or a supercluster. A given stream, including all replicas (not mirrors), is bound to a single cluster. So when creating or moving a stream, a cluster will be chosen to host the assets.
Without declaring explicit placement for a stream, by default, the stream will be created within the cluster that the client is connected to assuming it has sufficient storage available.
By declaring stream placement, where these assets are located can be controlled explicitly. This is generally useful to co-locate with the most active clients (publishers or consumers) or may be required for data sovereignty reasons.
Placement is supported in all client SDKs as well as the CLI. For example, adding a stream via the CLI to place a stream in a specific cluster looks like this:
nats stream add --cluster aws-us-east1-c1
If you have multiple clusters that form a supercluster, then each is required to have a different name.
Another placement option is tags. Each server can have its own set of tags, defined in configuration, typically describing properties of geography, hosting provider, sizing tiers, etc. In addition, tags are often used in conjunction with the jetstream.unique_tag config option to ensure that replicas must be placed on servers having different values for the tag.
For example, servers A, B, and C in the above cluster might all have the same configuration except for the availability zone they are deployed to.
// Server A
server_tags: ["cloud:aws", "region:us-east1", "az:a"]
// Server B
server_tags: ["cloud:aws", "region:us-east1", "az:b"]
// Server C
server_tags: ["cloud:aws", "region:us-east1", "az:c"]
Now we can create a stream by using tags, for example indicating we want a stream in us-east1.
nats stream add --tag region:us-east1
If we had a second cluster in Google Cloud with the same region tag, the stream could be placed in either the AWS or GCP cluster. However, the unique_tag constraint ensures each replica will be placed in a different AZ in the cluster that was selected implicitly by the placement tags.
Although less common, note that both the cluster and tags can be used for placement. This would be used if a single cluster contains servers that have different properties.
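As a rough illustration of tag-based placement with a unique tag, the sketch below picks replica servers that carry all requested tags while never reusing the same value of the unique tag. It is a toy model, not the server's placement algorithm (which also weighs factors such as available storage); place_replicas, SERVERS and the "az:" prefix convention are assumptions made for this example.

```python
# Servers A, B and C as tagged in the configuration above.
SERVERS = {
    "A": ["cloud:aws", "region:us-east1", "az:a"],
    "B": ["cloud:aws", "region:us-east1", "az:b"],
    "C": ["cloud:aws", "region:us-east1", "az:c"],
}


def place_replicas(servers, required_tags, unique_prefix, replicas):
    """Pick `replicas` servers that have every required tag and
    pairwise-different values for the tag starting with `unique_prefix`."""
    chosen = []
    seen_values = set()
    for name, tags in servers.items():
        if not set(required_tags) <= set(tags):
            continue  # server lacks one of the placement tags
        unique = [t for t in tags if t.startswith(unique_prefix)]
        if not unique or unique[0] in seen_values:
            continue  # no unique tag, or its value is already used
        chosen.append(name)
        seen_values.add(unique[0])
        if len(chosen) == replicas:
            return chosen
    raise ValueError("not enough eligible servers for the requested replicas")
```

For the three servers above, place_replicas(SERVERS, ["region:us-east1"], "az:", 3) selects one server per availability zone.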
When a stream is declared as a mirror, it will automatically and asynchronously replicate messages from the origin stream. There are several options when declaring the mirror configuration.
Name - Name of the origin stream to source messages from.
StartSeq - An optional start sequence of the origin stream to start mirroring from.
StartTime - An optional message start time to start mirroring from. Any messages that are equal to or greater than the start time will be included.
FilterSubject - An optional filter subject which will include only messages that match the subject, typically including a wildcard.
Domain - The JetStream domain of where the origin stream exists. This is commonly used between a cluster/supercluster and a leaf node/cluster.
A mirror stream can have its own retention policy, replication, and storage type. Although messages cannot be published to a mirror directly by clients, messages can be deleted on-demand (beyond the retention policy), and consumers can similarly bind to the mirror.
A stream defining Sources is a generalization of the Mirror and allows for sourcing data from one or more streams concurrently. Essentially these streams are aggregated into a single interleaved stream.
One functional difference from a mirror is that a stream with sources defined can also be published to. That is, it can define a set of subjects to which clients can publish messages directly.
The fields per source stream are the same as defined in mirror above.
If enabled, the RePublish stream option will result in the server re-publishing messages received into a stream to a distinct destination subject, automatically and immediately after a successful write.
For high scale needs where, currently, a dedicated consumer may add too much overhead, clients can establish a core NATS subscription to the destination subject and receive messages that were appended to the stream in real-time.
The fields for configuring republish include:
Source - An optional subject pattern which is a subset of the subjects bound to the stream. It defaults to all messages in the stream, e.g. >.
HeadersOnly - If true, the message data will not be included in the re-published message, only an additional header Nats-Msg-Size indicating the size of the message in bytes.
Every message that is republished will have a set of headers set providing metadata about the source:
Nats-Stream - The name of the stream the message was republished from.
Nats-Subject - The original subject of the message.
Nats-Sequence - The original sequence of the message in the stream.
Nats-Last-Sequence - The last sequence of the message having the same subject, otherwise zero if this is the first message for the subject.
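The headers listed above can be illustrated with a short sketch that assembles them for a republished message. This is a toy model rather than server code: the function republish and its parameters are hypothetical, and rendering header values as decimal strings is an assumption.

```python
def republish(stream, subject, seq, last_seq_by_subject, data, headers_only=False):
    """Build the (headers, payload) pair for a republished message.

    `last_seq_by_subject` is a dict tracking the previous sequence seen for
    each subject; it is updated in place (toy bookkeeping, not a NATS API).
    """
    headers = {
        "Nats-Stream": stream,
        "Nats-Subject": subject,
        "Nats-Sequence": str(seq),
        # Zero when this is the first message for the subject.
        "Nats-Last-Sequence": str(last_seq_by_subject.get(subject, 0)),
    }
    if headers_only:
        # Drop the payload and report its size in bytes instead.
        headers["Nats-Msg-Size"] = str(len(data))
        data = b""
    last_seq_by_subject[subject] = seq
    return headers, data
```

A subscriber that only needs to know that something changed on a subject could use the headers-only form and fetch the payload from the stream on demand.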