JetStream Walkthrough

Prerequisite: enabling JetStream

If you are running a local nats-server, stop it and restart it with JetStream enabled using nats-server -js (if that's not already done).
You can then check that JetStream is enabled by running:
    nats account info
which should output something like:
    Connection Information:

    Client ID: 6
    Client IP: 127.0.0.1
    RTT: 64.996µs
    Headers Supported: true
    Maximum Payload: 1.0 MiB
    Connected URL: nats://127.0.0.1:4222
    Connected Address: 127.0.0.1:4222
    Connected Server ID: ND2XVDA4Q363JOIFKJTPZW3ZKZCANH7NJI4EJMFSSPTRXDBFG4M4C34K

    JetStream Account Information:

    Memory: 0 B of Unlimited
    Storage: 0 B of Unlimited
    Streams: 0 of Unlimited
    Consumers: 0 of Unlimited
If you see the following output instead, JetStream is not enabled:
    JetStream Account Information:

    JetStream is not supported in this account
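If you want to script this check, the two outputs above can be told apart by looking for the "not supported" line. The following is a minimal sketch: jetstream_enabled is a hypothetical helper name (not part of the nats CLI), and it assumes the output wording matches the samples shown above, which may differ between CLI versions.

```shell
# Hypothetical helper: reads `nats account info` output on stdin and
# succeeds only if the text looks like a JetStream-enabled account.
# Assumes the wording shown in this walkthrough.
jetstream_enabled() {
  info="$(cat)"
  case "$info" in
    *"JetStream is not supported in this account"*) return 1 ;;
    *"JetStream Account Information:"*) return 0 ;;
    *) return 1 ;;  # no JetStream section at all, e.g. a connection error
  esac
}

# Usage (assumes the nats CLI is installed and a server is reachable):
#   nats account info | jetstream_enabled && echo "JetStream is enabled"
```

Reading from stdin keeps the helper independent of how the CLI is invoked, so it can also be run against saved output.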

1. Creating a stream

Let's start by creating a stream to capture and store the messages published on the subject "foo".
Enter nats stream add <Stream name> (in the examples below we name the stream "my_stream"), then enter "foo" as the subject name and hit return to accept the defaults for all the other stream attributes:
    nats stream add my_stream
Example output:
    ? Subjects to consume foo
    ? Storage backend file
    ? Retention Policy Limits
    ? Discard Policy Old
    ? Stream Messages Limit -1
    ? Per Subject Messages Limit -1
    ? Message size limit -1
    ? Maximum message age limit -1
    ? Maximum individual message size -1
    ? Duplicate tracking time window 2m
    ? Replicas 1
    Stream my_stream was created

    Information for Stream my_stream created 2021-10-12T08:42:10-07:00

    Configuration:

    Subjects: foo
    Acknowledgements: true
    Retention: File - Limits
    Replicas: 1
    Discard Policy: Old
    Duplicate Window: 2m0s
    Maximum Messages: unlimited
    Maximum Bytes: unlimited
    Maximum Age: 0.00s
    Maximum Message Size: unlimited
    Maximum Consumers: unlimited


    State:

    Messages: 0
    Bytes: 0 B
    FirstSeq: 0
    LastSeq: 0
    Active Consumers: 0
You can then check the information about the stream you just created:
    nats stream info my_stream
which should output something like:
    Information for Stream my_stream created 2021-10-12T08:42:10-07:00

    Configuration:

    Subjects: foo
    Acknowledgements: true
    Retention: File - Limits
    Replicas: 1
    Discard Policy: Old
    Duplicate Window: 2m0s
    Maximum Messages: unlimited
    Maximum Bytes: unlimited
    Maximum Age: 0.00s
    Maximum Message Size: unlimited
    Maximum Consumers: unlimited


    State:

    Messages: 0
    Bytes: 0 B
    FirstSeq: 0
    LastSeq: 0
    Active Consumers: 0
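The interactive prompts can also be answered up front with command-line flags, which is handy in scripts. The sketch below wraps the call in a hypothetical create_stream function. The flags used are ones that appear in the nats CLI documentation, but flag names can vary between CLI versions, so treat them as assumptions and check nats stream add --help. Any attribute not given as a flag (for example the maximum age or the replica count) may still be prompted for.

```shell
# Hypothetical wrapper: $1 is the stream name, $2 the subject to capture.
# Mirrors the answers given interactively above; flag names are assumed
# to match your installed nats CLI version.
create_stream() {
  nats stream add "$1" \
    --subjects "$2" \
    --storage file \
    --retention limits \
    --discard old \
    --max-msgs=-1 \
    --max-bytes=-1 \
    --max-msg-size=-1
}

# Usage (requires the nats CLI and a JetStream-enabled server):
#   create_stream my_stream foo
```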

2. Publishing some messages into the stream

Let's now start a publisher:
    nats pub foo --count=1000 --sleep 1s "publication #{{Count}} @ {{TimeStamp}}"
As messages are published on the subject "foo", they are also captured and stored in the stream. You can check this with nats stream info my_stream, and even look at the messages themselves with nats stream view my_stream.
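To watch the stream fill up from a script, you can pull the message count out of the stream info output. This is a minimal sketch: stream_message_count is a hypothetical helper, and it assumes the count appears on a line beginning with "Messages:" as in the sample output in this walkthrough (some CLI versions may print thousands separators, which the helper strips).

```shell
# Hypothetical helper: reads `nats stream info` output on stdin and prints
# the value of the State "Messages:" line, with any commas removed.
stream_message_count() {
  awk '$1 == "Messages:" { gsub(/,/, "", $2); print $2; exit }'
}

# Usage (requires the nats CLI and the stream created above):
#   nats stream info my_stream | stream_message_count
```

Matching on the first field avoids picking up the "Maximum Messages:" configuration line.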

3. Creating a consumer

At this point, if you create a 'Core NATS' (i.e. non-streaming) subscriber to listen for messages on the subject 'foo', you will only receive the messages published after the subscriber was started. This is normal and expected for basic 'Core NATS' messaging. To receive a 'replay' of all the messages contained in the stream (including those that were published in the past), we will now create a 'consumer'.
We can administratively create a consumer using the 'nats consumer add' command. In this example we name the consumer "pull_consumer", leave the delivery target empty (i.e. just hit return at the prompt) because we are creating a 'pull consumer', and select 'all' for the start policy. You can then hit return to accept the defaults for all the other prompts. The consumer should be created on the stream 'my_stream' that we created above.
    nats consumer add
Example output:
    ? Consumer name pull_consumer
    ? Delivery target (empty for Pull Consumers)
    ? Start policy (all, new, last, subject, 1h, msg sequence) all
    ? Replay policy instant
    ? Filter Stream by subject (blank for all)
    ? Maximum Allowed Deliveries -1
    ? Maximum Acknowledgements Pending 0
    ? Select a Stream my_stream
    Information for Consumer my_stream > pull_consumer created 2021-10-12T09:03:26-07:00

    Configuration:

    Durable Name: pull_consumer
    Pull Mode: true
    Deliver Policy: All
    Deliver Queue Group: _unset_
    Ack Policy: Explicit
    Ack Wait: 30s
    Replay Policy: Instant
    Max Ack Pending: 20,000
    Max Waiting Pulls: 512

    State:

    Last Delivered Message: Consumer sequence: 0 Stream sequence: 0
    Acknowledgment floor: Consumer sequence: 0 Stream sequence: 0
    Outstanding Acks: 0 out of maximum 20000
    Redelivered Messages: 0
    Unprocessed Messages: 674
    Waiting Pulls: 0 of maximum 512
You can check on the status of any consumer at any time using nats consumer info, view the messages in the stream using nats stream view my_stream, or even remove individual messages from the stream using nats stream rmm.
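As with streams, the consumer prompts can be answered with flags instead. The sketch below uses a hypothetical create_pull_consumer wrapper. The flags are taken from the nats CLI documentation, but they are assumptions with respect to your installed version (check nats consumer add --help), and prompts that are not covered by a flag may still appear.

```shell
# Hypothetical wrapper: $1 is the stream name, $2 the durable consumer name.
# Mirrors the interactive answers above: pull mode, deliver all messages,
# explicit acks, instant replay, no subject filter, unlimited deliveries.
create_pull_consumer() {
  nats consumer add "$1" "$2" \
    --pull \
    --deliver all \
    --ack explicit \
    --replay instant \
    --filter '' \
    --max-deliver=-1
}

# Usage (requires the nats CLI and the stream created above):
#   create_pull_consumer my_stream pull_consumer
```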

4. Subscribing from the consumer

Now that the consumer has been created, and since there are messages in the stream, we can start subscribing to the consumer:
    nats consumer next my_stream pull_consumer --count 1000
This will print out all the messages in the stream, starting with the first message (which was published in the past) and continuing with new messages as they are published, until the count is reached.
Note that in this example we are creating a pull consumer with a 'durable' name, which means that the consumer can be shared between as many consuming processes as you want. For example, instead of running a single nats consumer next with a count of 1000 messages, you could start two instances of nats consumer next, each with a message count of 500, and you would see the consumption of the messages distributed between those two instances.
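The two-instance scenario can be tried from a single terminal by running both instances in the background. This is a rough sketch: consume_split is a hypothetical helper name, and the "[worker N]" prefixes are only added here to make it visible which instance received which message.

```shell
# Hypothetical helper: $1 = stream, $2 = consumer, $3 = messages per instance.
# Starts two `nats consumer next` instances against the same durable consumer
# and waits for both to finish.
consume_split() {
  nats consumer next "$1" "$2" --count "$3" | sed 's/^/[worker 1] /' &
  nats consumer next "$1" "$2" --count "$3" | sed 's/^/[worker 2] /' &
  wait
}

# Usage (requires the nats CLI, the stream, and the consumer created above):
#   consume_split my_stream pull_consumer 500
```

Because both instances pull from the same consumer, each message should show up under only one of the two prefixes.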

Replaying the messages again

Once you have iterated over all the messages in the stream with the consumer, you can get them again by simply creating a new consumer or by deleting that consumer (nats consumer rm) and re-creating it (nats consumer add).

5. Cleaning up

You can clean up a stream and release the resources associated with it (e.g. the messages stored in the stream) using nats stream purge.
You can also delete a stream, which automatically deletes all of the consumers defined on that stream, using nats stream rm.