The JetStream controllers allow you to manage NATS JetStream Streams and Consumers via K8S CRDs. You can find more info on how to deploy and usage here. Below you can find an example of how to create a stream and a couple of consumers:
---apiVersion: jetstream.nats.io/v1beta1kind: Streammetadata:name: mystreamspec:name: mystreamsubjects: ["orders.*"]storage: memorymaxAge: 1h---apiVersion: jetstream.nats.io/v1beta1kind: Consumermetadata:name: my-push-consumerspec:streamName: mystreamdurableName: my-push-consumerdeliverSubject: my-push-consumer.ordersdeliverPolicy: lastackPolicy: nonereplayPolicy: instant---apiVersion: jetstream.nats.io/v1beta1kind: Consumermetadata:name: my-pull-consumerspec:streamName: mystreamdurableName: my-pull-consumerdeliverPolicy: allfilterSubject: orders.receivedmaxDeliver: 20ackPolicy: explicit
Once the CRDs are installed you can use kubectl
to manage the streams and consumers as follows:
$ kubectl get streamsNAME STATE STREAM NAME SUBJECTSmystream Created mystream [orders.*]​$ kubectl get consumersNAME STATE STREAM CONSUMER ACK POLICYmy-pull-consumer Created mystream my-pull-consumer explicitmy-push-consumer Created mystream my-push-consumer none​# If you end up in an Errored state, run kubectl describe for more info.# kubectl describe streams mystream# kubectl describe consumers my-pull-consumer