Consumers

A consumer is a stateful view of a stream. It acts as interface for clients to consume a subset of messages stored in a stream and will keep track of which messages were delivered and acknowledged by clients.

Unlike with core NATS which provides an at most once delivery guarantee of a message, a consumer can provide an at least once delivery guarantee. This is achieved by the combination of published messages being persisted to the stream as well as the consumer tracking delivery and acknowledgement of each individual message as clients receive and process them. JetStream consumers support multiple kinds of acknowledgements and multiple acknowledgement policies. They will take care of automatically re-deliver un-acked (or 'nacked') messages up to a user specified maximum number of delivery attempts (there is an advisory being emitted when a message reaches this limit).

Dispatch type - Pull / Push

Consumers can be push-based where messages will be delivered to a specified subject or pull-based which allows clients to request batches of messages on demand. The choice of what kind of consumer to use depends on the use-case.

If there is a need to process messages in an application controlled manner and easily scale horizontally, you would use a 'pull consumer'. A simple client application that wants a replay of messages from a stream sequentially you would use an 'ordered push consumer'. An application that wants to benefit from load balancing or acknowledge messages individually will use a regular push consumer.

We recommend pull consumers for new projects. In particular when scalability, detailed flow control or error handling are a concern.

Ordered Consumers

Ordered consumers are the convenient default type of push consumers designed for applications that want to efficiently consume a stream for data inspection or analysis.

  • Always ephemeral

  • Auto acknowledgment (no re-delivery)

  • Automatic flow control

  • Single-threaded dispatching

  • No load balancing

Persistence - Durable / Ephemeral

In addition to the choice of being push or pull, a consumer can also be ephemeral or durable. A consumer is considered durable when an explicit name is set on the Durable field when creating the consumer, otherwise it is considered ephemeral.

Durables and ephemeral have the same message delivery semantics but an ephemeral consumer will not have persisted state or fault tolerance (server memory only) and will be automatically cleaned up (deleted) after a period of inactivity, when no subscriptions are bound to the consumer.

By default, durables will have replicated persisted state saved in the cluster and will remain even when there are periods of inactivity (unless InactiveThreshold is set explicitly). Durable consumers can recover from server and client failure.

Configuration

Below are the set of consumer configuration options that can be defined. The Version column indicates the version of nats-server in which the option was introduced. The Editable column indicates the option can be edited after the consumer is created.

General

AckPolicy

The policy choices include:

  • AckExplicit - The default policy. It means that each individual message must be acknowledged. It is recommended to use this mode, as it provides the most reliability and functionality.

  • AckNone - You do not have to ack any messages, the server will assume ack on delivery.

  • AckAll - If you receive a series of messages, you only have to ack the last one you received. All the previous messages received are automatically acknowledged at the same time.

If an ack is required but is not received within the AckWait window, the message will be redelivered.

The server may consider an ack arriving out of the window. If a first process fails to ack within the window it's entirely possible, for instance in queue situation, that the message has been redelivered to another consumer. Since this will technically restart the window, the ack from the first consumer will be considered.

DeliverPolicy

The policy choices include:

  • DeliverAll - The default policy. The consumer will start receiving from the earliest available message.

  • DeliverLast - When first consuming messages, the consumer will start receiving messages with the last message added to the stream, or the last message in the stream that matches the consumer's filter subject if defined.

  • DeliverLastPerSubject - When first consuming messages, start with the latest one for each filtered subject currently in the stream.

  • DeliverNew - When first consuming messages, the consumer will only start receiving messages that were created after the consumer was created.

  • DeliverByStartSequence - When first consuming messages, start at the first message having the sequence number or the next one available. The consumer is required to specify OptStartSeq which defines the sequence number.

  • DeliverByStartTime - When first consuming messages, start with messages on or after this time. The consumer is required to specify OptStartTime which defines this start time.

MaxAckPending

The MaxAckPending capability provides one-to-many flow control and applies to both push and pull consumers. For push consumers, MaxAckPending is the only form of flow control. However, for pull consumers because the delivery of the messages to the client application is client-driven (hence the 'pull') rather than server-initiated (hence the 'push') there is an implicit one-to-one flow control with the subscribers (the maximum batch size of the Fetch calls). Therefore, if you require high throughput you should remember to set it to an appropriately high value (e.g. the default value of 1000), as it can otherwise place a limit on the horizontal scalability of the processing of the stream in high throughput situations. Conversely, if you have bursts of messages published to the stream and your consuming application can be high latency to process the messages because it’s relying on some external higher latency service (like a database for example), then either just pull/fetch just a few at a time or set MaxAckPending to a value much lower than the default and select 'AckWait' judiciously to avoid some messages getting re-delivered because the processing is not fast enough to absorb the bursts (without causing re-deliveries).

FilterSubjects

A filter subject provides a way to apply server-side filtering of messages by a consumer prior to delivering them to clients.

For example, given a stream factory-events with a bound subject of factory-events.*.* and modeling a hierarchy of factory-events.<factory-id>.<event-type>, a consumer factory-A could be created with a filter of factory-events.A.* and only events for factory A would be delivered to clients.

A consumer can be configured with a singular form FilterSubject or the plural form FilterSubjects (as of 2.10). If the plural form, multiple disjoint filters can be applied, such as [factory-events.A.*, factory-events.B.*] or even across all factories, but choosing specific event types [factory-events.*.item_produced, factory-events.*.item_packaged].

For use cases that require granular consumer permissions, a single filter will internally use an extended consumer JetStream API, $JS.API.CONSUMER.CREATE.{stream}.{consumer}.{filter}, which enables restricting users to create consumers for only a specific subset of filter. For example an pub-allow permission on $JS.API.CONSUMER.CREATE.factory-events.*.factory-events.A.* which would allow a client to only create consumers specific to factory A for this stream.

Currently, when multiple filters are used, the more general form is used $JS.API.CONSUMER.DURABLE.CREATE.{stream}.{consumer} which does not include the {filter} token. If granular consumer permissions are required, a different strategy would need to be used, such as pre-creating consumers for clients.

Pull-specific

These options apply only to pull consumers. For an example on how configure a pull consumer using your preferred client, see NATS by Example.

Push-specific

These options apply only to push consumers. For an example on how to configure a push consumer using your preferred client, see NATS by Example.

Last updated