JetStream wire API Reference
The normal way to use JetStream is through the NATS client libraries, which expose a set of JetStream functions that you can use directly in your programs. But that is not the only way to interact with the JetStream infrastructure programmatically. Just as core NATS has a wire protocol on top of TCP, JetStream-enabled nats-server instances expose a set of Services over core NATS.

Reference

All of these subjects are defined as constants in the NATS Server source. For example, the subject $JS.API.STREAM.LIST is the constant api.JSApiStreamList. The tables below reference these constants, and likewise the server data structures used for payloads.

Error Handling

The APIs used for administrative tools all respond with standardised JSON and these include errors.
nats req '$JS.API.STREAM.INFO.nonexisting' ''
Output
Published 11 bytes to $JS.API.STREAM.INFO.nonexisting
Received [_INBOX.lcWgjX2WgJLxqepU0K9pNf.mpBW9tHK] : {
  "type": "io.nats.jetstream.api.v1.stream_info_response",
  "error": {
    "code": 404,
    "description": "stream not found"
  }
}
nats req '$JS.API.STREAM.INFO.ORDERS' ''
Output
Published 6 bytes to $JS.API.STREAM.INFO.ORDERS
Received [_INBOX.fwqdpoWtG8XFXHKfqhQDVA.vBecyWmF] : '{
  "type": "io.nats.jetstream.api.v1.stream_info_response",
  "config": {
    "name": "ORDERS",
    ...
}
Here the responses include a type field, which can be used to find the JSON Schema for each response.
Non-admin APIs, such as those for adding a message to the stream, respond with -ERR or +OK, optionally followed by a reason.
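The error envelope shown above can be handled generically on the client side. The following is a minimal sketch, not the library implementation: the struct and function names (apiError, apiResponse, checkResponse) are hypothetical, and only the type and error fields visible in the sample response are assumed.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// apiError mirrors the "error" object from the sample response.
type apiError struct {
	Code        int    `json:"code"`
	Description string `json:"description"`
}

// apiResponse holds only the fields assumed to be common to admin responses.
type apiResponse struct {
	Type  string    `json:"type"`
	Error *apiError `json:"error,omitempty"`
}

// checkResponse decodes a raw admin reply and converts an embedded
// error object into a Go error. It returns the response type either way.
func checkResponse(data []byte) (string, error) {
	var resp apiResponse
	if err := json.Unmarshal(data, &resp); err != nil {
		return "", fmt.Errorf("invalid JSON response: %w", err)
	}
	if resp.Error != nil {
		return resp.Type, fmt.Errorf("%s (code %d)", resp.Error.Description, resp.Error.Code)
	}
	return resp.Type, nil
}

func main() {
	// Payload copied from the "stream not found" example.
	reply := []byte(`{
  "type": "io.nats.jetstream.api.v1.stream_info_response",
  "error": {"code": 404, "description": "stream not found"}
}`)
	typ, err := checkResponse(reply)
	fmt.Println("type:", typ)
	fmt.Println("error:", err)
}
```

Decoding the envelope first and the specific payload second keeps error handling in one place, whichever admin subject was called.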

Admin API

All the admin actions the nats CLI can perform fall into the sections below. The API structures are kept in the api package in the jsm.go repository.
Subjects whose constant ends in T, like api.JSApiConsumerCreateT, are format strings: the Stream name, and in some cases also the Consumer name, must be interpolated into them. For example, t := fmt.Sprintf(api.JSApiConsumerCreateT, streamName) yields the final subject.
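As a small illustration of the interpolation step, the sketch below uses local copies of two format strings rather than importing the api package. The values of consumerCreateT and consumerInfoT are assumptions based on the subject tables in this document, and the helper names are hypothetical.

```go
package main

import "fmt"

// Local stand-ins for the api package format constants (assumed values,
// derived from the subjects listed in this reference).
const (
	consumerCreateT = "$JS.API.CONSUMER.CREATE.%s"
	consumerInfoT   = "$JS.API.CONSUMER.INFO.%s.%s"
)

// consumerCreateSubject fills in the Stream name.
func consumerCreateSubject(stream string) string {
	return fmt.Sprintf(consumerCreateT, stream)
}

// consumerInfoSubject fills in both the Stream and the Consumer name.
func consumerInfoSubject(stream, consumer string) string {
	return fmt.Sprintf(consumerInfoT, stream, consumer)
}

func main() {
	fmt.Println(consumerCreateSubject("ORDERS"))      // $JS.API.CONSUMER.CREATE.ORDERS
	fmt.Println(consumerInfoSubject("ORDERS", "NEW")) // $JS.API.CONSUMER.INFO.ORDERS.NEW
}
```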
The command nats events shows an audit log of all API access events, including the full content of each admin request. Use it to view the structure of the messages the nats command sends.
The API uses JSON for inputs and outputs. All responses are typed using a type field that indicates their Schema. A JSON Schema repository can be found in nats-io/jsm.go/schemas.

General Info

| Subject | Constant | Description | Request Payload | Response Payload |
| --- | --- | --- | --- | --- |
| $JS.API.INFO | api.JSApiAccountInfo | Retrieves stats and limits about your account | empty payload | api.JetStreamAccountStats |

Streams

| Subject | Constant | Description | Request Payload | Response Payload |
| --- | --- | --- | --- | --- |
| $JS.API.STREAM.LIST | api.JSApiStreamList | Paged list of known Streams including all their current information | api.JSApiStreamListRequest | api.JSApiStreamListResponse |
| $JS.API.STREAM.NAMES | api.JSApiStreamNames | Paged list of Stream names | api.JSApiStreamNamesRequest | api.JSApiStreamNamesResponse |
| $JS.API.STREAM.CREATE.* | api.JSApiStreamCreateT | Creates a new Stream | api.StreamConfig | api.JSApiStreamCreateResponse |
| $JS.API.STREAM.UPDATE.* | api.JSApiStreamUpdateT | Updates an existing Stream with new config | api.StreamConfig | api.JSApiStreamUpdateResponse |
| $JS.API.STREAM.INFO.* | api.JSApiStreamInfoT | Information about config and state of a Stream | empty payload, Stream name in subject | api.JSApiStreamInfoResponse |
| $JS.API.STREAM.DELETE.* | api.JSApiStreamDeleteT | Deletes a Stream and all its data | empty payload, Stream name in subject | api.JSApiStreamDeleteResponse |
| $JS.API.STREAM.PURGE.* | api.JSApiStreamPurgeT | Purges all of the data in a Stream, leaves the Stream | empty payload, Stream name in subject | api.JSApiStreamPurgeResponse |
| $JS.API.STREAM.MSG.DELETE.* | api.JSApiMsgDeleteT | Deletes a specific message in the Stream by sequence, useful for GDPR compliance | api.JSApiMsgDeleteRequest | api.JSApiMsgDeleteResponse |
| $JS.API.STREAM.MSG.GET.* | api.JSApiMsgGetT | Retrieves a specific message from the Stream | api.JSApiMsgGetRequest | api.JSApiMsgGetResponse |
| $JS.API.STREAM.SNAPSHOT.* | api.JSApiStreamSnapshotT | Initiates a streaming backup of a Stream's data | api.JSApiStreamSnapshotRequest | api.JSApiStreamSnapshotResponse |
| $JS.API.STREAM.RESTORE.* | api.JSApiStreamRestoreT | Initiates a streaming restore of a Stream | {} | api.JSApiStreamRestoreResponse |
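To make the subject and payload columns concrete, here is a rough sketch of how a request for $JS.API.STREAM.CREATE.* could be assembled. It does not use the api package: streamConfigSketch is a hypothetical, heavily trimmed stand-in for api.StreamConfig, and the JSON field names name, subjects and storage are assumptions about that structure. The resulting subject and payload would then be sent with an ordinary NATS request.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// streamConfigSketch is a trimmed, hypothetical stand-in for api.StreamConfig.
// The real structure has many more fields; these three are assumed here.
type streamConfigSketch struct {
	Name     string   `json:"name"`
	Subjects []string `json:"subjects,omitempty"`
	Storage  string   `json:"storage,omitempty"`
}

// streamCreateRequest returns the request subject and the JSON payload
// for creating a Stream with the given name and subjects.
func streamCreateRequest(name string, subjects []string) (string, []byte, error) {
	payload, err := json.Marshal(streamConfigSketch{
		Name:     name,
		Subjects: subjects,
		Storage:  "file", // assumed value for file-backed storage
	})
	if err != nil {
		return "", nil, err
	}
	// The Stream name is interpolated into the subject, as in the table above.
	return fmt.Sprintf("$JS.API.STREAM.CREATE.%s", name), payload, nil
}

func main() {
	subject, payload, err := streamCreateRequest("ORDERS", []string{"ORDERS.*"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(subject)
	fmt.Println(string(payload))
}
```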

Consumers

| Subject | Constant | Description | Request Payload | Response Payload |
| --- | --- | --- | --- | --- |
| $JS.API.CONSUMER.CREATE.* | api.JSApiConsumerCreateT | Creates an ephemeral Consumer | api.ConsumerConfig, Stream name in subject | api.JSApiConsumerCreateResponse |
| $JS.API.CONSUMER.DURABLE.CREATE.*.* | api.JSApiDurableCreateT | Creates a durable Consumer | api.ConsumerConfig, Stream and Consumer names in subject | api.JSApiConsumerCreateResponse |
| $JS.API.CONSUMER.LIST.* | api.JSApiConsumerListT | Paged list of known Consumers including their current info | api.JSApiConsumerListRequest | api.JSApiConsumerListResponse |
| $JS.API.CONSUMER.NAMES.* | api.JSApiConsumerNamesT | Paged list of known Consumer names | api.JSApiConsumerNamesRequest | api.JSApiConsumerNamesResponse |
| $JS.API.CONSUMER.INFO.*.* | api.JSApiConsumerInfoT | Information about a Consumer | empty payload, Stream and Consumer names in subject | api.JSApiConsumerInfoResponse |
| $JS.API.CONSUMER.DELETE.*.* | api.JSApiConsumerDeleteT | Deletes a Consumer | empty payload, Stream and Consumer names in subject | api.JSApiConsumerDeleteResponse |

ACLs

It may be hard to notice here, but there is a clear pattern in these subjects. Let's look at the various JetStream-related subjects:
General information
$JS.API.INFO
Stream and Consumer Admin
$JS.API.STREAM.CREATE.<stream>
$JS.API.STREAM.UPDATE.<stream>
$JS.API.STREAM.DELETE.<stream>
$JS.API.STREAM.INFO.<stream>
$JS.API.STREAM.PURGE.<stream>
$JS.API.STREAM.LIST
$JS.API.STREAM.NAMES
$JS.API.STREAM.MSG.DELETE.<stream>
$JS.API.STREAM.MSG.GET.<stream>
$JS.API.STREAM.SNAPSHOT.<stream>
$JS.API.STREAM.RESTORE.<stream>
$JS.API.CONSUMER.CREATE.<stream>
$JS.API.CONSUMER.DURABLE.CREATE.<stream>.<consumer>
$JS.API.CONSUMER.DELETE.<stream>.<consumer>
$JS.API.CONSUMER.INFO.<stream>.<consumer>
$JS.API.CONSUMER.LIST.<stream>
$JS.API.CONSUMER.MSG.NEXT.<stream>.<consumer>
$JS.API.CONSUMER.NAMES.<stream>
Stream and Consumer Use
$JS.API.CONSUMER.MSG.NEXT.<stream>.<consumer>
$JS.ACK.<stream>.<consumer>.x.x.x
$JS.SNAPSHOT.ACK.<stream>.<msg id>
$JS.SNAPSHOT.RESTORE.<stream>.<msg id>
Events and Advisories:
$JS.EVENT.METRIC.CONSUMER_ACK.<stream>.<consumer>
$JS.EVENT.ADVISORY.MAX_DELIVERIES.<stream>.<consumer>
$JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED.<stream>.<consumer>
$JS.EVENT.ADVISORY.STREAM.CREATED.<stream>
$JS.EVENT.ADVISORY.STREAM.DELETED.<stream>
$JS.EVENT.ADVISORY.STREAM.UPDATED.<stream>
$JS.EVENT.ADVISORY.CONSUMER.CREATED.<stream>.<consumer>
$JS.EVENT.ADVISORY.CONSUMER.DELETED.<stream>.<consumer>
$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE.<stream>
$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_COMPLETE.<stream>
$JS.EVENT.ADVISORY.STREAM.RESTORE_CREATE.<stream>
$JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE.<stream>
$JS.EVENT.ADVISORY.STREAM.LEADER_ELECTED.<stream>
$JS.EVENT.ADVISORY.STREAM.QUORUM_LOST.<stream>
$JS.EVENT.ADVISORY.CONSUMER.LEADER_ELECTED.<stream>.<consumer>
$JS.EVENT.ADVISORY.CONSUMER.QUORUM_LOST.<stream>.<consumer>
$JS.EVENT.ADVISORY.API
This design allows you to easily create ACL rules that limit users to a specific Stream or Consumer and to specific verbs for administration purposes. To ensure that only the receiver of a message can Ack it, response permissions restrict you to publishing only to the Reply subjects of messages you received.
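To see why the subject layout lends itself to ACLs, the sketch below applies the standard NATS wildcard rules (* matches exactly one token, > matches one or more trailing tokens) to a hypothetical read-only allow list for the ORDERS Stream. This is an illustration only: it is not the server's matching code, and subjectMatches, allowed and the allow list are made-up names for the example.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject matches pattern using NATS wildcard
// rules: "*" matches exactly one token, ">" matches one or more trailing tokens.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last pattern token and needs at least one token to cover.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

// allowed reports whether any pattern in the allow list matches the subject.
func allowed(allow []string, subject string) bool {
	for _, pattern := range allow {
		if subjectMatches(pattern, subject) {
			return true
		}
	}
	return false
}

func main() {
	// Hypothetical allow list: inspect the ORDERS Stream and its Consumers only.
	readOnlyOrders := []string{
		"$JS.API.STREAM.INFO.ORDERS",
		"$JS.API.CONSUMER.INFO.ORDERS.*",
	}
	fmt.Println(allowed(readOnlyOrders, "$JS.API.STREAM.INFO.ORDERS"))       // true
	fmt.Println(allowed(readOnlyOrders, "$JS.API.CONSUMER.INFO.ORDERS.NEW")) // true
	fmt.Println(allowed(readOnlyOrders, "$JS.API.STREAM.DELETE.ORDERS"))     // false
}
```

Because the verb and the Stream name each occupy a fixed token position, a single pattern can pin one and wildcard the other.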

Acknowledging Messages

Messages that need acknowledgement have a Reply subject set, something like $JS.ACK.ORDERS.test.1.2.2. This is the prefix defined in api.JetStreamAckPre followed by <stream>.<consumer>.<delivered count>.<stream sequence>.<consumer sequence>.<timestamp>.<pending messages>.
In all of the Synadia-maintained client libraries you can simply call msg.Respond(nil) (or the language equivalent), which sends an empty payload to the reply subject.
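The reply subject also carries metadata about the delivery. As a sketch under stated assumptions, the code below splits a subject in the seven-token layout described above into named fields. The names ackInfo and parseAckSubject are hypothetical, all five numeric tokens are assumed to be unsigned decimal integers, and the sample subject in main is invented so that it contains every token.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// ackPrefix is the literal prefix of ack reply subjects shown in this section.
const ackPrefix = "$JS.ACK."

// ackInfo holds the tokens that follow the prefix (hypothetical type).
type ackInfo struct {
	Stream      string
	Consumer    string
	Delivered   uint64
	StreamSeq   uint64
	ConsumerSeq uint64
	Timestamp   int64
	Pending     uint64
}

// parseAckSubject parses <stream>.<consumer>.<delivered count>.<stream sequence>.
// <consumer sequence>.<timestamp>.<pending messages> after the prefix.
func parseAckSubject(reply string) (ackInfo, error) {
	var info ackInfo
	if !strings.HasPrefix(reply, ackPrefix) {
		return info, fmt.Errorf("not an ack subject: %q", reply)
	}
	tokens := strings.Split(strings.TrimPrefix(reply, ackPrefix), ".")
	if len(tokens) != 7 {
		return info, fmt.Errorf("expected 7 tokens after prefix, got %d", len(tokens))
	}
	// The last five tokens are assumed to be unsigned decimal numbers.
	nums := make([]uint64, 5)
	for i, tok := range tokens[2:] {
		n, err := strconv.ParseUint(tok, 10, 64)
		if err != nil {
			return info, fmt.Errorf("token %q is not a number: %w", tok, err)
		}
		nums[i] = n
	}
	info = ackInfo{
		Stream:      tokens[0],
		Consumer:    tokens[1],
		Delivered:   nums[0],
		StreamSeq:   nums[1],
		ConsumerSeq: nums[2],
		Timestamp:   int64(nums[3]),
		Pending:     nums[4],
	}
	return info, nil
}

func main() {
	// Invented sample subject containing all seven tokens.
	info, err := parseAckSubject("$JS.ACK.ORDERS.test.1.2.2.1588763938115424000.0")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", info)
}
```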

Fetching The Next Message From a Pull-based Consumer

If you have a pull-based Consumer you can send a standard NATS Request to $JS.API.CONSUMER.MSG.NEXT.<stream>.<consumer>. The format is defined in api.JSApiRequestNextT and requires populating using fmt.Sprintf().
nats req '$JS.API.CONSUMER.MSG.NEXT.ORDERS.test' '1'
Output
Published 1 bytes to $JS.API.CONSUMER.MSG.NEXT.ORDERS.test
Received [js.1] : 'message 1'
Here we ask for just 1 message (nats req only shows 1), but you can fetch a batch of messages by varying the argument. This combines well with the AckAll Ack policy.
The above request for the next message stays on the server for as long as the client is connected, and future pulls from the same client accumulate on the server. If you ask for 1 message 100 times and 1000 messages arrive, you will be sent 100 messages, not 1.
This is often not desired, so pull consumers also support a mode where a JSON document describing the pull request is sent.
{
  "expires": 7000000000,
  "batch": 10
}
This requests 10 messages and asks the server to keep the request for 7 seconds (expires is given in nanoseconds). This is useful when you poll the server frequently and do not want the pull requests to accumulate on the server. Set expires to match your poll frequency.
{
  "batch": 10,
  "no_wait": true
}
This second format of the pull request does not store the request on the queue at all. When there are no messages to deliver, the server sends a message with an empty payload and a Status header of 404, so you can tell, for example, when you have reached the end of the stream. A 409 is returned if the Consumer has reached its MaxAckPending limit.
nats req '$JS.API.CONSUMER.MSG.NEXT.ORDERS.NEW' '{"no_wait": true, "batch": 10}'
Output
13:45:30 Sending request on "$JS.API.CONSUMER.MSG.NEXT.ORDERS.NEW"
13:45:30 Received on "_INBOX.UKQGqq0W1EKl8inzXU1naH.XJiawTRM" rtt 594.908µs
13:45:30 Status: 404
13:45:30 Description: No Messages

Fetching From a Stream By Sequence

If you know the Stream sequence of a message you can fetch it directly; this does not support acks. Do a Request() to $JS.API.STREAM.MSG.GET.ORDERS, sending the message sequence as a JSON payload. Here the subject format is defined in api.JSApiMsgGetT, which also requires populating using fmt.Sprintf().
nats req '$JS.API.STREAM.MSG.GET.ORDERS' '{"seq": 1}'
Output
Published 1 bytes to $JS.API.STREAM.MSG.GET.ORDERS
Received [_INBOX.cJrbzPJfZrq8NrFm1DsZuH.k91Gb4xM] : '{
  "type": "io.nats.jetstream.api.v1.stream_msg_get_response",
  "message": {
    "subject": "x",
    "seq": 1,
    "data": "aGVsbG8=",
    "time": "2020-05-06T13:18:58.115424+02:00"
  }
}'
In the response, subject is the subject the message was received on, data is the base64-encoded payload, and time is when the message was received.
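A response like the one above can be decoded with the standard library alone, since encoding/json base64-decodes string values into []byte fields and parses RFC 3339 timestamps into time.Time. The sketch below assumes only the fields visible in the sample output; storedMsg, msgGetResponse and decodeMsgGet are hypothetical names, not the api package types.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// storedMsg mirrors the "message" object from the sample response.
type storedMsg struct {
	Subject string    `json:"subject"`
	Seq     uint64    `json:"seq"`
	Data    []byte    `json:"data"` // base64 in JSON, raw bytes after decoding
	Time    time.Time `json:"time"`
}

// msgGetResponse is the typed envelope around the message.
type msgGetResponse struct {
	Type    string     `json:"type"`
	Message *storedMsg `json:"message,omitempty"`
}

// decodeMsgGet extracts the stored message from a raw response.
func decodeMsgGet(raw []byte) (*storedMsg, error) {
	var resp msgGetResponse
	if err := json.Unmarshal(raw, &resp); err != nil {
		return nil, err
	}
	if resp.Message == nil {
		return nil, fmt.Errorf("response of type %q carries no message", resp.Type)
	}
	return resp.Message, nil
}

func main() {
	// Payload copied from the sample output.
	raw := []byte(`{
  "type": "io.nats.jetstream.api.v1.stream_msg_get_response",
  "message": {
    "subject": "x",
    "seq": 1,
    "data": "aGVsbG8=",
    "time": "2020-05-06T13:18:58.115424+02:00"
  }
}`)
	msg, err := decodeMsgGet(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(msg.Subject, msg.Seq, string(msg.Data)) // x 1 hello
}
```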

Consumer Samples

Samples are published to a specific subject per Consumer, something like $JS.EVENT.METRIC.CONSUMER_ACK.<stream>.<consumer>. You can subscribe to that subject and receive api.ConsumerAckMetric messages in JSON format. The prefix is defined in api.JetStreamMetricConsumerAckPre.