Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Flush() on the connection right after you call subscribe if you need to synchronize with the subscription being ready at the server level.
Connection nc = Nats.connect();
// Do something with the connection
nc.close();const nc = await connect();
// Do something with the connection
doSomething();
// When done close it
await nc.close();nc = NATS()
await nc.connect()
# Do something with the connection
await nc.close()nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Do something with the connectionnats-server[14524] 2021/10/25 22:53:53.525530 [INF] Starting nats-server
[14524] 2021/10/25 22:53:53.525640 [INF] Version: 2.6.1
[14524] 2021/10/25 22:53:53.525643 [INF] Git: [not set]
[14524] 2021/10/25 22:53:53.525647 [INF] Name: NDAUZCA4GR3FPBX4IFLBS4VLAETC5Y4PJQCF6APTYXXUZ3KAPBYXLACC
[14524] 2021/10/25 22:53:53.525650 [INF] ID: NDAUZCA4GR3FPBX4IFLBS4VLAETC5Y4PJQCF6APTYXXUZ3KAPBYXLACC
[14524] 2021/10/25 22:53:53.526392 [INF] Starting http monitor on 0.0.0.0:8222
[14524] 2021/10/25 22:53:53.526445 [INF] Listening for client connections on 0.0.0.0:4222
[14524] 2021/10/25 22:53:53.526684 [INF] Server is readynats stream add ORDERS --subjects "ORDERS.*" --ack --max-msgs=-1 --max-bytes=-1 --max-age=1y --storage file --retention limits --max-msg-size=-1 --discard=old
nats consumer add ORDERS NEW --filter ORDERS.received --ack explicit --pull --deliver all --max-deliver=-1 --sample 100
nats consumer add ORDERS DISPATCH --filter ORDERS.processed --ack explicit --pull --deliver all --max-deliver=-1 --sample 100
nats consumer add ORDERS MONITOR --filter '' --ack none --target monitor.ORDERS --deliver last --replay instant// dotnet add package NATS.Net
using NATS.Net;
await using var client = new NatsClient();
// It's optional to call ConnectAsync()
// as it will be called when needed automatically
await client.ConnectAsync();require 'nats/client'
NATS.start do |nc|
# Do something with the connection
# Close the connection
nc.close
endnatsConnection *conn = NULL;
natsStatus s;
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
if (s != NATS_OK)
// handle error
// Destroy connection, no-op if conn is NULL.
natsConnection_Destroy(conn);nats reply help.please 'OK, I CAN HELP!!!'nats request help.please 'I need help!'go install github.com/nats-io/nkeys/nk@latestnk -gen user -puboutSUACSSL3UAHUDXKFSNVUZRF5UHPMWZ6BFDTJ7M6USDXIEDNPPQYYYCU3VY
UDXU4RCSJNZOIQHZNWXHXORDPRTGNJAHAHFRGZNEEJCPQTT2M7NLCNF4authorization: {
users: [
{ nkey: UDXU4RCSJNZOIQHZNWXHXORDPRTGNJAHAHFRGZNEEJCPQTT2M7NLCNF4 }
]
}Connection nc = Nats.connect("nats://demo.nats.io:4222");
nc.publish("updates", "All is Well".getBytes(StandardCharsets.UTF_8));const sc = StringCodec();
nc.publish("updates", sc.encode("All is Well"));nc = NATS()
await nc.connect(servers=["nats://demo.nats.io:4222"])
await nc.publish("updates", b'All is Well')// dotnet add package NATS.Net
using NATS.Net;
await using var client = new NatsClient(url: "demo.nats.io", name:
// The default serializer uses UTF-8 encoding for strings
await client.PublishAsync<string>(subject: "updates", data: "All is Wellnc, err := nats.Connect("demo.nats.io", nats.Name("API PublishBytes Example"))
if err != nil {
log.Fatal(err)
}
defer nc.Close()
if err := nc.Publish("updates", []byte("All is Well"));
log.Fatal(err)
}// Set reconnect buffer size in bytes (5 MB)
nc, err := nats.
nats reply foo "service instance A Reply# {{Count}}"nats reply foo "service instance B Reply# {{Count}}"nats reply foo "service instance C Reply# {{Count}}"nats request foo "Simple request"nats request foo "Another simple request"require 'nats/client'
NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
nc.publish("updates", "All is Well")
endnsc revocations -hOptions options = new Options.Builder()
.server("nats://demo.nats.io:4222")
.reconnectBufferSize(5 * 1024 * 1024) // Set buffer in bytes
.build();
Connection nc = Nats.connect(options);
// Do something with the connection
nc.close();// Reconnect buffer size is not configurable on NATS JavaScript client# Asyncio NATS client currently does not implement a reconnect buffer// Reconnect buffer size is not configurable on NATS .NET client# There is currently no reconnect pending buffer as part of the Ruby NATS clientnc, err := nats.Connect("demo.nats.io", nats.Name("API Name Option Example"))
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Do something with the connectionOptions options = new Options.Builder()
.server("nats://demo.nats.io:4222")
.connectionName("API Name Option Example") // Set Name
.build();
Connection nc = Nats.connect(options);
// Do something with the connection
nc.close(); const nc = await connect({
name: "my-connection",
servers: ["demo.nats.io:4222"],
});nc = NATS()
await nc.connect(
servers=["nats://demo.nats.io:4222"],
name="API Name Option Example")
# Do something with the connection
await nc.close()// Disable reconnect attempts
nc, err := nats.Connect("demo.nats.io", nats.NoReconnect())
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Do something with the connectionOptions options = new Options.Builder()
.server("nats://demo.nats.io:4222")
.noReconnect() // Disable reconnect attempts
.build();
Connection nc = Nats.connect(options);
// Do something with the connection
nc.close(); const nc = await connect({
reconnect: false,
servers: ["demo.nats.io"],
});nsc add operator -insc init -o synadia -n MyFirstAccountexport nsc_zoom_operator=https://account-server-host/jwt/v1/operatornsc edit operator -u https://account-server-host/jwt/v1nsc edit operator -n nats://localhost:4222
nsc tool pub hello worldm, err := nc.Request("foo", nil, time.Second);
# err == nats.ErrNoRespondersManage revocation for users and activations from an account
Usage:
nsc revocations [command]
Available Commands:
add-user Revoke a user
add_activation Revoke an accounts access to an export
delete-user Remove a user revocation
delete_activation Remove an account revocation from an export
list-users List users revoked in an account
list_activations List account revocations for an export
Flags:
-h, --help help for revocations
Global Flags:
-i, --interactive ask questions for various settings
-K, --private-key string private key
Use "nsc revocations [command] --help" for more information about a command.natsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
s = natsOptions_Create(&opts);
if (s == NATS_OK)
// Set reconnect buffer size in bytes (5 MB)
s = natsOptions_SetReconnectBufSize(opts, 5*1024*1024);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);// dotnet add package NATS.Net
using NATS.Net;
await using var client = new NatsClient(name: "API Name Option Example", url: "nats://demo.nats.io:4222");
// It's optional to call ConnectAsync()
// as it will be called when needed automatically
await client.ConnectAsync();require 'nats/client'
NATS.start(servers: ["nats://demo.nats.io:4222"], name: "API Name Option Example") do |nc|
# Do something with the connection
# Close the connection
nc.close
endnatsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
s = natsOptions_Create(&opts);
if (s == NATS_OK)
s = natsOptions_SetName(opts, "API Name Option Example");
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);nc = NATS()
await nc.connect(
servers=[
"nats://demo.nats.io:1222",
"nats://demo.nats.io:1223",
"nats://demo.nats.io:1224"
],
allow_reconnect=False,
)
# Do something with the connection
await nc.close()// dotnet add package NATS.Net
using NATS.Net;
using NATS.Client.Core;
await using var client = new NatsClient(new NatsOpts
{
Url = "nats://demo.nats.io:4222",
// .NET client does not support disabling reconnects,
// but you can set the maximum number of reconnect attempts
MaxReconnectRetry = 1,
});require 'nats/client'
NATS.start(servers: ["nats://127.0.0.1:1222", "nats://127.0.0.1:1223", "nats://127.0.0.1:1224"], reconnect: false) do |nc|
# Do something with the connection
# Close the connection
nc.close
endnatsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
s = natsOptions_Create(&opts);
if (s == NATS_OK)
s = natsOptions_SetAllowReconnect(opts, false);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);const nc = await connect({
servers: ["demo.nats.io"],
noEcho: true,
});
const sub = nc.subscribe(subj, { callback: (_err, _msg) => {} });
nc.publish(subj);
await sub.drain();
// we won't get our own messages
t.is(sub.getProcessed(), 0);ncA = NATS()
ncB = NATS()
await ncA.connect(no_echo=True)
await ncB.connect()
async def handler(msg):
# Messages sent by `ncA' will not be received.
print("[Received] ", msg)
await ncA.subscribe("greetings", cb=handler)
await ncA.flush()
await ncA.publish("greetings", b'Hello World!')
await ncB.publish("greetings", b'Hello World!')
# Do something with the connection
await asyncio.sleep(1)
await ncA.drain()
await ncB.drain()// dotnet add package NATS.Net
using NATS.Net;
using NATS.Client.Core;
await using var client = new NatsClient(new NatsOpts
{
Url = "nats://demo.nats.io:4222",
// Turn off echo
Echo = false
});NATS.start("nats://demo.nats.io:4222", no_echo: true) do |nc|
# ...
endnatsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
s = natsOptions_Create(&opts);
if (s == NATS_OK)
s = natsOptions_SetNoEcho(opts, true);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);// Turn off echo
nc, err := nats.Connect("demo.nats.io", nats.Name("API NoEcho Example"), nats.NoEcho())
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Do something with the connectionOptions options = new Options.Builder()
.server("nats://demo.nats.io:4222")
.noEcho() // Turn off echo
.build();
Connection nc = Nats.connect(options);
// Do something with the connection
nc.close();ws://...)nc, err := nats.Connect("127.0.0.1
Options options = new Options.Builder()
// credentials file contains the JWT and the secret signing key
const authenticator = credsAuthenticator(
nc, err := nats.Connect("demo.nats.io
Options options = new Options.Builder()
const nc = await connect({
nc = NATS()
await nc.connect(
nc, err := nats.Connect("demo.nats.io
Connection nc = Nats.connect("nats://demo.nats.io:4222"
nc, err := nats.Connect("demo.nats.io
// Set max reconnects attempts
nc, err := nats.
Options options = new Options.Builder()
const nc = await connect({
servers := []string{"nats://127.0.0.1:1222",
Options options = new Options.Builder()
// Set reconnect interval to 10 seconds
nc, err := nats.
Options options = new Options.Builder()
const nc = await connect({
nats sub <subject>nats sub msg.testnats pub <subject> <message>nats pub msg.test "NATS MESSAGE"nats pub msg.test "NATS MESSAGE 2"nats sub msg.testnats pub msg.test "NATS MESSAGE 3"nats sub msg.test.newnats pub msg.test "NATS MESSAGE 4"nats sub msg.*nats pub msg.test "NATS MESSAGE 5"curl -L https://raw.githubusercontent.com/nats-io/nsc/master/install.py | pythonnsc/
├── accounts
│ ├── nats
│ └── nsc.json
└── nkeys
├── creds
└── keys
5 directories, 1 filensc helpnats-topnats-server version 0.6.4 (uptime: 31m42s)
Server:
Load: CPU: 0.8% Memory: 5.9M Slow Consumers: 0
In: Msgs: 34.2K Bytes: 3.0M Msgs/Sec: 37.9 Bytes/Sec: 3389.7
Out: Msgs: 68.3K Bytes: 6.0M Msgs/Sec: 75.8 Bytes/Sec: 6779.4
Connections: 4
HOST CID SUBS PENDING MSGS_TO MSGS_FROM BYTES_TO BYTES_FROM LANG VERSION SUBSCRIPTIONS
127.0.0.1:56134 2 5 0 11.6K 11.6K 1.1M 905.1K go 1.1.0 foo, hello
127.0.1.1:56138 3 1 0 34.2K 0 3.0M 0 go 1.1.0 _INBOX.a96f3f6853616154d23d1b5072
127.0.0.1:56144 4 5 0 11.2K 11.1K 873.5K 1.1M go 1.1.0 foo, hello
127.0.0.1:56151 5 8 0 11.4K 11.5K 1014.6K 1.0M go 1.1.0 foo, hello-----BEGIN NATS USER JWT-----
eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5In0.eyJqdGkiOiJUVlNNTEtTWkJBN01VWDNYQUxNUVQzTjRISUw1UkZGQU9YNUtaUFhEU0oyWlAzNkVMNVJBIiwiaWF0IjoxNTU4MDQ1NTYyLCJpc3MiOiJBQlZTQk0zVTQ1REdZRVVFQ0tYUVM3QkVOSFdHN0tGUVVEUlRFSEFKQVNPUlBWV0JaNEhPSUtDSCIsIm5hbWUiOiJvbWVnYSIsInN1YiI6IlVEWEIyVk1MWFBBU0FKN1pEVEtZTlE3UU9DRldTR0I0Rk9NWVFRMjVIUVdTQUY3WlFKRUJTUVNXIiwidHlwZSI6InVzZXIiLCJuYXRzIjp7InB1YiI6e30sInN1YiI6e319fQ.6TQ2ilCDb6m2ZDiJuj_D_OePGXFyN3Ap2DEm3ipcU5AhrWrNvneJryWrpgi_yuVWKo1UoD5s8bxlmwypWVGFAA
------END NATS USER JWT------
************************* IMPORTANT *************************
NKEY Seed printed below can be used to sign and prove identity.
NKEYs are sensitive and should be treated as secrets.
-----BEGIN USER NKEY SEED-----
SUAOY5JZ2WJKVR4UO2KJ2P3SW6FZFNWEOIMAXF4WZEUNVQXXUOKGM55CYE
------END USER NKEY SEED------
*************************************************************nc = NATS()
async def error_cb(e):
print("Error:", e)
await nc.connect("nats://localhost:4222",
user_credentials="path_to_creds_file",
error_cb=error_cb,
)
# Do something with the connection
await nc.close()nats account infoJetStream Account Information:
JetStream is not supported in this account// dotnet add package NATS.Net
using NATS.Net;
using NATS.Client.Core;
await using var client = new NatsClient(new NatsOpts
{
Url = "nats://demo.nats.io:4222",
ConnectTimeout = TimeSpan.FromSeconds(10)
});
// You don't have to call ConnectAsync() explicitly,
// first operation will make the connection otherwise.
await client.ConnectAsync();# There is currently no connect timeout as part of the Ruby NATS client API, but you can use a timer to mimic it.
require 'nats/client'
timer = EM.add_timer(10) do
NATS.connect do |nc|
# Do something with the connection
# Close the connection
nc.close
end
end
EM.cancel_timer(timer)nnatsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
s = natsOptions_Create(&opts);
if (s == NATS_OK)
// Set the timeout to 10 seconds (10,000 milliseconds)
s = natsOptions_SetTimeout(opts, 10000);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);// node-nats subscriptions are always async.# Asyncio NATS client currently does not have a sync subscribe API// NATS .NET subscriptions are always async.Connection nc = Nats.connect("nats://demo.nats.io:4222");
System.out.println("The Connection is: " + nc.getStatus()); // CONNECTED
nc.close();
System.out.println("The Connection is: " + nc.getStatus()); // CLOSED // you can find out where you connected:
t.log(`connected to a nats server version ${nc.info.version}`);
// or information about the data in/out of the client:
const stats = nc.stats();
t.log(`client sent ${stats.outMsgs} messages and received ${stats.inMsgs}`);nc = NATS()
await nc.connect(
servers=["nats://demo.nats.io:4222"],
)
# Do something with the connection.
print("The connection is connected?", nc.is_connected)
while True:
if nc.is_reconnecting:
print("Reconnecting to NATS...")
break
await asyncio.sleep(1)
await nc.close()
print("The connection is closed?", nc.is_closed)// dotnet add package NATS.Net
using NATS.Net;
await using var client = new NatsClient();
Console.WriteLine($"{client.Connection.ConnectionState}"); // Closed
await client.ConnectAsync();
Console.WriteLine($"{client.Connection.ConnectionState}"); // OpenNATS.start(max_reconnect_attempts: 2) do |nc|
puts "Connect is connected?: #{nc.connected?}"
timer = EM.add_periodic_timer(1) do
if nc.closing?
puts "Connection closed..."
EM.cancel_timer(timer)
NATS.stop
end
if nc.reconnecting?
puts "Reconnecting to NATS..."
next
end
end
endnc = NATS()
await nc.connect(
servers=["nats://demo.nats.io:4222"],
max_reconnect_attempts=10,
)
# Do something with the connection
await nc.close()// dotnet add package NATS.Net
using NATS.Net;
using NATS.Client.Core;
await using var client = new NatsClient(new NatsOpts
{
Url = "nats://demo.nats.io:4222",
// Set the maximum number of reconnect attempts
MaxReconnectRetry = 10,
});require 'nats/client'
NATS.start(servers: ["nats://127.0.0.1:1222", "nats://127.0.0.1:1223", "nats://127.0.0.1:1224"], max_reconnect_attempts: 10) do |nc|
# Do something with the connection
# Close the connection
nc.close
endnatsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
s = natsOptions_Create(&opts);
if (s == NATS_OK)
s = natsOptions_SetMaxReconnect(opts, 10);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);const nc = await connect({
noRandomize: false,
servers: ["127.0.0.1:4443", "demo.nats.io"],
});nc = NATS()
await nc.connect(
servers=[
"nats://demo.nats.io:1222",
"nats://demo.nats.io:1223",
"nats://demo.nats.io:1224"
],
dont_randomize=True,
)
# Do something with the connection
await nc.close()// dotnet add package NATS.Net
using NATS.Net;
using NATS.Client.Core;
await using var client = new NatsClient(new NatsOpts
{
Url = "nats://127.0.0.1:1222,nats://127.0.0.1:1223,nats://127.0.0.1:1224",
NoRandomize = true,
});nc = NATS()
await nc.connect(
servers=["nats://demo.nats.io:4222"],
reconnect_time_wait=10,
)
# Do something with the connection
await nc.close()// dotnet add package NATS.Net
using NATS.Net;
using NATS.Client.Core;
await using var client = new NatsClient(new NatsOpts
{
Url = "nats://127.0.0.1:1222,nats://127.0.0.1:1223,nats://127.0.0.1:1224",
// Set reconnect interval to between 5-10 seconds
ReconnectWaitMin = TimeSpan.FromSeconds(5),
ReconnectWaitMax = TimeSpan.FromSeconds(10),
});AllowMsgSchedules stream configuration option allows the scheduling of messages. Users can use this feature for delayed publishing/scheduling of messages. More information is available in ADR-51connect_backoff. If true, will start exponential backoff at 1 second up to 30 seconds. This can slow down the speed of reconnection but significantly reduces the amount of DNS queries and general connection attempts during server restarts or outages.allow_insecure_cipher_suitesopt, err := nats.NkeyOptionFromSeed("seed.txt")
if err != nil {
log.Fatal(err)
}
nc, err := nats.Connect("127.0.0.1", opt)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Do something with the connectionNKey theNKey = NKey.createUser(null); // really should load from somewhere
Options options = new Options.Builder()
.server("nats://localhost:4222")
.authHandler(new AuthHandler(){
public char[] getID() {
try {
return theNKey.getPublicKey();
} catch (GeneralSecurityException|IOException|NullPointerException ex) {
return null;
}
}
public byte[] sign(byte[] nonce) {
try {
return theNKey.sign(nonce);
} catch (GeneralSecurityException|IOException|NullPointerException ex) {
return null;
}
}
public char[] getJWT() {
return null;
}
})
.build();
Connection nc = Nats.connect(options);
// Do something with the connection
nc.close();nats stream add --config stream_with_sources.json[WRN] Invalid JetStream request '$G > $JS.API.STREAM.CREATE.test-stream': json: unknown field "unknown" jetstream {
strict: false
} go install github.com/nats-io/nats-topnats-top [-s server] [-m monitor] [-n num_connections] [-d delay_in_secs] [-sort by]// dotnet add package NATS.Net
using NATS.Net;
await using var client = new NatsClient("127.0.0.1", credsFile: "/path/to/file.creds");natsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
s = natsOptions_Create(&opts);
if (s == NATS_OK)
// Pass the credential file this way if the file contains both user JWT and seed.
// Otherwise, if the content is split, the first file is the user JWT, the second
// contains the seed.
s = natsOptions_SetUserCredentialsFromFiles(opts, "path_to_creds_file", NULL);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);nats account infoConnection Information:
Client ID: 6
Client IP: 127.0.0.1
RTT: 64.996µs
Headers Supported: true
Maximum Payload: 1.0 MiB
Connected URL: nats://127.0.0.1:4222
Connected Address: 127.0.0.1:4222
Connected Server ID: ND2XVDA4Q363JOIFKJTPZW3ZKZCANH7NJI4EJMFSSPTRXDBFG4M4C34K
JetStream Account Information:
Memory: 0 B of Unlimited
Storage: 0 B of Unlimited
Streams: 0 of Unlimited
Consumers: 0 of Unlimited nats kv add my-kvmy_kv Key-Value Store Status
Bucket Name: my-kv
History Kept: 1
Values Stored: 0
Compressed: false
Backing Store Kind: JetStream
Bucket Size: 0 B
Maximum Bucket Size: unlimited
Maximum Value Size: unlimited
Maximum Age: unlimited
JetStream Stream: KV_my-kv
Storage: Filenats kv put my-kv Key1 Value1nats kv get my-kv Key1my-kv > Key1 created @ 12 Oct 21 20:08 UTC
Value1nats kv del my-kv Key1nats kv create my-sem Semaphore1 Value1nats kv create my-sem Semaphore1 Value1
nats: error: nats: wrong last sequence: 1: key existsnats kv update my-sem Semaphore1 Value2 13nats kv update my-sem Semaphore1 Value2 13
nats: error: nats: wrong last sequence: 14nats kv watch my-kv[2021-10-12 13:15:03] DEL my-kv > Key1nats kv put my-kv Key1 Value2[2021-10-12 13:25:14] PUT my-kv > Key1: Value2nats kv rm my-kv// Set Ping Interval to 20 seconds and Max Pings Outstanding to 5
nc, err := nats.Connect("demo.nats.io", nats.Name("API Ping Example"), nats.PingInterval(20*time.Second), nats.MaxPingsOutstanding(5))
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Do something with the connectionOptions options = new Options.Builder()
.server("nats://demo.nats.io")
.pingInterval(Duration.ofSeconds(20)) // Set Ping Interval
.maxPingsOut(5) // Set max pings in flight
.build();
// Connection is AutoCloseable
try (Connection nc = Nats.connect(options)) {
// Do something with the connection
}// Set Ping Interval to 20 seconds and Max Pings Outstanding to 5
const nc = await connect({
pingInterval: 20 * 1000,
maxPingOut: 5,
servers: ["demo.nats.io:4222"],
});nc = NATS()
await nc.connect(
servers=["nats://demo.nats.io:4222"],
# Set Ping Interval to 20 seconds and Max Pings Outstanding to 5
ping_interval=20,
max_outstanding_pings=5,
)
# Do something with the connection.// dotnet add package NATS.Net
using NATS.Net;
using NATS.Client.Core;
await using var client = new NatsClient(new NatsOpts
{
Url = "nats://demo.nats.io:4222",
// Set Ping Interval to 20 seconds and Max Pings Outstanding to 5
PingInterval = TimeSpan.FromSeconds(20),
MaxPingOut = 5,
});require 'nats/client'
# Set Ping Interval to 20 seconds and Max Pings Outstanding to 5
NATS.start(ping_interval: 20, max_outstanding_pings: 5) do |nc|
nc.on_reconnect do
puts "Got reconnected to #{nc.connected_server}"
end
nc.on_disconnect do |reason|
puts "Got disconnected! #{reason}"
end
# Do something with the connection
endnatsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
s = natsOptions_Create(&opts);
if (s == NATS_OK)
// Set Ping interval to 20 seconds (20,000 milliseconds)
s = natsOptions_SetPingInterval(opts, 20000);
if (s == NATS_OK)
// Set the limit to 5
s = natsOptions_SetMaxPingsOut(opts, 5);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);# The Ruby NATS client subscriptions are all async.natsConnection *conn = NULL;
natsSubscription *sub = NULL;
natsMsg *msg = NULL;
natsStatus s = NATS_OK;
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
// Subscribe
if (s == NATS_OK)
s = natsConnection_SubscribeSync(&sub, conn, "updates");
// Wait for messages
if (s == NATS_OK)
s = natsSubscription_NextMsg(&msg, sub, 10000);
if (s == NATS_OK)
{
printf("Received msg: %s - %.*s\n",
natsMsg_GetSubject(msg),
natsMsg_GetDataLength(msg),
natsMsg_GetData(msg));
// Destroy message that was received
natsMsg_Destroy(msg);
}
(...)
// Destroy objects that were created
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);require 'nats/client'
NATS.start(servers: ["nats://127.0.0.1:1222", "nats://127.0.0.1:1223", "nats://127.0.0.1:1224"], dont_randomize_servers: true) do |nc|
# Do something with the connection
# Close the connection
nc.close
endnatsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
const char *servers[] = {"nats://127.0.0.1:1222", "nats://127.0.0.1:1223", "nats://127.0.0.1:1224"};
s = natsOptions_Create(&opts);
if (s == NATS_OK)
s = natsOptions_SetServers(opts, servers, 3);
if (s == NATS_OK)
s = natsOptions_SetNoRandomize(opts, true);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);require 'nats/client'
NATS.start(servers: ["nats://127.0.0.1:1222", "nats://127.0.0.1:1223", "nats://127.0.0.1:1224"], reconnect_time_wait: 10) do |nc|
# Do something with the connection
# Close the connection
nc.close
endnatsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
s = natsOptions_Create(&opts);
if (s == NATS_OK)
// Set reconnect interval to 10 seconds (10,000 milliseconds)
s = natsOptions_SetReconnectWait(opts, 10000);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);static natsStatus
sigHandler(
char **customErrTxt,
unsigned char **signature,
int *signatureLength,
const char *nonce,
void *closure)
{
// Sign the given `nonce` and return the signature as `signature`.
// This needs to allocate memory. The length of the signature is
// returned as `signatureLength`.
// If an error occurs the user can return specific error text through
// `customErrTxt`. The library will free this pointer.
return NATS_OK;
}
(...)
natsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
const char *pubKey = "my public key......";
s = natsOptions_Create(&opts);
if (s == NATS_OK)
s = natsOptions_SetNKey(opts, pubKey, sigHandler, NULL);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);// seed should be stored and treated like a secret
const seed = new TextEncoder().encode(
"SUAEL6GG2L2HIF7DUGZJGMRUFKXELGGYFMHF76UO2AYBG3K4YLWR3FKC2Q",
);
const nc = await connect({
port: ns.port,
authenticator: nkeyAuthenticator(seed),
});nc = NATS()
async def error_cb(e):
print("Error:", e)
await nc.connect("nats://localhost:4222",
nkeys_seed="./path/to/nkeys/user.nk",
error_cb=error_cb,
)
# Do something with the connection
await nc.close()// dotnet add package NATS.Net
using NATS.Net;
using NATS.Client.Core;
await using var client = new NatsClient(new NatsOpts
{
Url = "127.0.0.1",
Name = "API NKey Example",
AuthOpts = new NatsAuthOpts
{
NKeyFile = "/path/to/nkeys/user.nk"
}
});{
"name": "SOURCE_TARGET",
"subjects": [
"foo1.ext.*",
"foo2.ext.*"
],
"discard": "old",
"duplicate_window": 120000000000,
"sources": [
{
"name": "SOURCE1_ORIGIN",
},
],
"deny_delete": false,
"sealed": false,
"max_msg_size": -1,
"allow_rollup_hdrs": false,
"max_bytes": -1,
"storage": "file",
"allow_direct": false,
"max_age": 0,
"max_consumers": -1,
"max_msgs_per_subject": -1,
"num_replicas": 1,
"name": "SOURCE_TARGET",
"deny_purge": false,
"compression": "none",
"max_msgs": -1,
"retention": "limits",
"mirror_direct": false
}{
"name": "SOURCE_TARGET",
"subjects": [
"foo1.ext.*",
"foo2.ext.*"
],
"discard": "old",
"duplicate_window": 120000000000,
"sources": [
{
"name": "SOURCE1_ORIGIN",
"filter_subject": "foo1.bar",
"opt_start_seq": 42,
"external": {
"deliver": "",
"api": "$JS.domainA.API"
}
},
{
"name": "SOURCE2_ORIGIN",
"filter_subject": "foo2.bar"
}
],
"consumer_limits": {
},
"deny_delete": false,
"sealed": false,
"max_msg_size": -1,
"allow_rollup_hdrs": false,
"max_bytes": -1,
"storage": "file",
"allow_direct": false,
"max_age": 0,
"max_consumers": -1,
"max_msgs_per_subject": -1,
"num_replicas": 1,
"name": "SOURCE_TARGET",
"deny_purge": false,
"compression": "none",
"max_msgs": -1,
"retention": "limits",
"mirror_direct": false
}{
"name": "MIRROR_TARGET"
"discard": "old",
"mirror": {
"name": "MIRROR_ORIGIN"
},
"deny_delete": false,
"sealed": false,
"max_msg_size": -1,
"allow_rollup_hdrs": false,
"max_bytes": -1,
"storage": "file",
"allow_direct": false,
"max_age": 0,
"max_consumers": -1,
"max_msgs_per_subject": -1,
"num_replicas": 1,
"name": "MIRROR_TARGET",
"deny_purge": false,
"compression": "none",
"max_msgs": -1,
"retention": "limits",
"mirror_direct": false
}{
"name": "MIRROR_TARGET"
"discard": "old",
"mirror": {
"opt_start_time": "2024-07-11T08:57:20.4441646Z",
"external": {
"deliver": "",
"api": "$JS.domainB.API"
},
"name": "MIRROR_ORIGIN"
},
"consumer_limits": {
},
"deny_delete": false,
"sealed": false,
"max_msg_size": -1,
"allow_rollup_hdrs": false,
"max_bytes": -1,
"storage": "file",
"allow_direct": false,
"max_age": 0,
"max_consumers": -1,
"max_msgs_per_subject": -1,
"num_replicas": 1,
"name": "MIRROR_TARGET",
"deny_purge": false,
"compression": "none",
"max_msgs": -1,
"retention": "limits",
"mirror_direct": false
}go install github.com/nats-io/nats-top@latestsudo -E go install github.com/nats-io/nats-topnats-server -m 8222nats-topnats-server version 0.6.6 (uptime: 2m2s)
Server:
Load: CPU: 0.0% Memory: 6.3M Slow Consumers: 0
In: Msgs: 0 Bytes: 0 Msgs/Sec: 0.0 Bytes/Sec: 0
Out: Msgs: 0 Bytes: 0 Msgs/Sec: 0.0 Bytes/Sec: 0
Connections: 0
HOST CID SUBS PENDING MSGS_TO MSGS_FROM BYTES_TO BYTES_FROM LANG VERSIONnats-server version 0.6.6 (uptime: 30m51s)
Server:
Load: CPU: 0.0% Memory: 10.3M Slow Consumers: 0
In: Msgs: 56 Bytes: 302 Msgs/Sec: 0.0 Bytes/Sec: 0
Out: Msgs: 98 Bytes: 512 Msgs/Sec: 0.0 Bytes/Sec: 0
Connections: 3
HOST CID SUBS PENDING MSGS_TO MSGS_FROM BYTES_TO BYTES_FROM LANG VERSION
::1:58651 6 1 0 52 0 260 0 go 1.1.0
::1:58922 38 1 0 21 0 105 0 go 1.1.0
::1:58953 39 1 0 21 0 105 0 go 1.1.0nats-server version 0.6.6 (uptime: 45m40s)
Server:
Load: CPU: 0.0% Memory: 10.4M Slow Consumers: 0
In: Msgs: 81 Bytes: 427 Msgs/Sec: 0.0 Bytes/Sec: 0
Out: Msgs: 154 Bytes: 792 Msgs/Sec: 0.0 Bytes/Sec: 0
sort by [bytes_to]:
Connections: 3
HOST CID SUBS PENDING MSGS_TO MSGS_FROM BYTES_TO BYTES_FROM LANG VERSION
::1:59259 83 1 0 4 0 20 0 go 1.1.0
::1:59349 91 1 0 2 0 10 0 go 1.1.0
::1:59342 90 1 0 0 0 0 0 go 1.1.0nats-server version 0.6.6 (uptime: 1h2m23s)
Server:
Load: CPU: 0.0% Memory: 10.4M Slow Consumers: 0
In: Msgs: 108 Bytes: 643 Msgs/Sec: 0.0 Bytes/Sec: 0
Out: Msgs: 185 Bytes: 1.0K Msgs/Sec: 0.0 Bytes/Sec: 0
Connections: 3
HOST CID SUBS PENDING MSGS_TO MSGS_FROM BYTES_TO BYTES_FROM LANG VERSION SUBSCRIPTIONS
::1:59708 115 1 0 6 0 48 0 go 1.1.0 foo.bar
::1:59758 122 1 0 1 0 8 0 go 1.1.0 foo
::1:59817 124 1 0 0 0 0 0 go 1.1.0 foonats-top -n 1 -sort subsnats-server version 0.6.6 (uptime: 1h7m0s)
Server:
Load: CPU: 0.0% Memory: 10.4M Slow Consumers: 0
In: Msgs: 109 Bytes: 651 Msgs/Sec: 0.0 Bytes/Sec: 0
Out: Msgs: 187 Bytes: 1.0K Msgs/Sec: 0.0 Bytes/Sec: 0
Connections: 3
HOST CID SUBS PENDING MSGS_TO MSGS_FROM BYTES_TO BYTES_FROM LANG VERSION
::1:59708 115 1 0 6 0 48 0 go 1.1.0[WRN] Dropping messages due to excessive stream ingest rate on 'account' > 'my-stream': IPQ len limit reachedjetstream {
max_buffered_msgs: 50000
max_buffered_size: 256mib
}servers := []string{"nats://127.0.0.1:1222", "
Options options = new Options.Builder()
const nc = await connect({
nc, err := nats.Connect("demo.nats.io")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Just to not collide using the demo server with other users.
subject := nats.NewInbox()
if err := nc.Publish(subject, []byte("All is Well")); err
log.Fatal(err)
}
// Sends a PING and wait for a PONG from the server, up to the given timeout.
// This gives guarantee that the server has processed the above message.
if err := nc.FlushTimeout(time.Second); err != nil {
log.Fatal(err)
}Connection nc = Nats.connect("nats://demo.nats.io:4222");
nc.publish("updates", "All is Well".getBytes(StandardCharsets.UTF_8));
nc.flush(Duration.ofSeconds(1)); // Flush the message queue
nc.close();const nc = await connect({ servers:
nc = NATS()
await nc.connect(servers
// Connection is AutoCloseable
try (Connection nc = Nats.connect("nats://demo.nats.io:4222")) {
// Do something with the connection
}// If connecting to the default port, the URL can be simplified
// to just the hostname/IP.
// That is, the connect below is equivalent to:
// nats.Connect("nats://demo.nats.io:4222")
nc, err := nats.Connect("demo.nats.io")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Do something with the connection nc = Nats.connect("nats://demo.nats.io:4222");nc, err := nats.Connect("demo.nats.io
nc, err := nats.Connect("demo.nats.io
nc = NATS()
await nc.connect(servers=[
"nats://127.0.0.1:1222",
"nats://127.0.0.1:1223",
"nats://127.0.0.1:1224"
])
# Do something with the connection
await nc.close()// dotnet add package NATS.Net
using NATS.Net;
await using var client = new NatsClient("nats://127.0.0.1:1222,nats://127.0.0.1:1223,nats://127.0.0.1:1224");
// It's optional to call ConnectAsync()
// as it will be called when needed automatically
await client.ConnectAsync();require 'nats/client'
NATS.start(servers: ["nats://127.0.0.1:1222", "nats://127.0.0.1:1223", "nats://127.0.0.1:1224"]) do |nc|
# Do something with the connection
# Close the connection
nc.close
endnatsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
const char *servers[] = {"nats://127.0.0.1:1222", "nats://127.0.0.1:1223", "nats://127.0.0.1:1224"};
s = natsOptions_Create(&opts);
if (s == NATS_OK)
s = natsOptions_SetServers(opts, servers, 3);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);// dotnet add package NATS.Net
using NATS.Net;
await using var client = new NatsClient();
await client.PublishAsync("updates", "All is well");
// Sends a PING and wait for a PONG from the server.
// This gives a guarantee that the server has processed the above message
// since the underlining TCP connection sends and receives messages in order.
await client.PingAsync();require 'nats/client'
require 'fiber'
NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
nc.subscribe("updates") do |msg|
puts msg
end
nc.publish("updates", "All is Well")
nc.flush do
# Sends a PING and wait for a PONG from the server, up to the given timeout.
# This gives guarantee that the server has processed above message at this point.
end
endnatsConnection *conn = NULL;
natsStatus s = NATS_OK;
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
// Send a request and wait for up to 1 second
if (s == NATS_OK)
s = natsConnection_PublishString(conn, "foo", "All is Well");
// Sends a PING and wait for a PONG from the server, up to the given timeout.
// This gives guarantee that the server has processed the above message.
if (s == NATS_OK)
s = natsConnection_FlushTimeout(conn, 1000);
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);const start = Date.now();
nc.flush().then(() => {
t.log("round trip completed in", Date.now() - start, "ms");
});nc = NATS()
await nc.connect(servers=["nats://demo.nats.io:4222"])
await nc.publish("updates", b'All is Well')
# Sends a PING and wait for a PONG from the server, up to the given timeout.
# This gives guarantee that the server has processed above message.
await nc.flush(timeout=1)// dotnet add package NATS.Net
using NATS.Net;
await using var client = new NatsClient("nats://demo.nats.io:4222");
// It's optional to call ConnectAsync()
// as it will be called when needed automatically
await client.ConnectAsync();require 'nats/client'
NATS.start(servers: ["nats://demo.nats.io:4222"]) do |nc|
# Do something with the connection
# Close the connection
nc.close
endnatsConnection *conn = NULL;
natsStatus s;
// If connecting to the default port, the URL can be simplified
// to just the hostname/IP.
// That is, the connect below is equivalent to:
// natsConnection_ConnectTo(&conn, "nats://demo.nats.io:4222");
s = natsConnection_ConnectTo(&conn, "demo.nats.io");
if (s != NATS_OK)
// handle error
// Destroy connection, no-op if conn is NULL.
natsConnection_Destroy(conn);Connection nc = Nats.connect("nats://demo.nats.io:4222");
Dispatcher d = nc.createDispatcher((msg) -> {
String str = new String(msg.getData(), StandardCharsets.UTF_8);
System.out.println(str);
});
// Sync Subscription have an unsubscribe API
Subscription sub = nc.subscribe("updates");
sub.unsubscribe();
// Async Subscriptions on the dispatcher must unsubscribe via the dispatcher,
// not the subscription
d.subscribe("updates");
d.unsubscribe("updates");
// Close the connection
nc.close();const sc = StringCodec();
// set up a subscription to process a request
const sub = nc.subscribe(createInbox(), (_err, m) => {
m.respond(sc.encode(new Date().toLocaleTimeString()));
});
// without arguments the subscription will cancel when the server receives it
// you can also specify how many messages are expected by the subscription
sub.unsubscribe();nc = NATS()
await nc.connect(servers=["nats://demo.nats.io:4222"])
future = asyncio.Future()
async def cb(msg):
nonlocal future
future.set_result(msg)
sub = await nc.subscribe("updates", cb=cb)
await nc.publish("updates", b'All is Well')
# Remove interest in subject
await sub.unsubscribe()
# Won't be received...
await nc.publish("updates", b'...')// dotnet add package NATS.Net
using NATS.Net;
await using var client = new NatsClient();
// Cancel the subscription after 10 seconds
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
// Subscribe to the "updates" subject
// We unsubscribe when we receive the message "exit"
// or when the cancellation token is triggered.
await foreach (var msg in client.SubscribeAsync<string>("updates").WithCancellation(cts.Token))
{
Console.WriteLine($"Received: {msg.Data}");
if (msg.Data == "exit")
{
// When we exit the loop, we unsubscribe from the subject
break;
}
}
Console.WriteLine("Unsubscribed from updates");require 'nats/client'
require 'fiber'
NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
Fiber.new do
f = Fiber.current
sid = nc.subscribe("time") do |msg, reply|
f.resume Time.now
end
nc.publish("time", 'What is the time?', NATS.create_inbox)
# Use the response
msg = Fiber.yield
puts "Reply: #{msg}"
nc.unsubscribe(sid)
# Won't be received
nc.publish("time", 'What is the time?', NATS.create_inbox)
end.resume
endclass StockForJsonPub {
public String symbol;
public float price;
}
public class PublishJSON {
public static void main(String[] args) {
try {
Connection nc = Nats.connect("nats://demo.nats.io:4222");
// Create the data object
StockForJsonPub stk = new StockForJsonPub();
stk.symbol="GOOG";
stk.price=1200;
// use Gson to encode the object to JSON
GsonBuilder builder = new GsonBuilder();
Gson gson = builder.create();
String json = gson.toJson(stk);
// Publish the message
nc.publish("updates", json.getBytes(StandardCharsets.UTF_8));
// Make sure the message goes through before we close
nc.flush(Duration.ZERO);
nc.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}nc.publish("updates", JSON.stringify({ ticker: "GOOG", price: 2868.87 }));nc = NATS()
await nc.connect(servers=["nats://demo.nats.io:4222"])
await nc.publish("updates", json.dumps({"symbol": "GOOG", "price": 1200 }).encode())// dotnet add package NATS.Net
using NATS.Net;
await using var client = new NatsClient();
using var cts = new CancellationTokenSource();
Task process = Task.Run(async () =>
{
// Let's deserialize the message as a UTF-8 string to see
// the published serialized output in the console
await foreach (var msg in client.SubscribeAsync<string>("updates", cancellationToken: cts.Token))
{
Console.WriteLine($"Received: {msg.Data}");
}
});
// Wait for the subscription task to be ready
await Task.Delay(1000);
var stock = new Stock { Symbol = "MSFT", Price = 123.45 };
// The default serializer uses System.Text.Json to serialize the object
await client.PublishAsync<Stock>("updates", stock);
// Define the object
public record Stock {
public string Symbol { get; set; }
public double Price { get; set; }
}
// Output:
// Received: {"Symbol":"MSFT","Price":123.45}require 'nats/client'
require 'json'
NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
nc.publish("updates", {"symbol": "GOOG", "price": 1200}.to_json)
endnats-server --auth mytokenZ09-_// Set a token
nc, err := nats.Connect("127.0.0.1", nats.Name("API Token Example"), nats.Token("mytoken"))
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Do something with the connectionOptions options = new Options.Builder()
.server("nats://demo.nats.io:4222")
.token("mytoken") // Set a token
.build();
Connection nc = Nats.connect(options);
// Do something with the connection
nc.close();const nc = await connect({
port: ns.port,
token: "aToK3n",
});// Connection event handlers are invoked asynchronously
// and the state of the connection may have changed when
// the callback is invoked.
nc, err := nats.Connect("demo.nats.io",
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
// handle disconnect error event
}),
nats.ReconnectHandler(func(nc *nats.Conn) {
// handle reconnect event
}))
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Do something with the connectionclass MyConnectionListener implements ConnectionListener {
public void connectionEvent(Connection conn, Events event) {
switch (event) {
case CONNECTED:
// The connection has successfully completed the handshake with the nats-server.
break;
case CLOSED:
// The connection is permanently closed, either by manual action or failed reconnects
break;
case DISCONNECTED:
// The connection lost its connection, but may try to reconnect if configured to
break;
case RECONNECTED:
// The connection was connected, lost its connection and successfully reconnected.
break;
case RESUBSCRIBED:
// The connection was reconnected and the server has been notified of all subscriptions.
break;
case DISCOVERED_SERVERS:
// The connection was made aware of new servers from the current server connection.
break;
case LAME_DUCK:
// connected server is coming down soon, might want to prepare for it
break;
}
}
}
MyConnectionListener listener = new MyConnectionListener();
Options options = new Options.Builder()
.server("nats://demo.nats.io:4222")
.connectionListener(listener)
.build();
Connection nc = Nats.connect(options);
// Do something with the connection
nc.close();nc, err := nats.Connect("demo.nats.io")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Subscribe
sub, err := nc.SubscribeSync("time")
if err != nil {
log.Fatal(err)
}
// Read a message
msg, err := sub.NextMsg(10 * time.Second)
if err != nil {
log.Fatal(err)
}
// Get the time
timeAsBytes := []byte(time.Now().String())
// Send the time as the response.
msg.Respond(timeAsBytes)time.us
time.us.east
time.us.east.atlanta
time.eu.east
time.eu.east.warsawfactory1.tools.group42.unit17service.deploy.server-acme.app123orders.online.store123.order171711orders.online.us.server42.ccpayment.premium.store123.electronics.deliver-dhl.order171711.createtime.us
time.us2.east1
time.new-york
time.SanFranciscolocation.Malmö
$location.Stockholm
_Subjects_.mysubjectall*data
<my_stream>
service.stream.1//Java
Options options = Options.builder()
.server("nats://127.0.0.1:4222")
.pedantic()
.build();
Connection nc = Nats.connect(options)natsConnection *conn = NULL;
natsSubscription *sub = NULL;
natsStatus s = NATS_OK;
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
// Subscribe
if (s == NATS_OK)
s = natsConnection_SubscribeSync(&sub, conn, "updates");
// Unsubscribe
if (s == NATS_OK)
s = natsSubscription_Unsubscribe(sub);
(...)
// Destroy objects that were created
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);// Structured data is not configurable in C NATS Client.nc = NATS()
await nc.connect(servers=["nats://demo.nats.io:4222"], token="mytoken")
# Do something with the connection.// dotnet add package NATS.Net
using NATS.Net;
using NATS.Client.Core;
await using var client = new NatsClient(new NatsOpts
{
Url = "127.0.0.1",
Name = "API Token Example",
AuthOpts = new NatsAuthOpts
{
Token = "mytoken"
}
});NATS.start(token: "mytoken") do |nc|
puts "Connected using token"
endnatsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
s = natsOptions_Create(&opts);
if (s == NATS_OK)
s = natsOptions_SetToken(opts, "mytoken");
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);// Token in URL
nc, err := nats.Connect("mytoken@localhost")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Do something with the connectionConnection nc = Nats.connect("nats://mytoken@localhost:4222");//Token in URL
// Do something with the connection
nc.close(); // JavaScript doesn't support tokens in urls use the `token` optionnc = NATS()
await nc.connect(servers=["nats://[email protected]:4222"])
# Do something with the connection.// dotnet add package NATS.Net
using NATS.Net;
using NATS.Client.Core;
await using var client = new NatsClient(new NatsOpts
{
// .NET client doesn't support tokens in URLs
// use Token option instead.
AuthOpts = new NatsAuthOpts
{
Token = "mytoken"
}
});NATS.start("[email protected]:4222") do |nc|
puts "Connected using token!"
endnatsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
s = natsOptions_Create(&opts);
if (s == NATS_OK)
s = natsOptions_SetURL(opts, "nats://[email protected]:4222");
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);const nc = await connect({
maxReconnectAttempts: 10,
servers: ["demo.nats.io"],
});
(async () => {
for await (const s of nc.status()) {
switch (s.type) {
case Events.Reconnect:
t.log(`client reconnected - ${s.data}`);
break;
default:
}
}
})().then();
});nc = NATS()
async def disconnected_cb():
print("Got disconnected!")
async def reconnected_cb():
# See who we are connected to on reconnect.
print("Got reconnected to {url}".format(url=nc.connected_url.netloc))
await nc.connect(
servers=["nats://demo.nats.io:4222"],
reconnect_time_wait=10,
reconnected_cb=reconnected_cb,
disconnected_cb=disconnected_cb,
)
# Do something with the connection.// dotnet add package NATS.Net
using NATS.Net;
await using var client = new NatsClient();
client.Connection.ConnectionDisconnected += async (sender, args) =>
{
Console.WriteLine($"Disconnected: {args.Message}");
};
client.Connection.ConnectionOpened += async (sender, args) =>
{
Console.WriteLine($"Connected: {args.Message}");
};
client.Connection.ReconnectFailed += async (sender, args) =>
{
Console.WriteLine($"Reconnect Failed: {args.Message}");
};
await client.ConnectAsync();require 'nats/client'
NATS.start(servers: ["nats://127.0.0.1:1222", "nats://127.0.0.1:1223", "nats://127.0.0.1:1224"]) do |nc|
# Do something with the connection
nc.on_reconnect do
puts "Got reconnected to #{nc.connected_server}"
end
nc.on_disconnect do |reason|
puts "Got disconnected! #{reason}"
end
endstatic void
disconnectedCB(natsConnection *conn, void *closure)
{
// Handle disconnect error event
}
static void
reconnectedCB(natsConnection *conn, void *closure)
{
// Handle reconnect event
}
(...)
natsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
s = natsOptions_Create(&opts);
// Connection event handlers are invoked asynchronously
// and the state of the connection may have changed when
// the callback is invoked.
if (s == NATS_OK)
s = natsOptions_SetDisconnectedCB(opts, disconnectedCB, NULL);
if (s == NATS_OK)
s = natsOptions_SetReconnectedCB(opts, reconnectedCB, NULL);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);Connection nc = Nats.connect("nats://demo.nats.io:4222");
// Subscribe to the "time" subject and reply with the current time
Subscription sub = nc.subscribe("time");
// Read a message
Message msg = sub.nextMessage(Duration.ZERO);
// Get the time
Calendar cal = Calendar.getInstance();
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
byte[] timeAsBytes = sdf.format(cal.getTime()).getBytes(StandardCharsets.UTF_8);
// Send the time to the reply to subject
nc.publish(msg.getReplyTo(), timeAsBytes);
// Flush and close the connection
nc.flush(Duration.ZERO);
nc.close();const sc = StringCodec();
// set up a subscription to process a request
const sub = nc.subscribe("time");
for await (const m of sub) {
m.respond(sc.encode(new Date().toLocaleDateString()));
}nc = NATS()
await nc.connect(servers=["nats://demo.nats.io:4222"])
future = asyncio.Future()
async def cb(msg):
nonlocal future
future.set_result(msg)
await nc.subscribe("time", cb=cb)
await nc.publish_request("time", new_inbox(), b'What is the time?')
await nc.flush()
# Read the message
msg = await asyncio.wait_for(future, 1)
# Send the time
time_as_bytes = "{}".format(datetime.now()).encode()
await nc.publish(msg.reply, time_as_bytes)// dotnet add package NATS.Net
using NATS.Net;
await using var client = new NatsClient();
// Subscribe to the "time" subject and reply with the current time
await foreach (var msg in client.SubscribeAsync<string>("time"))
{
await msg.ReplyAsync(DateTime.Now);
}require 'nats/client'
require 'fiber'
NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
Fiber.new do
f = Fiber.current
nc.subscribe("time") do |msg, reply|
f.resume Time.now
end
nc.publish("time", 'What is the time?', NATS.create_inbox)
# Use the response
msg = Fiber.yield
puts "Reply: #{msg}"
end.resume
endnatsConnection *conn = NULL;
natsSubscription *sub = NULL;
natsMsg *msg = NULL;
natsStatus s = NATS_OK;
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
// Subscribe
if (s == NATS_OK)
s = natsConnection_SubscribeSync(&sub, conn, "time");
// Wait for messages
if (s == NATS_OK)
s = natsSubscription_NextMsg(&msg, sub, 10000);
if (s == NATS_OK)
{
char buf[64];
snprintf(buf, sizeof(buf), "%lld", nats_Now());
// Send the time as a response
s = natsConnection_Publish(conn, natsMsg_GetReply(msg), buf, (int) strlen(buf));
// Destroy message that was received
natsMsg_Destroy(msg);
}
(...)
// Destroy objects that were created
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);nc, err := nats.Connect("demo.nats.io
Connection nc = Nats.connect("nats://demo.nats.io:4222"
nc, err := nats.Connect("demo.nats.io
nc, err := nats.Connect("demo.nats.io")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Use a WaitGroup to wait for 10 messages to arrive
wg := sync.WaitGroup{}
wg.Add(10)
// Create a queue subscription on "updates" with queue name "workers"
if _, err := nc.QueueSubscribe("updates", "workers", func(m *nats.Msg) {
wg.Done()
}); err != nil {
log.Fatal(err)
}
// Wait for messages to come in
wg.Wait()Connection nc = Nats.connect("nats://demo.nats.io:4222");
// Use a latch to wait for 10 messages to arrive
CountDownLatch latch = new CountDownLatch(10);
// Create a dispatcher and inline message handler
Dispatcher d = nc.createDispatcher((msg) -> {
String str = new String(msg.getData(), StandardCharsets.UTF_8);
System.out.println(str);
latch.countDown();
});
// Subscribe to the "updates" subject with a queue group named "workers"
d.subscribe("updates", "workers");
// Wait for a message to come in
latch.await();
// Close the connection
nc.close();const sc = StringCodec();
// this is an example of a callback subscription
// https://github.com/nats-io/nats.js/blob/master/README.md#async-vs-callbacks
nc.subscribe("updates", {
callback: (err, msg) => {
if (err) {
t.error(err.message);
} else {
t.log(sc.decode(msg.data));
}
},
max: 1,
});
// here's an iterator subscription - note the code in the
// for loop will block until the iterator completes
// either from a break/return from the iterator, an
// unsubscribe after the message arrives, or in this case
// an auto-unsubscribe after the first message is received
const sub = nc.subscribe("updates", { max: 1 });
for await (const m of sub) {
t.log(sc.decode(m.data));
}
// subscriptions have notifications, simply wait
// the closed promise
sub.closed
.then(() => {
t.log("subscription closed");
})
.catch((err) => {
t.err(`subscription closed with an error ${err.message}`);
});AckWaitnc.subscribe(subj, {
queue: "workers",
callback: (_err, _msg) => {
t.log("worker1 got message");
},
});
nc.subscribe(subj, {
queue: "workers",
callback: (_err, _msg) => {
t.log("worker2 got message");
},
});nc = NATS()
await nc.connect(servers=["nats://demo.nats.io:4222"])
future = asyncio.Future()
async def cb(msg):
nonlocal future
future.set_result(msg)
await nc.subscribe("updates", queue="workers", cb=cb)
await nc.publish("updates", b'All is Well')
msg = await asyncio.wait_for(future, 1)
print("Msg", msg)// dotnet add package NATS.Net
using NATS.Net;
await using var client = new NatsClient();
var count = 0;
// Subscribe to the "updates" subject with a queue group named "workers"
await foreach (var msg in client.SubscribeAsync<string>(subject: "updates", queueGroup: "workers"))
{
Console.WriteLine($"Received {++count}: {msg.Subject}: {msg.Data}");
// Break after 10 messages
if (count == 10)
{
break;
}
}
Console.WriteLine("Done");require 'nats/client'
require 'fiber'
NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
Fiber.new do
f = Fiber.current
nc.subscribe("updates", queue: "worker") do |msg, reply|
f.resume Time.now
end
nc.publish("updates", "A")
# Use the response
msg = Fiber.yield
puts "Msg: #{msg}"
end.resume
endstatic void
onMsg(natsConnection *conn, natsSubscription *sub, natsMsg *msg, void *closure)
{
printf("Received msg: %s - %.*s\n",
natsMsg_GetSubject(msg),
natsMsg_GetDataLength(msg),
natsMsg_GetData(msg));
// Need to destroy the message!
natsMsg_Destroy(msg);
}
(...)
natsConnection *conn = NULL;
natsSubscription *sub = NULL;
natsStatus s;
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
// Create a queue subscription on "updates" with queue name "workers"
if (s == NATS_OK)
s = natsConnection_QueueSubscribe(&sub, conn, "updates", "workers", onMsg, NULL);
(...)
// Destroy objects that were created
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);nc = NATS()
await nc.connect(servers=["nats://demo.nats.io:4222"])
future = asyncio.Future()
async def cb(msg):
nonlocal future
future.set_result(msg)
await nc.subscribe("updates", cb=cb)
await nc.publish("updates", b'All is Well')
await nc.flush()
# Wait for message to come in
msg = await asyncio.wait_for(future, 1)// dotnet add package NATS.Net
using NATS.Net;
await using var client = new NatsClient();
// Subscribe to the "updates" subject and receive messages as <string> type.
// The default serializer understands all primitive types, strings,
// byte arrays, and uses JSON for complex types.
await foreach (var msg in client.SubscribeAsync<string>("updates"))
{
Console.WriteLine($"Received: {msg.Data}");
if (msg.Data == "exit")
{
// When we exit the loop, we unsubscribe from the subject
// as a result of enumeration completion.
break;
}
}require 'nats/client'
NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
nc.subscribe("updates") do |msg|
puts msg
nc.close
end
nc.publish("updates", "All is Well")
endstatic void
onMsg(natsConnection *conn, natsSubscription *sub, natsMsg *msg, void *closure)
{
printf("Received msg: %s - %.*s\n",
natsMsg_GetSubject(msg),
natsMsg_GetDataLength(msg),
natsMsg_GetData(msg));
// Need to destroy the message!
natsMsg_Destroy(msg);
}
(...)
natsConnection *conn = NULL;
natsSubscription *sub = NULL;
natsStatus s;
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
if (s == NATS_OK)
{
// Creates an asynchronous subscription on subject "foo".
// When a message is sent on subject "foo", the callback
// onMsg() will be invoked by the client library.
// You can pass a closure as the last argument.
s = natsConnection_Subscribe(&sub, conn, "foo", onMsg, NULL);
}
(...)
// Destroy objects that were created
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);Connection nc = Nats.connect("nats://demo.nats.io:4222");
Dispatcher d = nc.createDispatcher((msg) -> {
String str = new String(msg.getData(), StandardCharsets.UTF_8);
System.out.println(str);
});
/ subscribe then unsubscribe after 10 "more" messages
// It's technically possible to get more than 10 total if messages are already in
// flight by the time the server receives the unsubscribe message
// Sync Subscription,
Subscription sub = nc.subscribe("updates");
sub.unsubscribe(10);
// Async Subscription directly in the dispatcher
d.subscribe("updates");
d.unsubscribe("updates", 10);
// Close the connection
nc.close();const sc = StringCodec();
// `max` specifies the number of messages that the server will forward.
// The server will auto-cancel.
const subj = createInbox();
const sub1 = nc.subscribe(subj, {
callback: (_err, msg) => {
t.log(`sub1 ${sc.decode(msg.data)}`);
},
max: 10,
});
// another way after 10 messages
const sub2 = nc.subscribe(subj, {
callback: (_err, msg) => {
t.log(`sub2 ${sc.decode(msg.data)}`);
},
});
// if the subscription already received 10 messages, the handler
// won't get any more messages
sub2.unsubscribe(10);nc = NATS()
await nc.connect(servers=["nats://demo.nats.io:4222"])
async def cb(msg):
print(msg)
sid = await nc.subscribe("updates", cb=cb)
await nc.auto_unsubscribe(sid, 1)
await nc.publish("updates", b'All is Well')
# Won't be received...
await nc.publish("updates", b'...')// dotnet add package NATS.Net
using NATS.Net;
using NATS.Client.Core;
await using var client = new NatsClient();
// Unsubscribe after 10 messages
var opts = new NatsSubOpts { MaxMsgs = 10 };
var count = 0;
// Subscribe to updates with options
await foreach (var msg in client.SubscribeAsync<string>("updates", opts: opts))
{
Console.WriteLine($"Received[{++count}]: {msg.Data}");
}
Console.WriteLine("Unsubscribed from updates");require 'nats/client'
require 'fiber'
NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
Fiber.new do
f = Fiber.current
nc.subscribe("time", max: 1) do |msg, reply|
f.resume Time.now
end
nc.publish("time", 'What is the time?', NATS.create_inbox)
# Use the response
msg = Fiber.yield
puts "Reply: #{msg}"
# Won't be received
nc.publish("time", 'What is the time?', NATS.create_inbox)
end.resume
endnatsConnection *conn = NULL;
natsSubscription *sub = NULL;
natsMsg *msg = NULL;
natsStatus s = NATS_OK;
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
// Subscribe
if (s == NATS_OK)
s = natsConnection_SubscribeSync(&sub, conn, "updates");
// Unsubscribe after 1 message is received
if (s == NATS_OK)
s = natsSubscription_AutoUnsubscribe(sub, 1);
// Wait for messages
if (s == NATS_OK)
s = natsSubscription_NextMsg(&msg, sub, 10000);
if (s == NATS_OK)
{
printf("Received msg: %s - %.*s\n",
natsMsg_GetSubject(msg),
natsMsg_GetDataLength(msg),
natsMsg_GetData(msg));
// Destroy message that was received
natsMsg_Destroy(msg);
}
(...)
// Destroy objects that were created
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);nats account infoAccount Information
User:
Account: $G
Expires: never
Client ID: 5
Client IP: 127.0.0.1
RTT: 128µs
Headers Supported: true
Maximum Payload: 1.0 MiB
Connected URL: nats://127.0.0.1:4222
Connected Address: 127.0.0.1:4222
Connected Server ID: NAMR7YBNZA3U2MXG2JH3FNGKBDVBG2QTMWVO6OT7XUSKRINKTRFBRZEC
Connected Server Version: 2.11.0-dev
TLS Connection: no
JetStream Account Information:
Account Usage:
Storage: 0 B
Memory: 0 B
Streams: 0
Consumers: 0
Account Limits:
Max Message Payload: 1.0 MiB
Tier: Default:
Configuration Requirements:
Stream Requires Max Bytes Set: false
Consumer Maximum Ack Pending: Unlimited
Stream Resource Usage Limits:
Memory: 0 B of Unlimited
Memory Per Stream: Unlimited
Storage: 0 B of Unlimited
Storage Per Stream: Unlimited
Streams: 0 of Unlimited
Consumers: 0 of UnlimitedJetStream Account Information:
JetStream is not supported in this accountnats stream add my_stream? Subjects foo
? Storage file
? Replication 1
? Retention Policy Limits
? Discard Policy Old
? Stream Messages Limit -1
? Per Subject Messages Limit -1
? Total Stream Size -1
? Message TTL -1
? Max Message Size -1
? Duplicate tracking time window 2m0s
? Allow message Roll-ups No
? Allow message deletion Yes
? Allow purging subjects or the entire stream Yes
Stream my_stream was created
Information for Stream my_stream created 2024-06-07 12:29:36
Subjects: foo
Replicas: 1
Storage: File
Options:
Retention: Limits
Acknowledgments: true
Discard Policy: Old
Duplicate Window: 2m0s
Direct Get: true
Allows Msg Delete: true
Allows Purge: true
Allows Rollups: false
Limits:
Maximum Messages: unlimited
Maximum Per Subject: unlimited
Maximum Bytes: unlimited
Maximum Age: unlimited
Maximum Message Size: unlimited
Maximum Consumers: unlimited
State:
Messages: 0
Bytes: 0 B
First Sequence: 0
Last Sequence: 0
Active Consumers: 0nats stream info my_streamInformation for Stream my_stream created 2024-06-07 12:29:36
Subjects: foo
Replicas: 1
Storage: File
Options:
Retention: Limits
Acknowledgments: true
Discard Policy: Old
Duplicate Window: 2m0s
Direct Get: true
Allows Msg Delete: true
Allows Purge: true
Allows Rollups: false
Limits:
Maximum Messages: unlimited
Maximum Per Subject: unlimited
Maximum Bytes: unlimited
Maximum Age: unlimited
Maximum Message Size: unlimited
Maximum Consumers: unlimited
State:
Messages: 0
Bytes: 0 B
First Sequence: 0
Last Sequence: 0
Active Consumers: 0nats pub foo --count=1000 --sleep 1s "publication #{{.Count}} @ {{.TimeStamp}}"nats consumer add? Consumer name pull_consumer
? Delivery target (empty for Pull Consumers)
? Start policy (all, new, last, subject, 1h, msg sequence) all
? Acknowledgment policy explicit
? Replay policy instant
? Filter Stream by subjects (blank for all)
? Maximum Allowed Deliveries -1
? Maximum Acknowledgments Pending 0
? Deliver headers only without bodies No
? Add a Retry Backoff Policy No
? Select a Stream my_stream
Information for Consumer my_stream > pull_consumer created 2024-06-07T12:32:09-05:00
Configuration:
Name: pull_consumer
Pull Mode: true
Deliver Policy: All
Ack Policy: Explicit
Ack Wait: 30.00s
Replay Policy: Instant
Max Ack Pending: 1,000
Max Waiting Pulls: 512
State:
Last Delivered Message: Consumer sequence: 0 Stream sequence: 0
Acknowledgment Floor: Consumer sequence: 0 Stream sequence: 0
Outstanding Acks: 0 out of maximum 1,000
Redelivered Messages: 0
Unprocessed Messages: 74
Waiting Pulls: 0 of maximum 512nats consumer next my_stream pull_consumer --count 1000nc, err := nats.Connect("demo.nats.io")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Create a unique subject name for replies.
uniqueReplyTo := nats.NewInbox()
// Listen for a single response
sub, err := nc.SubscribeSync(uniqueReplyTo)
if err != nil {
log.Fatal(err)
}
// Send the request.
// If processing is synchronous, use Request() which returns the response message.
if err := nc.PublishRequest("time", uniqueReplyTo, nil); err
log.Fatal(err)
}
// Read the reply
msg, err := sub.NextMsg(time.Second)
if err != nil {
log.Fatal(err)
}
// Use the response
log.Printf("Reply: %s", msg.Data)nc, err := nats.Connect("demo.nats.io
Connection nc = Nats.connect("nats://demo.nats.io:4222");
// set up a listener for "time" requests
Dispatcher d = nc.createDispatcher(msg -> {
System.out.println("Received time request");
nc.publish(msg.getReplyTo(), ("" + System.currentTimeMillis()).getBytes());
});
d.subscribe("time");
// make a subject for replies and subscribe to that
String replyToThis = NUID.nextGlobal();
Subscription sub = nc.subscribe(replyToThis);
// publish to the "time" subject with reply-to subject that was set up
nc.publish("time", replyToThis, null);
// wait for a response
Message msg = sub.nextMessage(1000);
// look at the response
long time = Long.parseLong(new String(msg.getData()));
System.out.println(new Date(time));
nc.close();// set up a subscription to process the request
const sc = StringCodec();
nc.subscribe("time", {
callback: (_err, msg) => {
msg.respond(sc.encode(new Date().toLocaleTimeString()));
},
});
// create a subscription subject that the responding send replies to
const inbox = createInbox();
const sub = nc.subscribe(inbox, {
max: 1,
callback: (_err, msg) => {
t.log(`the time is ${sc.decode(msg.data)}`);
},
});
nc.publish("time", Empty, { reply: inbox });nc = NATS()
future = asyncio.Future()
async def sub(msg):
nonlocal future
future.set_result(msg)
await nc.connect(servers=["nats://demo.nats.io:4222"])
await nc.subscribe("time", cb=sub)
unique_reply_to = nc.new_inbox()
await nc.publish("time", b'', unique_reply_to)
# Use the response
msg = await asyncio.wait_for(future, 1)
print("Reply:", msg)// dotnet add package NATS.Net
using NATS.Net;
using NATS.Client.Core;
await using var client = new NatsClient();
await client.ConnectAsync();
// Create a new inbox for the subscription subject
string inbox = client.Connection.NewInbox();
// Use core API to subscribe to have a more fine-grained control over
// the subscriptions. We use <string> as the type, but we are not
// really interested in the message payload.
await using INatsSub<string> timeSub
= await client.Connection.SubscribeCoreAsync<string>("time");
Task responderTask = Task.Run(async () =>
{
await foreach (var msg in timeSub.Msgs.ReadAllAsync())
{
// The default serializer uses StandardFormat with Utf8Formatter
// when formatting DateTimeOffset types.
await msg.ReplyAsync<DateTimeOffset>(DateTimeOffset.UtcNow);
}
});
// Subscribe to the inbox with the expected type of the response
await using INatsSub<DateTimeOffset> inboxSub
= await client.Connection.SubscribeCoreAsync<DateTimeOffset>(inbox);
// The default serializer uses UTF-8 encoding for strings
await client.PublishAsync(subject: "time", replyTo: inbox);
// Read the response from subscription message channel reader
NatsMsg<DateTimeOffset> reply = await inboxSub.Msgs.ReadAsync();
// Print the current time in RFC1123 format taking advantage of the
// DateTimeOffset's formatting capabilities.
Console.WriteLine($"The current date and time is: {reply.Data:R}");
await inboxSub.UnsubscribeAsync();
await timeSub.UnsubscribeAsync();
// make sure the responder task is completed cleanly
await responderTask;
// Output:
// The current date and time is: Tue, 22 Oct 2024 12:21:09 GMTrequire 'nats/client'
require 'fiber'
NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
Fiber.new do
f = Fiber.current
nc.subscribe("time") do |msg, reply|
f.resume msg
end
nc.publish("time", 'example', NATS.create_inbox)
# Use the response
msg = Fiber.yield
puts "Reply: #{msg}"
end.resume
endnatsConnection *conn = NULL;
natsStatus s = NATS_OK;
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
// Publish a message and provide a reply subject
if (s == NATS_OK)
s = natsConnection_PublishRequestString(conn, "request", "reply", "this is the request");
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);class StockForJsonSub {
public String symbol;
public float price;
public String toString() {
return symbol + " is at " + price;
}
}
public class SubscribeJSON {
public static void main(String[] args) {
try {
Connection nc = Nats.connect("nats://demo.nats.io:4222");
// Use a latch to wait for 10 messages to arrive
CountDownLatch latch = new CountDownLatch(10);
// Create a dispatcher and inline message handler
Dispatcher d = nc.createDispatcher((msg) -> {
Gson gson = new Gson();
String json = new String(msg.getData(), StandardCharsets.UTF_8);
StockForJsonSub stk = gson.fromJson(json, StockForJsonSub.class);
// Use the object
System.out.println(stk);
latch.countDown();
});
// Subscribe
d.subscribe("updates");
// Wait for a message to come in
latch.await();
// Close the connection
nc.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}const sub = nc.subscribe(subj, {
callback: (_err, msg) => {
t.log(`${msg.json()}`);
},
max: 1,
});import asyncio
import json
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrTimeout
async def run(loop):
nc = NATS()
await nc.connect(servers=["nats://127.0.0.1:4222"], loop=loop)
async def message_handler(msg):
data = json.loads(msg.data.decode())
print(data)
sid = await nc.subscribe("updates", cb=message_handler)
await nc.flush()
await nc.auto_unsubscribe(sid, 2)
await nc.publish("updates", json.dumps({"symbol": "GOOG", "price": 1200 }).encode())
await asyncio.sleep(1, loop=loop)
await nc.close()// dotnet add package NATS.Net
using NATS.Net;
// NATS .NET has a built-in serializer that does the 'unsurprising' thing
// for most types. Most primitive types are serialized as expected.
// For any other type, JSON serialization is used. You can also provide
// your own serializers by implementing the INatsSerializer and
// INasSerializerRegistry interfaces. See also for more information:
// https://nats-io.github.io/nats.net/documentation/advanced/serialization.html
await using var nc = new NatsClient();
CancellationTokenSource cts = new();
// Subscribe for int, string, bytes, json
List<Task> tasks =
[
Task.Run(async () =>
{
await foreach (var msg in nc.SubscribeAsync<int>("x.int", cancellationToken: cts.Token))
{
Console.WriteLine($"Received int: {msg.Data}");
}
}),
Task.Run(async () =>
{
await foreach (var msg in nc.SubscribeAsync<string>("x.string", cancellationToken: cts.Token))
{
Console.WriteLine($"Received string: {msg.Data}");
}
}),
Task.Run(async () =>
{
await foreach (var msg in nc.SubscribeAsync<byte[]>("x.bytes", cancellationToken: cts.Token))
{
if (msg.Data != null)
{
Console.Write($"Received bytes: ");
foreach (var b in msg.Data)
{
Console.Write("0x{0:X2} ", b);
}
Console.WriteLine();
}
}
}),
Task.Run(async () =>
{
await foreach (var msg in nc.SubscribeAsync<MyData>("x.json", cancellationToken: cts.Token))
{
Console.WriteLine($"Received data: {msg.Data}");
}
}),
];
// Give the subscriber tasks some time to subscribe
await Task.Delay(1000);
await nc.PublishAsync<int>("x.int", 100);
await nc.PublishAsync<string>("x.string", "Hello, World!");
await nc.PublishAsync<byte[]>("x.bytes", new byte[] { 0x41, 0x42, 0x43 });
await nc.PublishAsync<MyData>("x.json", new MyData(30, "bar"));
await cts.CancelAsync();
await Task.WhenAll(tasks);
public record MyData(int Id, string Name);
// Output:
// Received int: 100
// Received bytes: 0x41 0x42 0x43
// Received string: Hello, World!
// Received data: MyData { Id = 30, Name = bar }
// See also for more information:
// https://nats-io.github.io/nats.net/documentation/advanced/serialization.htmlrequire 'nats/client'
require 'json'
NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
nc.subscribe("updates") do |msg|
m = JSON.parse(msg)
# {"symbol"=>"GOOG", "price"=>12}
p m
end
endnats-server --user myname --pass passwordnats server passwd? Enter password [? for help] **********************
? Reenter password [? for help] **********************
$2a$11$qbtrnb0mSG2eV55xoyPqHOZx/lLBlryHRhU3LK2oOPFRwGF/5rtGK// Set a user and plain text password
nc, err := nats.Connect("127.0.0.1", nats.UserInfo("myname", "password"))
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Do something with the connectionOptions options = new Options.Builder()
.server("nats://localhost:4222")
.userInfo("myname","password") // Set a user and plain text password
.build();
Connection nc = Nats.connect(options);
// Do something with the connection
nc.close(); const nc = await connect({
port: ns.port,
user: "byname",
pass: "password",
});nats --help...
-s, --server=URL NATS server urls ($NATS_URL)
--user=USER Username or Token ($NATS_USER)
--password=PASSWORD Password ($NATS_PASSWORD)
--creds=FILE User credentials ($NATS_CREDS)
--nkey=FILE User NKEY ($NATS_NKEY)
--tlscert=FILE TLS public certificate ($NATS_CERT)
--tlskey=FILE TLS private key ($NATS_KEY)
--tlsca=FILE TLS certificate authority chain ($NATS_CA)
--socks-proxy=PROXY SOCKS5 proxy for connecting to NATS server
($NATS_SOCKS_PROXY)
--colors=SCHEME Sets a color scheme to use ($NATS_COLOR)
--timeout=DURATION Time to wait on responses from NATS
($NATS_TIMEOUT)
--context=NAME Configuration context ($NATS_CONTEXT)
...#!/bin/bash
echo "-n" "system user password: "
read -s NATS_PASSWORD
export NATS_PASSWORD
nats server report jetstream --user system{
"description": "",
"url": "nats://127.0.0.1:4222",
"token": "",
"user": "",
"password": "",
"creds": "",
"nkey": "",
"cert": "",
"key": "",
"ca": "",
"nsc": "",
"jetstream_domain": "",
"jetstream_api_prefix": "",
"jetstream_event_prefix": "",
"inbox_prefix": "",
"user_jwt": ""
}nats context save example --server nats://nats.example.net:4222 --description 'Example.Net Server'
nats context save local --server nats://localhost:4222 --description 'Local Host' --select nats context lsKnown contexts:
example Example.Net Server
local* Local Hostnats context selectnats rttnats://localhost:4222:
nats://127.0.0.1:4222: 245.115µs
nats://[::1]:4222: 390.239µsnats rtt --context examplenats://nats.example.net:4222:
nats://192.0.2.10:4222: 41.560815ms
nats://192.0.2.11:4222: 41.486609ms
nats://192.0.2.12:4222: 41.178009msnats context save example --description 'Example.Net Server' --nsc nsc://acme/orders/newnats server passwd? Enter password [? for help] **********************
? Reenter password [? for help] **********************
$2a$11$3kIDaCxw.Glsl1.u5nKa6eUnNDLV5HV9tIuUp7EHhMt6Nm9myW1aS authorization {
user: derek
password: $2a$11$3kIDaCxw.Glsl1.u5nKa6eUnNDLV5HV9tIuUp7EHhMt6Nm9myW1aS
}// Structured data is not configurable in C NATS Client.nc = NATS()
await nc.connect(servers=["nats://myname:[email protected]:4222"])
# Do something with the connection.// dotnet add package NATS.Net
using NATS.Net;
using NATS.Client.Core;
await using var client = new NatsClient(new NatsOpts
{
Url = "nats://localhost:4222",
AuthOpts = new NatsAuthOpts
{
Username = "myname",
Password = "password",
}
});require 'nats/client'
NATS.start(servers:["nats://myname:[email protected]:4222"], name: "my-connection") do |nc|
nc.on_error do |e|
puts "Error: #{e}"
end
nc.on_reconnect do
puts "Got reconnected to #{nc.connected_server}"
end
nc.on_disconnect do |reason|
puts "Got disconnected! #{reason}"
end
nc.close
endnatsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
s = natsOptions_Create(&opts);
if (s == NATS_OK)
s = natsOptions_SetUserInfo(opts, "myname", "password");
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);// Set a user and plain text password
nc, err := nats.Connect("myname:[email protected]")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Do something with the connectionConnection nc = Nats.connect("nats://myname:password@localhost:4222");
// Do something with the connection
nc.close();// JavaScript clients don't support username/password in urls use `user` and `pass` options.nc = NATS()
await nc.connect(servers=["nats://myname:[email protected]:4222"])
# Do something with the connection.// dotnet add package NATS.Net
using NATS.Net;
using NATS.Client.Core;
await using var nc = new NatsClient(new NatsOpts
{
// .NET client doesn't support username/password in URLs
// use `Username` and `Password` options.
Url = "nats://demo.nats.io:4222",
AuthOpts = new NatsAuthOpts
{
Username = "myname",
Password = "password",
}
});require 'nats/client'
NATS.start(servers:["nats://myname:[email protected]:4222"], name: "my-connection") do |nc|
nc.on_error do |e|
puts "Error: #{e}"
end
nc.on_reconnect do
puts "Got reconnected to #{nc.connected_server}"
end
nc.on_disconnect do |reason|
puts "Got disconnected! #{reason}"
end
nc.close
endnatsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
s = natsOptions_Create(&opts);
if (s == NATS_OK)
s = natsOptions_SetURL(opts, "nats://myname:[email protected]:4222");
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);nc, err := nats.Connect("localhost
// This examples requires certificates to be in the java keystore format (.jks).
// To do so openssl is used to generate a pkcs12 file (.p12) from client-cert.pem and client-key.pem.
// The resulting file is then imported int a java keystore named keystore.jks using keytool which is part of java jdk.
Connection nc = Nats.connect("nats://demo.nats.io:4222"
nc, err := nats.Connect("demo.nats.io")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Send the request
msg, err := nc.Request("time", nil, time.Second
if err != nil {
log.Fatal(err)
}
// Use the response
log.Printf("Reply: %s", msg.Data)
// Close the connection
nc.Close()// tls options available depend on the javascript
// runtime, please verify the readme for the
// client you are using for specific details
// this example showing the node library
const nc = await connect({
port: ns.port,
debug: true,
tls: {
caFile: caCertPath,
keyFile: clientKeyPath,
certFile: clientCertPath,
},
});nc = NATS()
ssl_ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
ssl_ctx.load_verify_locations('rootCA.pem')
ssl_ctx.load_cert_chain(certfile='client-cert.pem',
keyfile='client-key.pem')
await nc.connect(io_loop=loop, tls=ssl_ctx)
await nc.connect(servers=["nats://demo.nats.io:4222"], tls=ssl_ctx)
# Do something with the connection.// dotnet add package NATS.Net
using NATS.Net;
using NATS.Client.Core;
await using var client = new NatsClient(new NatsOpts
{
TlsOpts = new NatsTlsOpts
{
CaFile = "rootCA.pem",
KeyFile = "client-key.pem",
CertFile = "client-cert.pem",
}
});EM.run do
options = {
:servers => [
'nats://localhost:4222',
],
:tls => {
:private_key_file => 'client-key.pem',
:cert_chain_file => 'client-cert.pem',
:ca_file => 'rootCA.pem'
}
}
NATS.connect(options) do |nc|
puts "#{Time.now.to_f} - Connected to NATS at #{nc.connected_server}"
nc.subscribe("hello") do |msg|
puts "#{Time.now.to_f} - Received: #{msg}"
end
nc.flush do
nc.publish("hello", "world")
end
EM.add_periodic_timer(0.1) do
next unless nc.connected?
nc.publish("hello", "hello")
end
# Set default callbacks
nc.on_error do |e|
puts "#{Time.now.to_f } - Error: #{e}"
end
nc.on_disconnect do |reason|
puts "#{Time.now.to_f} - Disconnected: #{reason}"
end
nc.on_reconnect do |nc|
puts "#{Time.now.to_f} - Reconnected to NATS server at #{nc.connected_server}"
end
nc.on_close do
puts "#{Time.now.to_f} - Connection to NATS closed"
EM.stop
end
end
endnatsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
s = natsOptions_Create(&opts);
if (s == NATS_OK)
s = natsOptions_LoadCertificatesChain(opts, "client-cert.pem", "client-key.pem");
if (s == NATS_OK)
s = natsOptions_LoadCATrustedCertificates(opts, "rootCA.pem");
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);nats-server --tls --tlscert=server-cert.pem --tlskey=server-key.pem --tlscacert rootCA.pem --tlsverify// set up a subscription to process the request
const sc = StringCodec();
nc.subscribe("time", {
callback: (_err, msg) => {
msg.respond(sc.encode(new Date().toLocaleTimeString()));
},
});
const r = await nc.request("time");
t.log(sc.decode(r.data));nc = NATS()
async def sub(msg):
await nc.publish(msg.reply, b'response')
await nc.connect(servers=["nats://demo.nats.io:4222"])
await nc.subscribe("time", cb=sub)
# Send the request
try:
msg = await nc.request("time", b'', timeout=1)
# Use the response
print("Reply:", msg)
except asyncio.TimeoutError:
print("Timed out waiting for response")// dotnet add package NATS.Net
using NATS.Net;
await using var client = new NatsClient();
using CancellationTokenSource cts = new();
// Process the time messages in a separate task
Task subscription = Task.Run(async () =>
{
await foreach (var msg in client.SubscribeAsync<string>("time", cancellationToken: cts.Token))
{
await msg.ReplyAsync(DateTimeOffset.Now);
}
});
// Wait for the subscription task to be ready
await Task.Delay(1000);
var reply = await client.RequestAsync<DateTimeOffset>("time");
Console.WriteLine($"Reply: {reply.Data:O}");
await cts.CancelAsync();
await subscription;
// Output:
// Reply: 2024-10-23T05:20:55.0000000+01:00require 'nats/client'
require 'fiber'
NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
nc.subscribe("time") do |msg, reply|
nc.publish(reply, "response")
end
Fiber.new do
# Use the response
msg = nc.request("time", "")
puts "Reply: #{msg}"
end.resume
endnatsConnection *conn = NULL;
natsMsg *msg = NULL;
natsStatus s = NATS_OK;
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
// Send a request and wait for up to 1 second
if (s == NATS_OK)
s = natsConnection_RequestString(&msg, conn, "request", "this is the request", 1000);
if (s == NATS_OK)
{
printf("Received msg: %s - %.*s\n",
natsMsg_GetSubject(msg),
natsMsg_GetDataLength(msg),
natsMsg_GetData(msg));
// Destroy the message that was received
natsMsg_Destroy(msg);
}
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);sub, err := nc.SubscribeSync(replyTo)
if err != nil {
log.Fatal(err)
}
// Send the request immediately
nc.PublishRequest(subject, replyTo, []byte(input))
nc.Flush()
// Wait for a single response
for {
msg, err := sub.NextMsg(1 * time.Second)
if err != nil {
log.Fatal(err)
}
response = string(msg.Data)
break
}
sub.Unsubscribe()sub, err := nc.SubscribeSync(replyTo)
if err != nil {
log.Fatal(err)
}
nc.Flush()
// Send the request
nc.PublishRequest(subject, replyTo, []byte(input))
// Wait for a single response
max := 100 * time.Millisecond
start := time.Now()
for time.Now().Sub(start) < max {
msg, err := sub.NextMsg(1 * time.Second)
if err != nil {
break
}
responses = append(responses, string(msg.Data))
}
sub.Unsubscribe()sub, err := nc.SubscribeSync(replyTo)
if err != nil {
log.Fatal(err)
}
nc.Flush()
// Send the request
nc.PublishRequest(subject, replyTo, []byte(input))
// Wait for a single response
max := 500 * time.Millisecond
start := time.Now()
for time.Now().Sub(start) < max {
msg, err := sub.NextMsg(1 * time.Second)
if err != nil {
break
}
responses = append(responses, string(msg.Data))
if len(responses) >= minResponses {
break
}
}
sub.Unsubscribe()nsc add operator O2[ OK ] generated and stored operator key "OABX3STBZZRBHMWMIMVHNQVNUG2O3D54BMZXX5LMBYKSAPDSHIWPMMFY"
[ OK ] added operator "O2"nsc generate nkey --operator --storeSOAEW6Z4HCCGSLZJYZQMGFQY2SY6ZKOPIAKUQ5VZY6CW23WWYRNHTQWVOA
OAZBRNE7DQGDYT5CSAGWDMI5ENGKOEJ57BXVU6WUTHFEAO3CU5GLQYF5
operator key stored ~/.nkeys/keys/O/AZ/OAZBRNE7DQGDYT5CSAGWDMI5ENGKOEJ57BXVU6WUTHFEAO3CU5GLQYF5.nknsc edit operator --sk OAZBRNE7DQGDYT5CSAGWDMI5ENGKOEJ57BXVU6WUTHFEAO3CU5GLQYF5[ OK ] added signing key "OAZBRNE7DQGDYT5CSAGWDMI5ENGKOEJ57BXVU6WUTHFEAO3CU5GLQYF5"
[ OK ] edited operator "O2"nsc describe operator╭─────────────────────────────────────────────────────────────────────────╮
│ Operator Details │
├──────────────┬──────────────────────────────────────────────────────────┤
│ Name │ O2 │
│ Operator ID │ OABX3STBZZRBHMWMIMVHNQVNUG2O3D54BMZXX5LMBYKSAPDSHIWPMMFY │
│ Issuer ID │ OABX3STBZZRBHMWMIMVHNQVNUG2O3D54BMZXX5LMBYKSAPDSHIWPMMFY │
│ Issued │ 2019-12-05 14:36:16 UTC │
│ Expires │ │
├──────────────┼──────────────────────────────────────────────────────────┤
│ Signing Keys │ OAZBRNE7DQGDYT5CSAGWDMI5ENGKOEJ57BXVU6WUTHFEAO3CU5GLQYF5 │
╰──────────────┴──────────────────────────────────────────────────────────╯nsc add account A -K ~/.nkeys/keys/O/AZ/OAZBRNE7DQGDYT5CSAGWDMI5ENGKOEJ57BXVU6WUTHFEAO3CU5GLQYF5.nk[ OK ] generated and stored account key "ACDXQQ6KD5MVSFMK7GNF5ARK3OJC6PEICWCH5PQ7HO27VKGCXQHFE33B"
[ OK ] added account "A"nsc generate nkey --account --storeSAAA4BVFTJMBOW3GAYB3STG3VWFSR4TP4QJKG2OCECGA26SKONPFGC4HHE
ADUQTJD4TF4O6LTTHCKDKSHKGBN2NECCHHMWFREPKNO6MPA7ZETFEEF7
account key stored ~/.nkeys/keys/A/DU/ADUQTJD4TF4O6LTTHCKDKSHKGBN2NECCHHMWFREPKNO6MPA7ZETFEEF7.nknsc edit account --sk ADUQTJD4TF4O6LTTHCKDKSHKGBN2NECCHHMWFREPKNO6MPA7ZETFEEF7 -K ~/.nkeys/keys/O/AZ/OAZBRNE7DQGDYT5CSAGWDMI5ENGKOEJ57BXVU6WUTHFEAO3CU5GLQYF5.nk[ OK ] added signing key "ADUQTJD4TF4O6LTTHCKDKSHKGBN2NECCHHMWFREPKNO6MPA7ZETFEEF7"
[ OK ] edited account "A"nsc describe account╭──────────────────────────────────────────────────────────────────────────────────────╮
│ Account Details │
├───────────────────────────┬──────────────────────────────────────────────────────────┤
│ Name │ A │
│ Account ID │ ACDXQQ6KD5MVSFMK7GNF5ARK3OJC6PEICWCH5PQ7HO27VKGCXQHFE33B │
│ Issuer ID │ OAZBRNE7DQGDYT5CSAGWDMI5ENGKOEJ57BXVU6WUTHFEAO3CU5GLQYF5 │
│ Issued │ 2019-12-05 14:48:22 UTC │
│ Expires │ │
├───────────────────────────┼──────────────────────────────────────────────────────────┤
│ Signing Keys │ ADUQTJD4TF4O6LTTHCKDKSHKGBN2NECCHHMWFREPKNO6MPA7ZETFEEF7 │
├───────────────────────────┼──────────────────────────────────────────────────────────┤
│ Max Connections │ Unlimited │
│ Max Leaf Node Connections │ Unlimited │
│ Max Data │ Unlimited │
│ Max Exports │ Unlimited │
│ Max Imports │ Unlimited │
│ Max Msg Payload │ Unlimited │
│ Max Subscriptions │ Unlimited │
│ Exports Allows Wildcards │ True │
├───────────────────────────┼──────────────────────────────────────────────────────────┤
│ Imports │ None │
│ Exports │ None │
╰───────────────────────────┴──────────────────────────────────────────────────────────╯nsc add user U -K ~/.nkeys/keys/A/DU/ADUQTJD4TF4O6LTTHCKDKSHKGBN2NECCHHMWFREPKNO6MPA7ZETFEEF7.nk[ OK ] generated and stored user key "UD47TOTKVDY4IQRGI6D7XMLZPHZVNV5FCD4CNQICLV3FXLQBY72A4UXL"
[ OK ] generated user creds file "~/.nkeys/creds/O2/A/U.creds"
[ OK ] added user "U" to account "A"nsc describe user╭─────────────────────────────────────────────────────────────────────────────────╮
│ User │
├──────────────────────┬──────────────────────────────────────────────────────────┤
│ Name │ U │
│ User ID │ UD47TOTKVDY4IQRGI6D7XMLZPHZVNV5FCD4CNQICLV3FXLQBY72A4UXL │
│ Issuer ID │ ADUQTJD4TF4O6LTTHCKDKSHKGBN2NECCHHMWFREPKNO6MPA7ZETFEEF7 │
│ Issuer Account │ ACDXQQ6KD5MVSFMK7GNF5ARK3OJC6PEICWCH5PQ7HO27VKGCXQHFE33B │
│ Issued │ 2019-12-05 14:50:07 UTC │
│ Expires │ │
├──────────────────────┼──────────────────────────────────────────────────────────┤
│ Response Permissions │ Not Set │
├──────────────────────┼──────────────────────────────────────────────────────────┤
│ Max Messages │ Unlimited │
│ Max Msg Payload │ Unlimited │
│ Network Src │ Any │
│ Time │ Any │
╰──────────────────────┴──────────────────────────────────────────────────────────╯nsc add account A[ OK ] generated and stored account key "ADLGEVANYDKDQ6WYXPNBEGVUURXZY4LLLK5BJPOUDN6NGNXLNH4ATPWR"
[ OK ] push jwt to account server:
[ OK ] pushed account jwt to the account server
> NGS created a new free billing account for your JWT, A [ADLGEVANYDKD].
> Use the 'ngs' command to manage your billing plan.
> If your account JWT is *not* in ~/.nsc, use the -d flag on ngs commands to locate it.
[ OK ] pull jwt from account server
[ OK ] added account "A"nsc edit account -n A --sk generate[ OK ] added signing key "AAZQXKDPOTGUCOCOGDW7HWWVR5WEGF3KYL7EKOEHW2XWRS2PT5AOTRH3"
[ OK ] push jwt to account server
[ OK ] pull jwt from account server
[ OK ] account server modifications:
> allow wildcard exports changed from true to false
[ OK ] edited account "A"nsc edit signing-key --account A --role service --sk AAZQXKDPOTGUCOCOGDW7HWWVR5WEGF3KYL7EKOEHW2XWRS2PT5AOTRH3 --allow-sub "q.>" --deny-pub ">" --allow-pub-response[ OK ] set max responses to 1
[ OK ] added deny pub ">"
[ OK ] added sub "q.>"
[ OK ] push jwt to account server
[ OK ] pull jwt from account server
[ OK ] edited signing key "AAZQXKDPOTGUCOCOGDW7HWWVR5WEGF3KYL7EKOEHW2XWRS2PT5AOTRH3"nsc add user U -K service[ OK ] generated and stored user key "UBFRJ6RNBYJWSVFBS7O4ZW5MM6J3EPE75II3ULPVUWOUH7K7A23D3RQE"
[ OK ] generated user creds file `~/test/issue-2621/keys/creds/synadia/A/U.creds`
[ OK ] added user "U" to account "A"nsc edit signing-key \
--account sales \
--role team-service \
--sk AXUQXKDPOTGUCOCOGDW7HWWVR5WEGF3KYL7EKOEHW2XWRS2PT5AOTRH3 \
--allow-sub "{{account-name()}}.{{tag(team)}}.{{name()}}.>" \
--allow-pub-responsensc add user pam -K team-service --tag team:support
nsc add user joe -K team-service --tag team:leadssales.support.pam.>sales.leads.joe.>nats account infoConnection Information:
Client ID: 6
Client IP: 127.0.0.1
RTT: 64.996µs
Headers Supported: true
Maximum Payload: 1.0 MiB
Connected URL: nats://127.0.0.1:4222
Connected Address: 127.0.0.1:4222
Connected Server ID: ND2XVDA4Q363JOIFKJTPZW3ZKZCANH7NJI4EJMFSSPTRXDBFG4M4C34K
JetStream Account Information:
Memory: 0 B of Unlimited
Storage: 0 B of Unlimited
Streams: 0 of Unlimited
Consumers: 0 of UnlimitedJetStream Account Information:
JetStream is not supported in this accountnats object add myobjbucketmyobjbucket Object Store Status
Bucket Name: myobjbucket
Replicas: 1
TTL: unlimitd
Sealed: false
Size: 0 B
Backing Store Kind: JetStream
JetStream Stream: OBJ_myobjbucketnats object put myobjbucket ~/Movies/NATS-logo.mov1.5 GiB / 1.5 GiB [====================================================================================]
Object information for myobjbucket > /Users/jnmoyne/Movies/NATS-logo.mov
Size: 1.5 GiB
Modification Time: 14 Apr 22 00:34 +0000
Chunks: 12,656
Digest: sha-256 8ee0679dd1462de393d81a3032d71f43d2bc89c0c8a557687cfe2787e926nats object put --name /Movies/NATS-logo.mov myobjbucket ~/Movies/NATS-logo.mov1.5 GiB / 1.5 GiB [====================================================================================]
Object information for myobjbucket > /Movies/NATS-logo.mov
Size: 1.5 GiB
Modification Time: 14 Apr 22 00:34 +0000
Chunks: 12,656
Digest: sha-256 8ee0679dd1462de393d81a3032d71f43d2bc89c0c8a557687cfe2787e926nats object ls myobjbucket╭───────────────────────────────────────────────────────────────────────────╮
│ Bucket Contents │
├─────────────────────────────────────┬─────────┬───────────────────────────┤
│ Name │ Size │ Time │
├─────────────────────────────────────┼─────────┼───────────────────────────┤
│ /Users/jnmoyne/Movies/NATS-logo.mov │ 1.5 GiB │ 2022-04-13T17:34:55-07:00 │
│ /Movies/NATS-logo.mov │ 1.5 GiB │ 2022-04-13T17:35:41-07:00 │
╰─────────────────────────────────────┴─────────┴───────────────────────────╯nats object get myobjbucket ~/Movies/NATS-logo.mov1.5 GiB / 1.5 GiB [====================================================================================]
Wrote: 1.5 GiB to /Users/jnmoyne/NATS-logo.mov in 5.68s average 279 MiB/snats object get myobjbucket --output /temp/Movies/NATS-logo.mov /Movies/NATS-logo.mov1.5 GiB / 1.5 GiB [====================================================================================]
Wrote: 1.5 GiB to /temp/Movies/NATS-logo.mov in 5.68s average 279 MiB/snats object rm myobjbucket ~/Movies/NATS-logo.mov? Delete 1.5 GiB byte file myobjbucket > /Users/jnmoyne/Movies/NATS-logo.mov? Yes
Removed myobjbucket > /Users/jnmoyne/Movies/NATS-logo.mov
myobjbucket Object Store Status
Bucket Name: myobjbucket
Replicas: 1
TTL: unlimitd
Sealed: false
Size: 16 MiB
Backing Store Kind: JetStream
JetStream Stream: OBJ_myobjbucketnats object info myobjbucketmyobjbucket Object Store Status
Bucket Name: myobjbucket
Replicas: 1
TTL: unlimitd
Sealed: false
Size: 1.6 GiB
Backing Store Kind: JetStream
JetStream Stream: OBJ_myobjbucketnats object watch myobjbucket[2022-04-13 17:51:28] PUT myobjbucket > /Users/jnmoyne/Movies/NATS-logo.mov: 1.5 GiB bytes in 12,656 chunks
[2022-04-13 17:53:27] DEL myobjbucket > /Users/jnmoyne/Movies/NATS-logo.movnats object seal myobjbucket? Really seal Bucket myobjbucket, sealed buckets can not be unsealed or modified Yes
myobjbucket has been sealed
myobjbucket Object Store Status
Bucket Name: myobjbucket
Replicas: 1
TTL: unlimitd
Sealed: true
Size: 1.6 GiB
Backing Store Kind: JetStream
JetStream Stream: OBJ_myobjbuckett.log(`max payload for the server is ${nc.info.max_payloadnc = NATS()
await nc.connect(
nc, err := nats.Connect("demo.nats.io")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
mp := nc.MaxPayload()
log.Printf("Maximum payload is %v bytes", mp)
// Do something with the max payloadConnection nc = Nats.connect("nats://demo.nats.io:4222");
long mp = nc.getMaxPayload();
System.out.println("max payload for the server is " + mp + " bytes");// dotnet add package NATS.Net
using NATS.Net;
await using var client = new NatsClient("nats://demo.nats.io:4222");
// Make sure we connect to a server to receive the server info,
// since connecting to servers is lazy in .NET client.
await client.ConnectAsync();
Console.WriteLine($"MaxPayload = {client.Connection.ServerInfo.MaxPayload}");require 'nats/client'
NATS.start(max_outstanding_pings: 5) do |nc|
nc.on_reconnect do
puts "Got reconnected to #{nc.connected_server}"
end
nc.on_disconnect do |reason|
puts "Got disconnected! #{reason}"
end
# Do something with the max_payload
puts "Maximum Payload is #{nc.server_info[:max_payload]} bytes"
endnatsConnection *conn = NULL;
natsStatus s = NATS_OK;
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
if (s == NATS_OK)
{
int64_t mp = natsConnection_GetMaxPayload(conn);
printf("Max payload: %d\n", (int) mp);
}
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);opts := nats.GetDefaultOptions()
opts.Url = "demo.nats.io"
// Turn on Pedantic
opts.Pedantic = true
nc, err := opts.Connect()
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Do something with the connectionOptions options = new Options.Builder().
server("nats://demo.nats.io:4222").
pedantic(). // Turn on pedantic
build();
Connection nc = Nats.connect(options);
// Do something with the connection
nc.close();// the pedantic option is useful for developing nats clients.
// the javascript clients also provide `debug` which will
// print to the console all the protocol interactions
// with the server
const nc = await connect({
pedantic: true,
servers: ["demo.nats.io:4222"],
debug: true,
});nc = NATS()
await nc.connect(servers=["nats://demo.nats.io:4222"], pedantic=True)
# Do something with the connection.// Not available in the NATS .NET clientrequire 'nats/client'
NATS.start(pedantic: true) do |nc|
nc.on_reconnect do
puts "Got reconnected to #{nc.connected_server}"
end
nc.on_disconnect do |reason|
puts "Got disconnected! #{reason}"
end
nc.close
endnatsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
s = natsOptions_Create(&opts);
if (s == NATS_OK)
s = natsOptions_SetPedantic(opts, true);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);// This does not apply to the NATS Go ClientOptions options = new Options.Builder().
server("nats://demo.nats.io:4222").
maxControlLine(2 * 1024). // Set the max control line to 2k
build();
Connection nc = Nats.connect(options);
// Do something with the connection
nc.close();// the max control line is determined automatically by the client# Asyncio NATS client does not allow custom control lines.// control line is not configurable on NATS .NET client.
// required memory is allocated dynamically from the array pool.# There is no need to customize this in the Ruby NATS client.// control line is not configurable on C NATS client.opts := nats.GetDefaultOptions()
opts.Url = "demo.nats.io"
// Turn on Verbose
opts.Verbose = true
nc, err := opts.Connect()
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Do something with the connectionOptions options = new Options.Builder().
server("nats://demo.nats.io:4222").
verbose(). // Turn on verbose
build();
Connection nc = Nats.connect(options);
// Do something with the connection
nc.close();const nc = await connect({
verbose: true,
servers: ["demo.nats.io:4222"],
});nc = NATS()
await nc.connect(servers=["nats://demo.nats.io:4222"], verbose=True)
# Do something with the connection.require 'nats/client'
NATS.start(verbose: true) do |nc|
nc.on_reconnect do
puts "Got reconnected to #{nc.connected_server}"
end
nc.on_disconnect do |reason|
puts "Got disconnected! #{reason}"
end
nc.close
endnatsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
s = natsOptions_Create(&opts);
if (s == NATS_OK)
s = natsOptions_SetVerbose(opts, true);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);nsc add export --name abc --subject "a.b.c.>" [ OK ] added public stream export "abc"nsc describe account╭──────────────────────────────────────────────────────────────────────────────────────╮
│ Account Details │
├───────────────────────────┬──────────────────────────────────────────────────────────┤
│ Name │ A │
│ Account ID │ ADETPT36WBIBUKM3IBCVM4A5YUSDXFEJPW4M6GGVBYCBW7RRNFTV5NGE │
│ Issuer ID │ OAFEEYZSYYVI4FXLRXJTMM32PQEI3RGOWZJT7Y3YFM4HB7ACPE4RTJPG │
│ Issued │ 2019-12-05 13:35:42 UTC │
│ Expires │ │
├───────────────────────────┼──────────────────────────────────────────────────────────┤
│ Max Connections │ Unlimited │
│ Max Leaf Node Connections │ Unlimited │
│ Max Data │ Unlimited │
│ Max Exports │ Unlimited │
│ Max Imports │ Unlimited │
│ Max Msg Payload │ Unlimited │
│ Max Subscriptions │ Unlimited │
│ Exports Allows Wildcards │ True │
├───────────────────────────┼──────────────────────────────────────────────────────────┤
│ Imports │ None │
╰───────────────────────────┴──────────────────────────────────────────────────────────╯
╭───────────────────────────────────────────────────────────╮
│ Exports │
├──────┬────────┬─────────┬────────┬─────────────┬──────────┤
│ Name │ Type │ Subject │ Public │ Revocations │ Tracking │
├──────┼────────┼─────────┼────────┼─────────────┼──────────┤
│ abc │ Stream │ a.b.c.> │ Yes │ 0 │ N/A │
╰──────┴────────┴─────────┴────────┴─────────────┴──────────╯nsc add account B[ OK ] generated and stored account key "AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2H"
[ OK ] added account "B"nsc add import --src-account ADETPT36WBIBUKM3IBCVM4A5YUSDXFEJPW4M6GGVBYCBW7RRNFTV5NGE --remote-subject "a.b.c.>"[ OK ] added stream import "a.b.c.>"nsc describe account╭──────────────────────────────────────────────────────────────────────────────────────╮
│ Account Details │
├───────────────────────────┬──────────────────────────────────────────────────────────┤
│ Name │ B │
│ Account ID │ AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2H │
│ Issuer ID │ OAFEEYZSYYVI4FXLRXJTMM32PQEI3RGOWZJT7Y3YFM4HB7ACPE4RTJPG │
│ Issued │ 2019-12-05 13:39:55 UTC │
│ Expires │ │
├───────────────────────────┼──────────────────────────────────────────────────────────┤
│ Max Connections │ Unlimited │
│ Max Leaf Node Connections │ Unlimited │
│ Max Data │ Unlimited │
│ Max Exports │ Unlimited │
│ Max Imports │ Unlimited │
│ Max Msg Payload │ Unlimited │
│ Max Subscriptions │ Unlimited │
│ Exports Allows Wildcards │ True │
├───────────────────────────┼──────────────────────────────────────────────────────────┤
│ Exports │ None │
╰───────────────────────────┴──────────────────────────────────────────────────────────╯
╭─────────────────────────────────────────────────────────────────────────────╮
│ Imports │
├─────────┬────────┬─────────┬──────────────┬─────────┬──────────────┬────────┤
│ Name │ Type │ Remote │ Local/Prefix │ Expires │ From Account │ Public │
├─────────┼────────┼─────────┼──────────────┼─────────┼──────────────┼────────┤
│ a.b.c.> │ Stream │ a.b.c.> │ │ │ A │ Yes │
╰─────────┴────────┴─────────┴──────────────┴─────────┴──────────────┴────────╯nsc add user b[ OK ] generated and stored user key "UDKNTNEL5YD66U2FZZ2B3WX2PLJFKEFHAPJ3NWJBFF44PT76Y2RAVFVE"
[ OK ] generated user creds file "~/.nkeys/creds/O/B/b.creds"
[ OK ] added user "b" to account "B"nsc sub --account B --user b "a.b.c.>"nsc pub --account A --user U a.b.c.hello worldnsc add export --subject "private.abc.*" --private --account Ansc describe account A╭──────────────────────────────────────────────────────────────────────────────────────╮
│ Account Details │
├───────────────────────────┬──────────────────────────────────────────────────────────┤
│ Name │ A │
│ Account ID │ ADETPT36WBIBUKM3IBCVM4A5YUSDXFEJPW4M6GGVBYCBW7RRNFTV5NGE │
│ Issuer ID │ OAFEEYZSYYVI4FXLRXJTMM32PQEI3RGOWZJT7Y3YFM4HB7ACPE4RTJPG │
│ Issued │ 2019-12-05 14:24:02 UTC │
│ Expires │ │
├───────────────────────────┼──────────────────────────────────────────────────────────┤
│ Max Connections │ Unlimited │
│ Max Leaf Node Connections │ Unlimited │
│ Max Data │ Unlimited │
│ Max Exports │ Unlimited │
│ Max Imports │ Unlimited │
│ Max Msg Payload │ Unlimited │
│ Max Subscriptions │ Unlimited │
│ Exports Allows Wildcards │ True │
├───────────────────────────┼──────────────────────────────────────────────────────────┤
│ Imports │ None │
╰───────────────────────────┴──────────────────────────────────────────────────────────╯
╭──────────────────────────────────────────────────────────────────────────╮
│ Exports │
├───────────────┬────────┬───────────────┬────────┬─────────────┬──────────┤
│ Name │ Type │ Subject │ Public │ Revocations │ Tracking │
├───────────────┼────────┼───────────────┼────────┼─────────────┼──────────┤
│ abc │ Stream │ a.b.c.> │ Yes │ 0 │ N/A │
│ private.abc.* │ Stream │ private.abc.* │ No │ 0 │ N/A │
╰───────────────┴────────┴───────────────┴────────┴─────────────┴──────────╯nsc list keys --account B╭──────────────────────────────────────────────────────────────────────────────────────────╮
│ Keys │
├────────┬──────────────────────────────────────────────────────────┬─────────────┬────────┤
│ Entity │ Key │ Signing Key │ Stored │
├────────┼──────────────────────────────────────────────────────────┼─────────────┼────────┤
│ O │ OAFEEYZSYYVI4FXLRXJTMM32PQEI3RGOWZJT7Y3YFM4HB7ACPE4RTJPG │ │ * │
│ B │ AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2H │ │ * │
│ b │ UDKNTNEL5YD66U2FZZ2B3WX2PLJFKEFHAPJ3NWJBFF44PT76Y2RAVFVE │ │ * │
╰────────┴──────────────────────────────────────────────────────────┴─────────────┴────────╯nsc generate activation --account A --target-account AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2H --subject private.abc.AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2H -o /tmp/activation.jwt[ OK ] generated "private.abc.*" activation for account "AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2H"
[ OK ] wrote account description to "/tmp/activation.jwt"cat /tmp/activation.jwt-----BEGIN NATS ACTIVATION JWT-----
eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5In0.eyJqdGkiOiJIS1FPQU9aQkVKS1JYNFJRUVhXS0xYSVBVTlNOSkRRTkxXUFBTSTQ3NkhCVVNYT0paVFFRIiwiaWF0IjoxNTc1NTU1OTczLCJpc3MiOiJBREVUUFQzNldCSUJVS00zSUJDVk00QTVZVVNEWEZFSlBXNE02R0dWQllDQlc3UlJORlRWNU5HRSIsIm5hbWUiOiJwcml2YXRlLmFiYy5BQU00NkUzWUY1V09aU0U1V05ZV0hOM1lZSVNWWk9TSTZYSFRGMlE2NEVDUFhTRlFaUk9KTVAySCIsInN1YiI6IkFBTTQ2RTNZRjVXT1pTRTVXTllXSE4zWVlJU1ZaT1NJNlhIVEYyUTY0RUNQWFNGUVpST0pNUDJIIiwidHlwZSI6ImFjdGl2YXRpb24iLCJuYXRzIjp7InN1YmplY3QiOiJwcml2YXRlLmFiYy5BQU00NkUzWUY1V09aU0U1V05ZV0hOM1lZSVNWWk9TSTZYSFRGMlE2NEVDUFhTRlFaUk9KTVAySCIsInR5cGUiOiJzdHJlYW0ifX0.yD2HWhRQYUFy5aQ7zNV0YjXzLIMoTKnnsBB_NsZNXP-Qr5fz7nowyz9IhoP7UszkN58m__ovjIaDKI9ml0l9DA
------END NATS ACTIVATION JWT------nsc describe jwt -f /tmp/activation.jwt ╭────────────────────────────────────────────────────────────────────────────────────────╮
│ Activation │
├─────────────────┬──────────────────────────────────────────────────────────────────────┤
│ Name │ private.abc.AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2H │
│ Account ID │ AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2H │
│ Issuer ID │ ADETPT36WBIBUKM3IBCVM4A5YUSDXFEJPW4M6GGVBYCBW7RRNFTV5NGE │
│ Issued │ 2019-12-05 14:26:13 UTC │
│ Expires │ │
├─────────────────┼──────────────────────────────────────────────────────────────────────┤
│ Hash ID │ GWIS5YCSET4EXEOBXVMQKXAR4CLY4IIXFV4MEMRUXPSQ7L4YTZ4Q==== │
├─────────────────┼──────────────────────────────────────────────────────────────────────┤
│ Import Type │ Stream │
│ Import Subject │ private.abc.AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2H │
├─────────────────┼──────────────────────────────────────────────────────────────────────┤
│ Max Messages │ Unlimited │
│ Max Msg Payload │ Unlimited │
│ Network Src │ Any │
│ Time │ Any │
╰─────────────────┴──────────────────────────────────────────────────────────────────────╯nsc add import --account B --token /tmp/activation.jwt[ OK ] added stream import "private.abc.AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2H"nsc describe account B╭──────────────────────────────────────────────────────────────────────────────────────╮
│ Account Details │
├───────────────────────────┬──────────────────────────────────────────────────────────┤
│ Name │ B │
│ Account ID │ AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2H │
│ Issuer ID │ OAFEEYZSYYVI4FXLRXJTMM32PQEI3RGOWZJT7Y3YFM4HB7ACPE4RTJPG │
│ Issued │ 2019-12-05 14:29:16 UTC │
│ Expires │ │
├───────────────────────────┼──────────────────────────────────────────────────────────┤
│ Max Connections │ Unlimited │
│ Max Leaf Node Connections │ Unlimited │
│ Max Data │ Unlimited │
│ Max Exports │ Unlimited │
│ Max Imports │ Unlimited │
│ Max Msg Payload │ Unlimited │
│ Max Subscriptions │ Unlimited │
│ Exports Allows Wildcards │ True │
├───────────────────────────┼──────────────────────────────────────────────────────────┤
│ Exports │ None │
╰───────────────────────────┴──────────────────────────────────────────────────────────╯
╭───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ Imports │
├──────────────────────────────────────────────────────────────────────┬────────┬──────────────────────────────────────────────────────────────────────┬──────────────┬─────────┬──────────────┬────────┤
│ Name │ Type │ Remote │ Local/Prefix │ Expires │ From Account │ Public │
├──────────────────────────────────────────────────────────────────────┼────────┼──────────────────────────────────────────────────────────────────────┼──────────────┼─────────┼──────────────┼────────┤
│ a.b.c.> │ Stream │ a.b.c.> │ │ │ A │ Yes │
│ private.abc.AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2H │ Stream │ private.abc.AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2H │ │ │ A │ No │
╰──────────────────────────────────────────────────────────────────────┴────────┴──────────────────────────────────────────────────────────────────────┴──────────────┴─────────┴──────────────┴────────╯nsc tools sub --account B --user b private.abc.AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2Hnsc tools pub --account A --user U private.abc.AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2H hellonc, err := nats.Connect("demo.nats.io
// Consumer (Dispatcher, Subscription) API
// void setPendingLimits(long maxMessages, long maxBytes)
Connection nc = Nats.connect("nats://demo.nats.io:4222");
Dispatcher d = nc.createDispatcher((msg) -> {
// handle message
});
d.subscribe("updates");
d.setPendingLimits(1_000, 5 * 1024 * 1024); // Set limits on a dispatcher
// Subscribe
Subscription sub = nc.subscribe("updates");
sub.setPendingLimits(1_000, 5 * 1024 * 1024); // Set limits on a subscription
// Do something
// Close the connection
nc.close();// slow pending limits are not configurable on node-natsnc = NATS()
await nc.connect(servers=["nats://demo.nats.io:4222"])
future = asyncio.Future()
async def cb(msg):
nonlocal future
future.set_result(msg)
# Set limits of 1000 messages or 5MB
await nc.subscribe("updates", cb=cb, pending_bytes_limit=5*1024*1024, pending_msgs_limit=1000)// dotnet add package NATS.Net
using NATS.Net;
using System.Threading.Channels;
using NATS.Client.Core;
await using var client = new NatsClient();
// Set limits of 1000 messages.
// Note: setting the channel capacity over 1024 is not recommended
// as the channel's backing array will be allocated on the LOH (large object heap).
// NATS .NET client does not support setting a limit on the number of bytes
var subOpts = new NatsSubOpts
{
ChannelOpts = new NatsSubChannelOpts
{
Capacity = 1000,
FullMode = BoundedChannelFullMode.DropOldest
}
};
await foreach (var msg in client.SubscribeAsync<string>(subject: "updates", opts: subOpts))
{
Console.WriteLine($"Received: {msg.Subject}: {msg.Data}");
}# The Ruby NATS client currently does not have option to specify a subscribers pending limits.natsConnection *conn = NULL;
natsSubscription *sub1 = NULL;
natsSubscription *sub2 = NULL;
natsStatus s = NATS_OK;
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
// Subscribe
if (s == NATS_OK)
s = natsConnection_Subscribe(&sub1, conn, "updates", onMsg, NULL);
// Set limits of 1000 messages or 5MB, whichever comes first
if (s == NATS_OK)
s = natsSubscription_SetPendingLimits(sub1, 1000, 5*1024*1024);
// Subscribe
if (s == NATS_OK)
s = natsConnection_Subscribe(&sub2, conn, "updates", onMsg, NULL);
// Set no limits for this subscription
if (s == NATS_OK)
s = natsSubscription_SetPendingLimits(sub2, -1, -1);
(...)
// Destroy objects that were created
natsSubscription_Destroy(sub1);
natsSubscription_Destroy(sub2);
natsConnection_Destroy(conn);// Set the callback that will be invoked when an asynchronous error occurs.
nc, err := nats.Connect("demo.nats.io", nats.ErrorHandler(logSlowConsumer))
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Do something with the connectionclass SlowConsumerReporter implements ErrorListener {
public void errorOccurred(Connection conn, String error)
{
}
public void exceptionOccurred(Connection conn, Exception exp) {
}
// Detect slow consumers
public void slowConsumerDetected(Connection conn, Consumer consumer) {
// Get the dropped count
System.out.println("A slow consumer dropped messages: "+ consumer.getDroppedCount());
}
}
public class SlowConsumerListener {
public static void main(String[] args) {
try {
Options options = new Options.Builder().
server("nats://demo.nats.io:4222").
errorListener(new SlowConsumerReporter()). // Set the listener
build();
Connection nc = Nats.connect(options);
// Do something with the connection
nc.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}// slow consumer detection is not configurable on NATS JavaScript client. nc = NATS()
async def error_cb(e):
if type(e) is nats.aio.errors.ErrSlowConsumer:
print("Slow consumer error, unsubscribing from handling further messages...")
await nc.unsubscribe(e.sid)
await nc.connect(
servers=["nats://demo.nats.io:4222"],
error_cb=error_cb,
)
msgs = []
future = asyncio.Future()
async def cb(msg):
nonlocal msgs
nonlocal future
print(msg)
msgs.append(msg)
if len(msgs) == 3:
# Head of line blocking on other messages caused
# by single message processing taking too long...
await asyncio.sleep(1)
await nc.subscribe("updates", cb=cb, pending_msgs_limit=5)
for i in range(0, 10):
await nc.publish("updates", "msg #{}".format(i).encode())
await asyncio.sleep(0)
try:
await asyncio.wait_for(future, 1)
except asyncio.TimeoutError:
pass
for msg in msgs:
print("[Received]", msg)
await nc.close()// dotnet add package NATS.Net
using NATS.Net;
using System.Threading.Channels;
using NATS.Client.Core;
await using var client = new NatsClient();
// Set the event handler for slow consumers
client.Connection.MessageDropped += async (sender, eventArgs) =>
{
Console.WriteLine($"Dropped message: {eventArgs.Subject}: {eventArgs.Data}");
Console.WriteLine($"Current channel size: {eventArgs.Pending}");
};
var subOpts = new NatsSubOpts
{
ChannelOpts = new NatsSubChannelOpts
{
Capacity = 10,
FullMode = BoundedChannelFullMode.DropOldest
// If set to wait (default), you won't be able to detect slow consumers
// FullMode = BoundedChannelFullMode.Wait,
}
};
using var cts = new CancellationTokenSource();
var subscription = Task.Run(async () =>
{
await foreach (var msg in client.SubscribeAsync<string>(subject: "updates", opts: subOpts, cancellationToken: cts.Token))
{
Console.WriteLine($"Received: {msg.Subject}: {msg.Data}");
}
});
for (int i = 0; i < 1_000; i++)
{
await client.PublishAsync(subject: "updates", data: $"message payload {i}");
}
await cts.CancelAsync();
await subscription;# The Ruby NATS client currently does not have option to customize slow consumer limits per sub.static void
errorCB(natsConnection *conn, natsSubscription *sub, natsStatus s, void *closure)
{
// Do something
printf("Error: %d - %s", s, natsStatus_GetText(s));
}
(...)
natsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
s = natsOptions_Create(&opts);
if (s == NATS_OK)
s = natsOptions_SetErrorHandler(opts, errorCB, NULL);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);nsc add export --name help --subject help --service[ OK ] added public service export "help"nsc describe account╭──────────────────────────────────────────────────────────────────────────────────────╮
│ Account Details │
├───────────────────────────┬──────────────────────────────────────────────────────────┤
│ Name │ A │
│ Account ID │ ADETPT36WBIBUKM3IBCVM4A5YUSDXFEJPW4M6GGVBYCBW7RRNFTV5NGE │
│ Issuer ID │ OAFEEYZSYYVI4FXLRXJTMM32PQEI3RGOWZJT7Y3YFM4HB7ACPE4RTJPG │
│ Issued │ 2019-12-04 18:20:42 UTC │
│ Expires │ │
├───────────────────────────┼──────────────────────────────────────────────────────────┤
│ Max Connections │ Unlimited │
│ Max Leaf Node Connections │ Unlimited │
│ Max Data │ Unlimited │
│ Max Exports │ Unlimited │
│ Max Imports │ Unlimited │
│ Max Msg Payload │ Unlimited │
│ Max Subscriptions │ Unlimited │
│ Exports Allows Wildcards │ True │
├───────────────────────────┼──────────────────────────────────────────────────────────┤
│ Imports │ None │
╰───────────────────────────┴──────────────────────────────────────────────────────────╯
╭────────────────────────────────────────────────────────────╮
│ Exports │
├──────┬─────────┬─────────┬────────┬─────────────┬──────────┤
│ Name │ Type │ Subject │ Public │ Revocations │ Tracking │
├──────┼─────────┼─────────┼────────┼─────────────┼──────────┤
│ help │ Service │ help │ Yes │ 0 │ - │
╰──────┴─────────┴─────────┴────────┴─────────────┴──────────╯nsc add account B[ OK ] generated and stored account key "AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2H"
[ OK ] added account "B"nsc add import --src-account ADETPT36WBIBUKM3IBCVM4A5YUSDXFEJPW4M6GGVBYCBW7RRNFTV5NGE --remote-subject help --service[ OK ] added service import "help"nsc describe account╭──────────────────────────────────────────────────────────────────────────────────────╮
│ Account Details │
├───────────────────────────┬──────────────────────────────────────────────────────────┤
│ Name │ B │
│ Account ID │ AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2H │
│ Issuer ID │ OAFEEYZSYYVI4FXLRXJTMM32PQEI3RGOWZJT7Y3YFM4HB7ACPE4RTJPG │
│ Issued │ 2019-12-04 20:12:42 UTC │
│ Expires │ │
├───────────────────────────┼──────────────────────────────────────────────────────────┤
│ Max Connections │ Unlimited │
│ Max Leaf Node Connections │ Unlimited │
│ Max Data │ Unlimited │
│ Max Exports │ Unlimited │
│ Max Imports │ Unlimited │
│ Max Msg Payload │ Unlimited │
│ Max Subscriptions │ Unlimited │
│ Exports Allows Wildcards │ True │
├───────────────────────────┼──────────────────────────────────────────────────────────┤
│ Exports │ None │
╰───────────────────────────┴──────────────────────────────────────────────────────────╯
╭──────────────────────────────────────────────────────────────────────────╮
│ Imports │
├──────┬─────────┬────────┬──────────────┬─────────┬──────────────┬────────┤
│ Name │ Type │ Remote │ Local/Prefix │ Expires │ From Account │ Public │
├──────┼─────────┼────────┼──────────────┼─────────┼──────────────┼────────┤
│ help │ Service │ help │ help │ │ A │ Yes │
╰──────┴─────────┴────────┴──────────────┴─────────┴──────────────┴────────╯nsc add user b[ OK ] generated and stored user key "UDKNTNEL5YD66U2FZZ2B3WX2PLJFKEFHAPJ3NWJBFF44PT76Y2RAVFVE"
[ OK ] generated user creds file "~/.nkeys/creds/O/B/b.creds"
[ OK ] added user "b" to account "B"nats reply --creds ~/.nkeys/creds/O/A/U.creds help "I will help" nsc reply --account A --user U help "I will help"nats request --creds ~/.nkeys/creds/O/B/b.creds help meReceived on [help]: 'me'Received [_INBOX.v6KAX0v1bu87k49hbg3dgn.StIGJF0D] : 'I will help'nsc reply --account A --user U help "I will help"
nsc req --account B --user b help mepublished request: [help] : 'me'
received reply: [_INBOX.GCJltVq1wRSb5FoJrJ6SE9.w8utbBXR] : 'I will help'nsc add export --subject "private.help.*" --private --service --account A[ OK ] added private service export "private.help.*"nsc describe account A╭──────────────────────────────────────────────────────────────────────────────────────╮
│ Account Details │
├───────────────────────────┬──────────────────────────────────────────────────────────┤
│ Name │ A │
│ Account ID │ ADETPT36WBIBUKM3IBCVM4A5YUSDXFEJPW4M6GGVBYCBW7RRNFTV5NGE │
│ Issuer ID │ OAFEEYZSYYVI4FXLRXJTMM32PQEI3RGOWZJT7Y3YFM4HB7ACPE4RTJPG │
│ Issued │ 2019-12-04 20:19:19 UTC │
│ Expires │ │
├───────────────────────────┼──────────────────────────────────────────────────────────┤
│ Max Connections │ Unlimited │
│ Max Leaf Node Connections │ Unlimited │
│ Max Data │ Unlimited │
│ Max Exports │ Unlimited │
│ Max Imports │ Unlimited │
│ Max Msg Payload │ Unlimited │
│ Max Subscriptions │ Unlimited │
│ Exports Allows Wildcards │ True │
├───────────────────────────┼──────────────────────────────────────────────────────────┤
│ Imports │ None │
╰───────────────────────────┴──────────────────────────────────────────────────────────╯
╭─────────────────────────────────────────────────────────────────────────────╮
│ Exports │
├────────────────┬─────────┬────────────────┬────────┬─────────────┬──────────┤
│ Name │ Type │ Subject │ Public │ Revocations │ Tracking │
├────────────────┼─────────┼────────────────┼────────┼─────────────┼──────────┤
│ help │ Service │ help │ Yes │ 0 │ - │
│ private.help.* │ Service │ private.help.* │ No │ 0 │ - │
╰────────────────┴─────────┴────────────────┴────────┴─────────────┴──────────╯nsc list keys --account B╭──────────────────────────────────────────────────────────────────────────────────────────╮
│ Keys │
├────────┬──────────────────────────────────────────────────────────┬─────────────┬────────┤
│ Entity │ Key │ Signing Key │ Stored │
├────────┼──────────────────────────────────────────────────────────┼─────────────┼────────┤
│ O │ OAFEEYZSYYVI4FXLRXJTMM32PQEI3RGOWZJT7Y3YFM4HB7ACPE4RTJPG │ │ * │
│ B │ AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2H │ │ * │
│ b │ UDKNTNEL5YD66U2FZZ2B3WX2PLJFKEFHAPJ3NWJBFF44PT76Y2RAVFVE │ │ * │
╰────────┴──────────────────────────────────────────────────────────┴─────────────┴────────╯nsc generate activation --account A --target-account AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2H --subject private.help.AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2H -o /tmp/activation.jwt[ OK ] generated "private.help.*" activation for account "AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2H"
[ OK ] wrote account description to "/tmp/activation.jwt"cat /tmp/activation.jwt-----BEGIN NATS ACTIVATION JWT-----
eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5In0.eyJqdGkiOiJUS01LNEFHT1pOVERDTERGUk9QTllNM0hHUVRDTEJTUktNQUxXWTVSUUhFVEVNNE1VTDdBIiwiaWF0IjoxNTc1NDkxNjEwLCJpc3MiOiJBREVUUFQzNldCSUJVS00zSUJDVk00QTVZVVNEWEZFSlBXNE02R0dWQllDQlc3UlJORlRWNU5HRSIsIm5hbWUiOiJwcml2YXRlLmhlbHAuQUFNNDZFM1lGNVdPWlNFNVdOWVdITjNZWUlTVlpPU0k2WEhURjJRNjRFQ1BYU0ZRWlJPSk1QMkgiLCJzdWIiOiJBQU00NkUzWUY1V09aU0U1V05ZV0hOM1lZSVNWWk9TSTZYSFRGMlE2NEVDUFhTRlFaUk9KTVAySCIsInR5cGUiOiJhY3RpdmF0aW9uIiwibmF0cyI6eyJzdWJqZWN0IjoicHJpdmF0ZS5oZWxwLkFBTTQ2RTNZRjVXT1pTRTVXTllXSE4zWVlJU1ZaT1NJNlhIVEYyUTY0RUNQWFNGUVpST0pNUDJIIiwidHlwZSI6InNlcnZpY2UifX0.4tFx_1UzPUwbV8wFNIJsQYu91K9hZaGRLE10nOphfHGetvMPv1384KC-1AiNdhApObSDFosdDcpjryD0QxaDCQ
------END NATS ACTIVATION JWT------nsc describe jwt -f /tmp/activation.jwt╭─────────────────────────────────────────────────────────────────────────────────────────╮
│ Activation │
├─────────────────┬───────────────────────────────────────────────────────────────────────┤
│ Name │ private.help.AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2H │
│ Account ID │ AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2H │
│ Issuer ID │ ADETPT36WBIBUKM3IBCVM4A5YUSDXFEJPW4M6GGVBYCBW7RRNFTV5NGE │
│ Issued │ 2019-12-04 20:33:30 UTC │
│ Expires │ │
├─────────────────┼───────────────────────────────────────────────────────────────────────┤
│ Hash ID │ DD6BZKI2LTQKAJYD5GTSI4OFUG72KD2BF74NFVLUNO47PR4OX64Q==== │
├─────────────────┼───────────────────────────────────────────────────────────────────────┤
│ Import Type │ Service │
│ Import Subject │ private.help.AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2H │
├─────────────────┼───────────────────────────────────────────────────────────────────────┤
│ Max Messages │ Unlimited │
│ Max Msg Payload │ Unlimited │
│ Network Src │ Any │
│ Time │ Any │
╰─────────────────┴───────────────────────────────────────────────────────────────────────╯nsc add import --account B -u /tmp/activation.jwt --local-subject private.help --name private.help[ OK ] added service import "private.help.AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2H"nsc describe account B╭──────────────────────────────────────────────────────────────────────────────────────╮
│ Account Details │
├───────────────────────────┬──────────────────────────────────────────────────────────┤
│ Name │ B │
│ Account ID │ AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2H │
│ Issuer ID │ OAFEEYZSYYVI4FXLRXJTMM32PQEI3RGOWZJT7Y3YFM4HB7ACPE4RTJPG │
│ Issued │ 2019-12-04 20:38:06 UTC │
│ Expires │ │
├───────────────────────────┼──────────────────────────────────────────────────────────┤
│ Max Connections │ Unlimited │
│ Max Leaf Node Connections │ Unlimited │
│ Max Data │ Unlimited │
│ Max Exports │ Unlimited │
│ Max Imports │ Unlimited │
│ Max Msg Payload │ Unlimited │
│ Max Subscriptions │ Unlimited │
│ Exports Allows Wildcards │ True │
├───────────────────────────┼──────────────────────────────────────────────────────────┤
│ Exports │ None │
╰───────────────────────────┴──────────────────────────────────────────────────────────╯
╭─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ Imports │
├──────────────┬─────────┬───────────────────────────────────────────────────────────────────────┬──────────────┬─────────┬──────────────┬────────┤
│ Name │ Type │ Remote │ Local/Prefix │ Expires │ From Account │ Public │
├──────────────┼─────────┼───────────────────────────────────────────────────────────────────────┼──────────────┼─────────┼──────────────┼────────┤
│ help │ Service │ help │ help │ │ A │ Yes │
│ private.help │ Service │ private.help.AAM46E3YF5WOZSE5WNYWHN3YYISVZOSI6XHTF2Q64ECPXSFQZROJMP2H │ private.help │ │ A │ No │
╰──────────────┴─────────┴───────────────────────────────────────────────────────────────────────┴──────────────┴─────────┴──────────────┴────────╯nsc reply --account A --user U "private.help.*" "help is here"
nsc req --account B --user b private.help help_mepublished request: [private.help] : 'help_me'
received reply: [_INBOX.3MhS0iCHfqO8wUl1x59bHB.jpE2jvEj] : 'help is here'# Configure a cluster that's dedicated to always sync writes.
server_tags: ["sync:always"]
jetstream {
sync_interval: always
}nats stream add --replicas 3 --tag sync:alwaysAckWaitno_auth_user Configurationfunc ExampleJetStream() {
nc, err
Model Name: MacBook Pro
Model Identifier: Mac16,1
Model Number: MW2U3LL/A
Chip: Apple M4
Total Number of Cores: 10 (4 performance and 6 efficiency)
Memory: 16 GB
System Firmware Version: 13822.1.2
OS Loader Version: 13822.1.2try (Connection nc = Nats.connect("localhost")) {
JetStreamManagement jsm = nc.jetStreamManagement();
jsm.addStream(StreamConfiguration.builder()
.name("example-stream")
.subjects("example-subject")
.build());
JetStream js = jsm.jetStream();
// Publish Synchronously
PublishAck pa = js.publish("example-subject", "Hello JS Sync!".getBytes());
System.out.println("Publish Sequence: " + pa.getSeqno());
// Publish Asynchronously
CompletableFuture<PublishAck> future =
js.publishAsync("example-subject", "Hello JS Async!".getBytes());
try {
pa = future.get(1, TimeUnit.SECONDS);
System.out.println("Publish Sequence: " + pa.getSeqno());
}
catch (ExecutionException e) {
// Might have been a problem with the publish,
// such as a failed expectation (advanced feature)
// Also could be that the publish ack did not return in time
// from the internal request timeout
}
catch (TimeoutException e) {
// The future timed out meaning it's timeout was shorter than
// the publish async's request timeout
}
catch (InterruptedException e) {
// The future.get() thread was interrupted.
}
}import { connect, Empty } from "../../src/mod.ts";
const nc = await connect();
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "example-stream", subjects: ["example-subject"] });
const js = await nc.jetstream();
// the jetstream client provides a publish that returns
// a confirmation that the message was received and stored
// by the server. You can associate various expectations
// when publishing a message to prevent duplicates.
// If the expectations are not met, the message is rejected.
let pa = await js.publish("example-subject", Empty, {
msgID: "a",
expect: { streamName: "example-stream" },
});
console.log(`${pa.stream}[${pa.seq}]: duplicate? ${pa.duplicate}`);
pa = await js.publish("example-subject", Empty, {
msgID: "a",
expect: { lastSequence: 1 },
});
console.log(`${pa.stream}[${pa.seq}]: duplicate? ${pa.duplicate}`);
await jsm.streams.delete("example-stream");
await nc.drain();import asyncio
import nats
from nats.errors import TimeoutError
async def main():
nc = await nats.connect("localhost")
# Create JetStream context.
js = nc.jetstream()
# Persist messages on 'example-subject'.
await js.add_stream(name="example-stream", subjects=["example-subject"])
for i in range(0, 10):
ack = await js.publish("example-subject", f"hello world: {i}".encode())
print(ack)
await nc.close()
if __name__ == '__main__':
asyncio.run(main())// dotnet add package NATS.Net
using NATS.Net;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
await using var client = new NatsClient();
INatsJSContext js = client.CreateJetStreamContext();
// Create a stream
var streamConfig = new StreamConfig(name: "example-stream", subjects: ["example-subject"]);
await js.CreateStreamAsync(streamConfig);
// Publish a message
{
PubAckResponse ack = await js.PublishAsync("example-subject", "Hello, JetStream!");
ack.EnsureSuccess();
}
// Publish messages concurrently
List<NatsJSPublishConcurrentFuture> futures = new();
for (var i = 0; i < 500; i++)
{
NatsJSPublishConcurrentFuture future
= await js.PublishConcurrentAsync("example-subject", "Hello, JetStream 1!");
futures.Add(future);
}
foreach (var future in futures)
{
await using (future)
{
PubAckResponse ack = await future.GetResponseAsync();
ack.EnsureSuccess();
}
}#include "examples.h"
static const char *usage = ""\
"-stream stream name (default is 'foo')\n" \
"-txt text to send (default is 'hello')\n" \
"-count number of messages to send\n" \
"-sync publish synchronously (default is async)\n";
static void
_jsPubErr(jsCtx *js, jsPubAckErr *pae, void *closure)
{
int *errors = (int*) closure;
printf("Error: %u - Code: %u - Text: %s\n", pae->Err, pae->ErrCode, pae->ErrText);
printf("Original message: %.*s\n", natsMsg_GetDataLength(pae->Msg), natsMsg_GetData(pae->Msg));
*errors = (*errors + 1);
// If we wanted to resend the original message, we would do something like that:
//
// js_PublishMsgAsync(js, &(pae->Msg), NULL);
//
// Note that we use `&(pae->Msg)` so that the library set it to NULL if it takes
// ownership, and the library will not destroy the message when this callback returns.
// No need to destroy anything, everything is handled by the library.
}
int main(int argc, char **argv)
{
natsConnection *conn = NULL;
natsStatistics *stats = NULL;
natsOptions *opts = NULL;
jsCtx *js = NULL;
jsOptions jsOpts;
jsErrCode jerr = 0;
natsStatus s;
int dataLen=0;
volatile int errors = 0;
bool delStream = false;
opts = parseArgs(argc, argv, usage);
dataLen = (int) strlen(payload);
s = natsConnection_Connect(&conn, opts);
if (s == NATS_OK)
s = jsOptions_Init(&jsOpts);
if (s == NATS_OK)
{
if (async)
{
jsOpts.PublishAsync.ErrHandler = _jsPubErr;
jsOpts.PublishAsync.ErrHandlerClosure = (void*) &errors;
}
s = natsConnection_JetStream(&js, conn, &jsOpts);
}
if (s == NATS_OK)
{
jsStreamInfo *si = NULL;
// First check if the stream already exists.
s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
if (s == NATS_NOT_FOUND)
{
jsStreamConfig cfg;
// Since we are the one creating this stream, we can delete at the end.
delStream = true;
// Initialize the configuration structure.
jsStreamConfig_Init(&cfg);
cfg.Name = stream;
// Set the subject
cfg.Subjects = (const char*[1]){subj};
cfg.SubjectsLen = 1;
// Make it a memory stream.
cfg.Storage = js_MemoryStorage;
// Add the stream,
s = js_AddStream(&si, js, &cfg, NULL, &jerr);
}
if (s == NATS_OK)
{
printf("Stream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
si->Config->Name, si->State.Msgs, si->State.Bytes);
// Need to destroy the returned stream object.
jsStreamInfo_Destroy(si);
}
}
if (s == NATS_OK)
s = natsStatistics_Create(&stats);
if (s == NATS_OK)
{
printf("\nSending %" PRId64 " messages to subject '%s'\n", total, stream);
start = nats_Now();
}
for (count = 0; (s == NATS_OK) && (count < total); count++)
{
if (async)
s = js_PublishAsync(js, subj, (const void*) payload, dataLen, NULL);
else
{
jsPubAck *pa = NULL;
s = js_Publish(&pa, js, subj, (const void*) payload, dataLen, NULL, &jerr);
if (s == NATS_OK)
{
if (pa->Duplicate)
printf("Got a duplicate message! Sequence=%" PRIu64 "\n", pa->Sequence);
jsPubAck_Destroy(pa);
}
}
}
if ((s == NATS_OK) && async)
{
jsPubOptions jsPubOpts;
jsPubOptions_Init(&jsPubOpts);
// Let's set it to 30 seconds, if getting "Timeout" errors,
// this may need to be increased based on the number of messages
// being sent.
jsPubOpts.MaxWait = 30000;
s = js_PublishAsyncComplete(js, &jsPubOpts);
if (s == NATS_TIMEOUT)
{
// Let's get the list of pending messages. We could resend,
// etc, but for now, just destroy them.
natsMsgList list;
js_PublishAsyncGetPendingList(&list, js);
natsMsgList_Destroy(&list);
}
}
if (s == NATS_OK)
{
jsStreamInfo *si = NULL;
elapsed = nats_Now() - start;
printStats(STATS_OUT, conn, NULL, stats);
printPerf("Sent");
if (errors != 0)
printf("There were %d asynchronous errors\n", errors);
// Let's report some stats after the run
s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
if (s == NATS_OK)
{
printf("\nStream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
si->Config->Name, si->State.Msgs, si->State.Bytes);
jsStreamInfo_Destroy(si);
}
}
if (delStream && (js != NULL))
{
printf("\nDeleting stream %s: ", stream);
s = js_DeleteStream(js, stream, NULL, &jerr);
if (s == NATS_OK)
printf("OK!");
printf("\n");
}
if (s != NATS_OK)
{
printf("Error: %u - %s - jerr=%u\n", s, natsStatus_GetText(s), jerr);
nats_PrintLastErrorStack(stderr);
}
// Destroy all our objects to avoid report of memory leak
jsCtx_Destroy(js);
natsStatistics_Destroy(stats);
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);
// To silence reports of memory still in used with valgrind
nats_Close();
return 0;
}nats bench js fetch uses the Fetch() function on a durable consumer to receive messages in batches.
DeliverGroup string in ConsumerConfig to specify which deliver group (or queue group name) the consumer is created fornats-server -m 8222 -js[[2932] 2025/10/28 12:29:02.879297 [INF] Starting nats-server
[2932] 2025/10/28 12:29:02.879658 [INF] Version: 2.12.1
[2932] 2025/10/28 12:29:02.879661 [INF] Git: [fab5f99]
[2932] 2025/10/28 12:29:02.879664 [INF] Name: NBIYCV5UNYPP2ZBZJZNGQ7UJNJILSQZCD6MK2CPWU6UY7PHYPKWOYYS4
[2932] 2025/10/28 12:29:02.879667 [INF] Node: YNleYaHo
[2932] 2025/10/28 12:29:02.879668 [INF] ID: NBIYCV5UNYPP2ZBZJZNGQ7UJNJILSQZCD6MK2CPWU6UY7PHYPKWOYYS4
[2932] 2025/10/28 12:29:02.880586 [INF] Starting http monitor on 0.0.0.0:8222
[2932] 2025/10/28 12:29:02.880696 [INF] Starting JetStream
[2932] 2025/10/28 12:29:02.880755 [WRN] Temporary storage directory used, data could be lost on system reboot
[2932] 2025/10/28 12:29:02.881014 [INF] _ ___ _____ ___ _____ ___ ___ _ __ __
[2932] 2025/10/28 12:29:02.881018 [INF] _ | | __|_ _/ __|_ _| _ \ __| /_\ | \/ |
[2932] 2025/10/28 12:29:02.881019 [INF] | || | _| | | \__ \ | | | / _| / _ \| |\/| |
[2932] 2025/10/28 12:29:02.881020 [INF] \__/|___| |_| |___/ |_| |_|_\___/_/ \_\_| |_|
[2932] 2025/10/28 12:29:02.881020 [INF]
[2932] 2025/10/28 12:29:02.881021 [INF] https://docs.nats.io/jetstream
[2932] 2025/10/28 12:29:02.881022 [INF]
[2932] 2025/10/28 12:29:02.881022 [INF] ---------------- JETSTREAM ----------------
[2932] 2025/10/28 12:29:02.881023 [INF] Strict: true
[2932] 2025/10/28 12:29:02.881026 [INF] Max Memory: 12.00 GB
[2932] 2025/10/28 12:29:02.881027 [INF] Max Storage: 233.86 GB
[2932] 2025/10/28 12:29:02.881027 [INF] Store Directory: "/var/folders/cx/x13pjm0n3ds6w4q_4xhr_c0r0000gn/T/nats/jetstream"
[2932] 2025/10/28 12:29:02.881029 [INF] API Level: 2
[2932] 2025/10/28 12:29:02.881030 [INF] -------------------------------------------
[2932] 2025/10/28 12:29:02.881335 [INF] Listening for client connections on 0.0.0.0:4222
[2932] 2025/10/28 12:29:02.881434 [INF] Server is readynats bench pub foo --size 16 --msgs 100000012:45:18 Starting Core NATS publisher benchmark [clients=1, msg-size=16 B, msgs=1,000,000, multi-subject=false, multi-subject-max=100,000, sleep=0s, subject=foo]
12:45:18 [1] Starting Core NATS publisher, publishing 1,000,000 messages
Finished 0s [================================================================] 100%
NATS Core NATS publisher stats: 14,786,683 msgs/sec ~ 226 MiB/sec ~ 0.07usnats bench sub foo --size 16 --msgs 1000000nats bench pub foo --size 16 --msgs 100000013:15:53 Starting Core NATS publisher benchmark [clients=1, msg-size=16 B, msgs=1,000,000, multi-subject=false, multi-subject-max=100,000, sleep=0s, subject=foo]
13:15:53 [1] Starting Core NATS publisher, publishing 1,000,000 messages
Finished 0s [================================================================] 100%
NATS Core NATS publisher stats: 4,925,767 msgs/sec ~ 75 MiB/sec ~ 0.20us13:15:50 Starting Core NATS subscriber benchmark [clients=1, msg-size=16 B, msgs=1,000,000, multi-subject=false, subject=foo]
13:15:50 [1] Starting Core NATS subscriber, expecting 1,000,000 messages
Finished 0s [============================================================] 100%
NATS Core NATS subscriber stats: 4,928,153 msgs/sec ~ 75 MiB/sec ~ 0.20usnats bench pub foo --size 16kb13:20:18 Starting Core NATS publisher benchmark [clients=1, msg-size=16 KiB, msgs=100,000, multi-subject=false, multi-subject-max=100,000, sleep=0s, subject=foo]
13:20:18 [1] Starting Core NATS publisher, publishing 100,000 messages
Finished 0s [================================================================] 100%
NATS Core NATS publisher stats: 230,800 msgs/sec ~ 3.5 GiB/sec ~ 4.33usnats bench sub foo --size 16kb13:20:15 Starting Core NATS subscriber benchmark [clients=1, msg-size=16 KiB, msgs=100,000, multi-subject=false, subject=foo]
13:20:15 [1] Starting Core NATS subscriber, expecting 100,000 messages
Finished 0s [============================================================] 100%
NATS Core NATS subscriber stats: 226,091 msgs/sec ~ 3.4 GiB/sec ~ 4.42usnats bench sub foo --clients 4nats bench pub foo13:34:26 Starting Core NATS publisher benchmark [clients=1, msg-size=128 B, msgs=100,000, multi-subject=false, multi-subject-max=100,000, sleep=0s, subject=foo]
13:34:26 [1] Starting Core NATS publisher, publishing 100,000 messages
Finished 0s [================================================================] 100%
NATS Core NATS publisher stats: 1,012,200 msgs/sec ~ 124 MiB/sec ~ 0.99us13:34:24 Starting Core NATS subscriber benchmark [clients=4, msg-size=128 B, msgs=100,000, multi-subject=false, subject=foo]
13:34:24 [1] Starting Core NATS subscriber, expecting 100,000 messages
13:34:24 [2] Starting Core NATS subscriber, expecting 100,000 messages
13:34:24 [3] Starting Core NATS subscriber, expecting 100,000 messages
13:34:24 [4] Starting Core NATS subscriber, expecting 100,000 messages
Finished 0s [============================================================] 100%
Finished 0s [============================================================] 100%
Finished 0s [============================================================] 100%
Finished 0s [============================================================] 100%
[1] 1,013,938 msgs/sec ~ 124 MiB/sec ~ 0.99us (100,000 msgs)
[2] 1,014,120 msgs/sec ~ 124 MiB/sec ~ 0.99us (100,000 msgs)
[3] 1,007,242 msgs/sec ~ 123 MiB/sec ~ 0.99us (100,000 msgs)
[4] 1,004,311 msgs/sec ~ 123 MiB/sec ~ 1.00us (100,000 msgs)
NATS Core NATS subscriber aggregated stats: 4,015,923 msgs/sec ~ 490 MiB/sec
message rates min 1,004,311 | avg 1,009,902 | max 1,014,120 | stddev 4,254 msgs
avg latencies min 0.99us | avg 0.99us | max 1.00us | stddev 0.00usnats bench sub foo --clients 4 --msgs 1000000nats bench pub foo --clients 4 --msgs 100000013:40:24 Starting Core NATS publisher benchmark [clients=4, msg-size=128 B, msgs=1,000,000, multi-subject=false, multi-subject-max=100,000, sleep=0s, subject=foo]
13:40:24 [1] Starting Core NATS publisher, publishing 250,000 messages
13:40:24 [2] Starting Core NATS publisher, publishing 250,000 messages
13:40:24 [3] Starting Core NATS publisher, publishing 250,000 messages
13:40:24 [4] Starting Core NATS publisher, publishing 250,000 messages
Finished 0s [================================================================] 100%
Finished 0s [================================================================] 100%
Finished 0s [================================================================] 100%
Finished 0s [================================================================] 100%
[1] 272,785 msgs/sec ~ 33 MiB/sec ~ 3.67us (250,000 msgs)
[2] 271,251 msgs/sec ~ 33 MiB/sec ~ 3.69us (250,000 msgs)
[3] 270,340 msgs/sec ~ 33 MiB/sec ~ 3.70us (250,000 msgs)
[4] 270,040 msgs/sec ~ 33 MiB/sec ~ 3.70us (250,000 msgs)
NATS Core NATS publisher aggregated stats: 1,080,144 msgs/sec ~ 132 MiB/sec
message rates min 270,040 | avg 271,104 | max 272,785 | stddev 1,068 msgs
avg latencies min 3.67us | avg 3.69us | max 3.70us | stddev 0.01us13:40:18 Starting Core NATS subscriber benchmark [clients=4, msg-size=128 B, msgs=1,000,000, multi-subject=false, subject=foo]
13:40:18 [1] Starting Core NATS subscriber, expecting 1,000,000 messages
13:40:18 [2] Starting Core NATS subscriber, expecting 1,000,000 messages
13:40:18 [3] Starting Core NATS subscriber, expecting 1,000,000 messages
13:40:18 [4] Starting Core NATS subscriber, expecting 1,000,000 messages
Finished 0s [============================================================] 100%
Finished 0s [============================================================] 100%
Finished 0s [============================================================] 100%
Finished 0s [============================================================] 100%
[1] 1,080,830 msgs/sec ~ 132 MiB/sec ~ 0.93us (1,000,000 msgs)
[2] 1,080,869 msgs/sec ~ 132 MiB/sec ~ 0.93us (1,000,000 msgs)
[3] 1,080,849 msgs/sec ~ 132 MiB/sec ~ 0.93us (1,000,000 msgs)
[4] 1,080,821 msgs/sec ~ 132 MiB/sec ~ 0.93us (1,000,000 msgs)
NATS Core NATS subscriber aggregated stats: 4,323,201 msgs/sec ~ 528 MiB/sec
message rates min 1,080,821 | avg 1,080,842 | max 1,080,869 | stddev 18 msgs
avg latencies min 0.93us | avg 0.93us | max 0.93us | stddev 0.00usnats bench service serve foonats bench service request foo13:46:43 Starting Core NATS service requester benchmark [clients=1, msg-size=128 B, msgs=100,000, sleep=0s, subject=foo]
13:46:43 [1] Starting Core NATS service requester, requesting 100,000 messages
Finished 5s [================================================================] 100%
NATS Core NATS service requester stats: 19,659 msgs/sec ~ 2.4 MiB/sec ~ 50.87usnats bench service serve foo --size 16 --clients 2nats bench service request foo --size 16 --clients 50 --no-progress13:57:56 Starting Core NATS service requester benchmark [clients=50, msg-size=16 B, msgs=100,000, sleep=0s, subject=foo]
13:57:56 [1] Starting Core NATS service requester, requesting 2,000 messages
13:57:56 [2] Starting Core NATS service requester, requesting 2,000 messages
...
13:57:56 [49] Starting Core NATS service requester, requesting 2,000 messages
13:57:56 [50] Starting Core NATS service requester, requesting 2,000 messages
[1] 2,735 msgs/sec ~ 43 KiB/sec ~ 365.62us (2,000 msgs)
[2] 2,700 msgs/sec ~ 42 KiB/sec ~ 370.24us (2,000 msgs)
...
[49] 2,651 msgs/sec ~ 41 KiB/sec ~ 377.14us (2,000 msgs)
[50] 2,649 msgs/sec ~ 41 KiB/sec ~ 377.48us (2,000 msgs)
NATS Core NATS service requester aggregated stats: 132,438 msgs/sec ~ 2.0 MiB/sec
message rates min 2,649 | avg 2,673 | max 2,735 | stddev 17 msgs
avg latencies min 365.62us | avg 373.93us | max 377.48us | stddev 2.43usnats bench js pub sync jsfoo --size 16 --create --storage memory18:47:47 Starting JetStream synchronous publisher benchmark [batch=0, clients=1, dedup-window=2m0s, deduplication=false, max-bytes=1,073,741,824, msg-size=16 B, msgs=100,000, multi-subject=false, multi-subject-max=100,000, purge=false, replicas=1, sleep=0s, storage=memory, stream=benchstream, subject=jsfoo]
18:47:47 Using stream: benchstream
18:47:47 [1] Starting JetStream synchronous publisher, publishing 100,000 messages
Publishing 2s [================================================================] 100%
NATS JetStream synchronous publisher stats: 35,734 msgs/sec ~ 558 KiB/sec ~ 27.98usnats bench js pub batch jsfoo --size 16 --batch 1000 --purge --storage memory18:51:27 Starting JetStream batched publisher benchmark [batch=1,000, clients=1, msg-size=16 B, msgs=100,000, multi-subject=false, multi-subject-max=100,000, purge=true, sleep=0s, stream=benchstream, subject=jsfoo]
18:51:27 Using stream: benchstream
18:51:27 Purging the stream
18:51:27 [1] Starting JetStream batched publisher, publishing 100,000 messages
Finished 0s [================================================================] 100%
NATS JetStream batched publisher stats: 627,430 msgs/sec ~ 9.6 MiB/sec ~ 1.59usnats stream rm -f benchstream
nats bench js pub async jsfoo --create13:09:34 Starting JetStream asynchronous publisher benchmark [batch=500, clients=1, dedup-window=2m0s, deduplication=false, max-bytes=1,073,741,824, msg-size=128 B, msgs=100,000, multi-subject=false, multi-subject-max=100,000, purge=false, replicas=1, sleep=0s, storage=file, stream=benchstream, subject=jsfoo]
13:09:34 Using stream: benchstream
13:09:34 [1] Starting JetStream asynchronous publisher, publishing 100,000 messages
Finished 0s [================================================================] 100%
NATS JetStream asynchronous publisher stats: 403,828 msgs/sec ~ 49 MiB/sec ~ 2.48usnats bench js ordered13:33:48 Starting JetStream ordered ephemeral consumer benchmark [clients=1, msg-size=128 B, msgs=100,000, purge=false, sleep=0s, stream=benchstream]
13:33:48 [1] Starting JetStream ordered ephemeral consumer, expecting 100,000 messages
Finished 0s [================================================================] 100%
NATS JetStream ordered ephemeral consumer stats: 1,201,540 msgs/sec ~ 147 MiB/sec ~ 0.83usnats bench js consume --clients 4 --no-progress13:46:04 Starting JetStream durable consumer (callback) benchmark [acks=explicit, batch=500, clients=4, consumer=nats-bench, double-acked=false, msg-size=128 B, msgs=100,000, purge=false, sleep=0s, stream=benchstream]
13:46:04 [1] Starting JetStream durable consumer (callback), expecting 25,000 messages
13:46:04 [2] Starting JetStream durable consumer (callback), expecting 25,000 messages
13:46:04 [3] Starting JetStream durable consumer (callback), expecting 25,000 messages
13:46:04 [4] Starting JetStream durable consumer (callback), expecting 25,000 messages
[1] 73,230 msgs/sec ~ 8.9 MiB/sec ~ 13.66us (25,000 msgs)
[2] 72,921 msgs/sec ~ 8.9 MiB/sec ~ 13.71us (25,000 msgs)
[3] 72,696 msgs/sec ~ 8.9 MiB/sec ~ 13.76us (25,000 msgs)
[4] 72,687 msgs/sec ~ 8.9 MiB/sec ~ 13.76us (25,000 msgs)
NATS JetStream durable consumer (callback) aggregated stats: 290,438 msgs/sec ~ 36 MiB/sec
message rates min 72,687 | avg 72,883 | max 73,230 | stddev 220 msgs
avg latencies min 13.66us | avg 13.72us | max 13.76us | stddev 0.04usnats bench js fetch --acks none --clients 214:09:10 Starting JetStream durable consumer (fetch) benchmark [acks=none, batch=500, clients=2, consumer=nats-bench, double-acked=false, msg-size=128 B, msgs=100,000, purge=false, sleep=0s, stream=benchstream]
14:09:10 [1] Starting JetStream durable consumer (fetch), expecting 50,000 messages
14:09:10 [2] Starting JetStream durable consumer (fetch), expecting 50,000 messages
Finished 0s [================================================================] 100%
Finished 0s [================================================================] 100%
[1] 567,330 msgs/sec ~ 69 MiB/sec ~ 1.76us (50,000 msgs)
[2] 567,067 msgs/sec ~ 69 MiB/sec ~ 1.76us (50,000 msgs)
NATS JetStream durable consumer (fetch) aggregated stats: 1,128,932 msgs/sec ~ 138 MiB/sec
message rates min 567,067 | avg 567,198 | max 567,330 | stddev 131 msgs
avg latencies min 1.76us | avg 1.76us | max 1.76us | stddev 0.00usnats bench js get sync14:13:30 Starting JetStream synchronous getter benchmark [clients=1, msg-size=128 B, msgs=100,000, sleep=0s, stream=benchstream]
14:13:30 [1] Starting JetStream synchronous getter, expecting 100,000 messages
Finished 3s [================================================================] 100%
NATS JetStream synchronous getter stats: 33,244 msgs/sec ~ 4.1 MiB/sec ~ 30.08usnats bench js get batch --clients 214:11:09 Starting JetStream batched direct getter benchmark [batch=500, clients=2, filter=>, msg-size=128 B, msgs=100,000, sleep=0s, stream=benchstream]
14:11:09 [1] Starting JetStream batched direct getter, expecting 100,000 messages
14:11:09 [2] Starting JetStream batched direct getter, expecting 100,000 messages
Finished 0s [================================================================] 100%
Finished 0s [================================================================] 100%
[1] 509,387 msgs/sec ~ 62 MiB/sec ~ 1.96us (100,000 msgs)
[2] 500,449 msgs/sec ~ 61 MiB/sec ~ 2.00us (100,000 msgs)
NATS JetStream batched direct getter aggregated stats: 1,000,898 msgs/sec ~ 122 MiB/sec
message rates min 500,449 | avg 504,918 | max 509,387 | stddev 4,469 msgs
avg latencies min 1.96us | avg 1.98us | max 2.00us | stddev 0.02usnats bench js ordered --purge --clients 8 --no-progressnats bench js pub async jsfoo --clients 8 --no-progress15:23:08 Starting JetStream asynchronous publisher benchmark [batch=500, clients=8, msg-size=128 B, msgs=100,000, multi-subject=false, multi-subject-max=100,000, purge=false, sleep=0s, stream=benchstream, subject=jsfoo]
15:23:08 Using stream: benchstream
15:23:08 [1] Starting JetStream asynchronous publisher, publishing 12,500 messages
15:23:08 [2] Starting JetStream asynchronous publisher, publishing 12,500 messages
...
15:23:08 [7] Starting JetStream asynchronous publisher, publishing 12,500 messages
15:23:08 [8] Starting JetStream asynchronous publisher, publishing 12,500 messages
[1] 33,289 msgs/sec ~ 4.1 MiB/sec ~ 30.04us (12,500 msgs)
[2] 33,242 msgs/sec ~ 4.1 MiB/sec ~ 30.08us (12,500 msgs)
...
[7] 31,947 msgs/sec ~ 3.9 MiB/sec ~ 31.30us (12,500 msgs)
[8] 31,586 msgs/sec ~ 3.9 MiB/sec ~ 31.66us (12,500 msgs)
NATS JetStream asynchronous publisher aggregated stats: 252,544 msgs/sec ~ 31 MiB/sec
message rates min 31,586 | avg 32,614 | max 33,289 | stddev 638 msgs
avg latencies min 30.04us | avg 30.67us | max 31.66us | stddev 0.60us 15:23:02 Starting JetStream ordered ephemeral consumer benchmark [clients=8, msg-size=128 B, msgs=100,000, purge=true, sleep=0s, stream=benchstream]
15:23:02 [1] Starting JetStream ordered ephemeral consumer, expecting 100,000 messages
15:23:02 [2] Starting JetStream ordered ephemeral consumer, expecting 100,000 messages
...
15:23:02 [7] Starting JetStream ordered ephemeral consumer, expecting 100,000 messages
15:23:02 [8] Starting JetStream ordered ephemeral consumer, expecting 100,000 messages
[1] 111,627 msgs/sec ~ 14 MiB/sec ~ 8.96us (100,000 msgs)
[2] 110,534 msgs/sec ~ 14 MiB/sec ~ 9.05us (100,000 msgs)
...
[7] 109,849 msgs/sec ~ 13 MiB/sec ~ 9.10us (100,000 msgs)
[8] 109,797 msgs/sec ~ 13 MiB/sec ~ 9.11us (100,000 msgs)
NATS JetStream ordered ephemeral consumer aggregated stats: 878,326 msgs/sec ~ 107 MiB/sec
message rates min 109,797 | avg 110,306 | max 111,627 | stddev 556 msgs
avg latencies min 8.96us | avg 9.07us | max 9.11us | stddev 0.05usnats bench kv put14:26:04 Starting JetStream KV putter benchmark [bucket=benchbucket, clients=1, msg-size=128 B, msgs=100,000, purge=false, sleep=0s]
14:26:04 [1] Starting JetStream KV putter, publishing 100,000 messages
Putting 3s [================================================================] 100%
NATS JetStream KV putter stats: 30,067 msgs/sec ~ 3.7 MiB/sec ~ 33.26usnats bench kv get --clients 16 --randomize 100000 --no-progress14:28:33 Starting JetStream KV getter benchmark [bucket=benchbucket, clients=16, msg-size=128 B, msgs=100,000, randomize=100,000, sleep=0s]
14:28:33 [1] Starting JetStream KV getter, trying to get 6,250 messages
14:28:33 [2] Starting JetStream KV getter, trying to get 6,250 messages
...
14:28:33 [15] Starting JetStream KV getter, trying to get 6,250 messages
14:28:33 [16] Starting JetStream KV getter, trying to get 6,250 messages
[1] 6,568 msgs/sec ~ 821 KiB/sec ~ 152.23us (6,250 msgs)
[2] 6,579 msgs/sec ~ 822 KiB/sec ~ 151.98us (6,250 msgs)
...
[15] 6,474 msgs/sec ~ 809 KiB/sec ~ 154.45us (6,250 msgs)
[16] 6,451 msgs/sec ~ 806 KiB/sec ~ 155.01us (6,250 msgs)
NATS JetStream KV getter aggregated stats: 102,844 msgs/sec ~ 13 MiB/sec
message rates min 6,448 | avg 6,509 | max 6,579 | stddev 40 msgs
avg latencies min 151.98us | avg 153.61us | max 155.08us | stddev 0.96usmappings: {
transform.order target.order
target.order transform.order
}server_name: "hub"
cluster: { name: "hub" }
mappings: {
orders.* orders.central.{{wildcard(1)}}
}server_name: "hub"
cluster: { name: "hub" }
mappings: {
orders.> orders.central.>}
}server_name: "store1"
cluster: { name: "store1" }
mappings: {
orders.central.* orders.local.{{wildcard(1)}}
}{
"name": "orders",
"subjects": [ "orders.local.*"],
"subject_transform":{"src":"orders.local.*","dest":"orders.{{wildcard(1)}}"},
"retention": "limits",
...
"republish": {
"src": "orders.*",
"dest": "orders.trace.{{wildcard(1)}}"
},nats trace orders.device1.order1
Tracing message route to subject orders.device1.order1
Client "NATS CLI Version development" cid:16 cluster:"hub" server:"hub" version:"2.11.0-dev"
Mapping subject:"orders.hub.device1.order1"
--J JetStream action:"stored" stream:"orders" subject:"orders.device1.order1"
--X No active interest
Legend: Client: --C Router: --> Gateway: ==> Leafnode: ~~> JetStream: --J Error: --X
Egress Count:
JetStream: 1nats server mapping foo bar foo
> barnats server mapping foo bar
> Enter subjects to test, empty subject terminates.
>
> ? Subject foo
> bar
> ? Subject test
> Error: no matching transforms availableserver_name: "hub"
cluster: { name: "hub" }
mappings: {
orders.flush orders.central.flush
orders.* orders.central.{{wildcard(1)}}
}server_name: "hub"
cluster: { name: "hub" }
mappings: {
orders.> orders.central.>
}server_name: "hub"
cluster: { name: "hub" }
accounts {
accountA: {
mappings: {
orders.flush orders.central.flush
orders.* orders.central.{{wildcard(1)}}
}
}
}nats server mapping ">" "baz.>" bar.a.b
> baz.bar.b.anats server mapping "bar.*.*" "baz.{{wildcard(2)}}.{{wildcard(1)}}" bar.a.b
> baz.b.anats server mapping "orders.*.*" "foo.{{wildcard(2)}}" orders.local.order1
> orders.order1nats server mapping "neworders.*" "neworders.{{wildcard(1)}}.{{partition(3,1)}}" neworders.customerid1
> neworders.customerid1.0nats server mapping "foo.*.*" "foo.{{wildcard(1)}}.{{wildcard(2)}}.{{partition(5,1,2)}}" foo.us.customerid
> foo.us.customerid.0nats server mapping "foo.*.*" "foo.{{wildcard(1)}}.{{wildcard(2)}}.{{partition(3,1,2)}}" myservice.requests: [
{ destination: myservice.requests.v1, weight: 100% }
] myservice.requests: [
{ destination: myservice.requests.v1, weight: 98% },
{ destination: myservice.requests.v2, weight: 2% }
]mappings = {
"foo":[
{destination:"foo.west", weight: 100%, cluster: "west"},
{destination:"foo.central", weight: 100%, cluster: "central"},
{destination:"foo.east", weight: 100%, cluster: "east"}
]
}{
"name": "orders",
"subjects": [ "orders.local.*"],
"subject_transform":{"src":"orders.local.*","dest":"orders.{{wildcard(1)}}"},
"retention": "limits",
...
"sources": [
{
"name": "other_orders",
"subject_transforms": [
{
"src": "orders.online.*",
"dest": "orders.{{wildcard(1)}}"
}
]
}
],
"republish": {
"src": "orders.*",
"dest": "orders.trace.{{wildcard(1)}}"
}
}resolver_tls {
cert_file: ...
key_file: ...
ca_file: ...
}nats req -H Nats-Msg-Id:1 ORDERS.new hello1
nats req -H Nats-Msg-Id:1 ORDERS.new hello2
nats req -H Nats-Msg-Id:1 ORDERS.new hello3
nats req -H Nats-Msg-Id:1 ORDERS.new hello4nats stream info ORDERS....
State:
Messages: 1
Bytes: 67 Bnats str info ORDERS...
Statistics:
Messages: 0
Bytes: 0 B
FirstSeq: 0
LastSeq: 0
Active Consumers: 1nats con info ORDERS DISPATCH...
State:
Last Delivered Message: Consumer sequence: 1 Stream sequence: 1
Acknowledgment floor: Consumer sequence: 0 Stream sequence: 0
Pending Messages: 0
Redelivered Messages: 0nats pub ORDERS.processed "order 4"Published 7 bytes to ORDERS.processed
$ nats str info ORDERS
...
Statistics:
Messages: 1
Bytes: 53 B
FirstSeq: 1
LastSeq: 1
Active Consumers: 1nats con next ORDERS DISPATCH--- received on ORDERS.processed
order 4
Acknowledged message
$ nats con info ORDERS DISPATCH
...
State:
Last Delivered Message: Consumer sequence: 2 Stream sequence: 2
Acknowledgment floor: Consumer sequence: 1 Stream sequence: 1
Pending Messages: 0
Redelivered Messages: 0nats pub ORDERS.processed "order 5"Published 7 bytes to ORDERS.processednats consumer next ORDERS DISPATCH --no-ack--- received on ORDERS.processed
order 5nats consumer info ORDERS DISPATCHState:
Last Delivered Message: Consumer sequence: 3 Stream sequence: 3
Acknowledgment floor: Consumer sequence: 1 Stream sequence: 1
Pending Messages: 1
Redelivered Messages: 0nats consumer next ORDERS DISPATCH --no-ack--- received on ORDERS.processed
order 5nats consumer info ORDERS DISPATCHState:
Last Delivered Message: Consumer sequence: 4 Stream sequence: 3
Acknowledgment floor: Consumer sequence: 1 Stream sequence: 1
Pending Messages: 1
Redelivered Messages: 1nats consumer next ORDERS DISPATCH --- received on ORDERS.processed
order 5
Acknowledged messagenats consumer info ORDERS DISPATCHState:
Last Delivered Message: Consumer sequence: 5 Stream sequence: 3
Acknowledgment floor: Consumer sequence: 1 Stream sequence: 1
Pending Messages: 0
Redelivered Messages: 0nats consumer add ORDERS ALL --pull --filter ORDERS.processed --ack none --replay instant --deliver all
nats consumer next ORDERS ALL--- received on ORDERS.processed
order 1
Acknowledged messagenats consumer add ORDERS LAST --pull --filter ORDERS.processed --ack none --replay instant --deliver last
nats consumer next ORDERS LAST--- received on ORDERS.processed
order 100
Acknowledged messagenats consumer add ORDERS TEN --pull --filter ORDERS.processed --ack none --replay instant --deliver 10
nats consumer next ORDERS TEN--- received on ORDERS.processed
order 10
Acknowledged messagenats stream purge ORDERS
for i in 1 2 3
do
nats pub ORDERS.processed "order ${i}"
sleep 60
donenats consumer add ORDERS 2MIN --pull --filter ORDERS.processed --ack none --replay instant --deliver 2m
nats consumer next ORDERS 2MIN--- received on ORDERS.processed
order 2
Acknowledged messagenats sub my.monitornats consumer add ORDERS --filter '' --ack none --target 'my.monitor' --deliver last --replay instant --ephemeralnats consumer add ORDERS REPLAY --target out.original --filter ORDERS.processed --ack none --deliver all --sample 100 --replay original...
Replay Policy: original
...for i in 1 2 3 <15:15:35
do
nats pub ORDERS.processed "order ${i}"
sleep 10
donePublished [ORDERS.processed] : 'order 1'
Published [ORDERS.processed] : 'order 2'
Published [ORDERS.processed] : 'order 3'nats sub -t out.originalListening on [out.original]
2020/01/03 15:17:26 [#1] Received on [ORDERS.processed]: 'order 1'
2020/01/03 15:17:36 [#2] Received on [ORDERS.processed]: 'order 2'
2020/01/03 15:17:46 [#3] Received on [ORDERS.processed]: 'order 3'
^Cnats consumer info ORDERS NEW...
Sampling Rate: 100
...length of the message record (4bytes) + seq(8) + ts(8) + subj_len(2) + subj + msg + hash(8)length of the message record (4bytes) + seq(8) + ts(8) + subj_len(2) + subj + hdr_len(4) + hdr + msg + hash(8)// There is not a single listener for connection events in the NATS Go Client.
// Instead, you can set individual event handlers using:
nc, err := nats.Connect("demo.nats.io",
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
log.Printf("client disconnected: %v", err)
}),
nats.ReconnectHandler(func(_ *nats.Conn) {
log.Printf("client reconnected")
}),
nats.ClosedHandler(func(_ *nats.Conn) {
log.Printf("client closed")
}))
if err != nil {
log.Fatal(err)
}
defer nc.Close()
DisconnectHandler(cb ConnHandler)
ReconnectHandler(cb ConnHandler)
ClosedHandler(cb ConnHandler)
DiscoveredServersHandler(cb ConnHandler)
ErrorHandler(cb ErrHandler)// dotnet add package NATS.Net
using NATS.Net;
await using var client = new NatsClient();
client.Connection.ConnectionDisconnected += async (sender, args) =>
{
Console.WriteLine($"Disconnected: {args.Message}");
};
client.Connection.ConnectionOpened += async (sender, args) =>
{
Console.WriteLine($"Connected: {args.Message}");
};
client.Connection.ReconnectFailed += async (sender, args) =>
{
Console.WriteLine($"Reconnect Failed: {args.Message}");
};
await client.ConnectAsync();# There is not a single listener for connection events in the Ruby NATS Client.
# Instead, you can set individual event handlers using:
NATS.on_disconnect do
end
NATS.on_reconnect do
end
NATS.on_close do
end
NATS.on_error do
endstatic void
disconnectedCB(natsConnection *conn, void *closure)
{
// Do something
printf("Connection disconnected\n");
}
static void
reconnectedCB(natsConnection *conn, void *closure)
{
// Do something
printf("Connection reconnected\n");
}
static void
closedCB(natsConnection *conn, void *closure)
{
// Do something
printf("Connection closed\n");
}
(...)
natsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
s = natsOptions_Create(&opts);
if (s == NATS_OK)
s = natsOptions_SetDisconnectedCB(opts, disconnectedCB, NULL);
if (s == NATS_OK)
s = natsOptions_SetReconnectedCB(opts, reconnectedCB, NULL);
if (s == NATS_OK)
s = natsOptions_SetClosedCB(opts, closedCB, NULL);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);// Be notified if a new server joins the cluster.
// Print all the known servers and the only the ones that were discovered.
nc, err := nats.Connect("demo.nats.io",
nats.DiscoveredServersHandler(func(nc *nats.Conn) {
log.Printf("Known servers: %v\n", nc.Servers())
log.Printf("Discovered servers: %v\n", nc.DiscoveredServers())
}))
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Do something with the connectionclass ServersAddedListener implements ConnectionListener {
public void connectionEvent(Connection nc, Events event) {
if (event == Events.DISCOVERED_SERVERS) {
for (String server : nc.getServers()) {
System.out.println("Known server: "+server);
}
}
}
}
public class ListenForNewServers {
public static void main(String[] args) {
try {
Options options = new Options.Builder().
server("nats://demo.nats.io:4222").
connectionListener(new ServersAddedListener()). // Set the listener
build();
Connection nc = Nats.connect(options);
// Do something with the connection
nc.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}const nc = await connect({ servers: ["demo.nats.io:4222"] });
(async () => {
for await (const s of nc.status()) {
switch (s.type) {
case Status.Update:
t.log(`servers added - ${s.data.added}`);
t.log(`servers deleted - ${s.data.deleted}`);
break;
default:
}
}
})().then();# Asyncio NATS client does not support discovered servers handler right now// NATS .NET client does not support discovered servers handler right now# The Ruby NATS client does not support discovered servers handler right nowstatic void
discoveredServersCB(natsConnection *conn, void *closure)
{
natsStatus s = NATS_OK;
char **servers = NULL;
int count = 0;
s = natsConnection_GetDiscoveredServers(conn, &servers, &count);
if (s == NATS_OK)
{
int i;
// Do something...
for (i=0; i<count; i++)
printf("Discovered server: %s\n", servers[i]);
// Free allocated memory
for (i=0; i<count; i++)
free(servers[i]);
free(servers);
}
}
(...)
natsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
s = natsOptions_Create(&opts);
if (s == NATS_OK)
s = natsOptions_SetDiscoveredServersCB(opts, discoveredServersCB, NULL);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);// Set the callback that will be invoked when an asynchronous error occurs.
nc, err := nats.Connect("demo.nats.io",
nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
log.Printf("Error: %v", err)
}))
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Do something with the connectionclass MyErrorListener implements ErrorListener {
public void errorOccurred(Connection conn, String error)
{
System.out.println("The server notificed the client with: "+error);
}
public void exceptionOccurred(Connection conn, Exception exp) {
System.out.println("The connection handled an exception: "+exp.getLocalizedMessage());
}
public void slowConsumerDetected(Connection conn, Consumer consumer) {
System.out.println("A slow consumer was detected.");
}
}
public class SetErrorListener {
public static void main(String[] args) {
try {
Options options = new Options.Builder().
server("nats://demo.nats.io:4222").
errorListener(new MyErrorListener()). // Set the listener
build();
Connection nc = Nats.connect(options);
// Do something with the connection
nc.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}const nc = await connect({ servers: ["demo.nats.io"] });
// if the client gets closed with an error you can trap that
// condition in the closed handler like this:
nc.closed().then((err) => {
if (err) {
t.log(`the connection closed with an error ${err.message}`);
} else {
t.log(`the connection closed.`);
}
});
// if you have a status listener, it will too get notified
(async () => {
for await (const s of nc.status()) {
switch (s.type) {
case Status.Error:
// typically if you get this the nats connection will close
t.log("client got an async error from the server");
break;
default:
t.log(`got an unknown status ${s.type}`);
}
}
})().then();nc = NATS()
async def error_cb(e):
print("Error: ", e)
await nc.connect(
servers=["nats://demo.nats.io:4222"],
reconnect_time_wait=10,
error_cb=error_cb,
)
# Do something with the connection.// dotnet add package NATS.Net
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Net;
// NATS .NET client does not support error handler right now
// instead, you can use the logger since server errors are logged
// with the error level and eventId 1005 (Protocol Log Event).
await using var client = new NatsClient(new NatsOpts
{
LoggerFactory = LoggerFactory.Create(builder => builder.AddConsole()),
});require 'nats/client'
NATS.start(servers:["nats://demo.nats.io:4222"]) do |nc|
nc.on_error do |e|
puts "Error: #{e}"
end
nc.close
endstatic void
errorCB(natsConnection *conn, natsSubscription *sub, natsStatus s, void *closure)
{
// Do something
printf("Error: %d - %s\n", s, natsStatus_GetText(s));
}
(...)
natsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
s = natsOptions_Create(&opts);
if (s == NATS_OK)
s = natsOptions_SetErrorHandler(opts, errorCB, NULL);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// Destroy objects that were created
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);class MyConnectionListener implements ConnectionListener {
public void connectionEvent(Connection natsConnection, Events event) {
System.out.println("Connection event - " + event);
}
}
public class SetConnectionListener {
public static void main(String[] args) {
try {
Options options = new Options.Builder()
.server("nats://demo.nats.io:4222")
.connectionListener(new MyConnectionListener()) // Set the listener
.build();
Connection nc = Nats.connect(options);
// Do something with the connection
nc.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}const nc = await connect({ servers: ["demo.nats.io"] });
nc.closed().then(() => {
t.log("the connection closed!");
});
(async () => {
for await (const s of nc.status()) {
switch (s.type) {
case Events.Disconnect:
t.log(`client disconnected - ${s.data}`);
break;
case Events.LDM:
t.log("client has been requested to reconnect");
break;
case Events.Update:
t.log(`client received a cluster update - ${s.data}`);
break;
case Events.Reconnect:
t.log(`client reconnected - ${s.data}`);
break;
case Events.Error:
t.log("client got a permissions error");
break;
case DebugEvents.Reconnecting:
t.log("client is attempting to reconnect");
break;
case DebugEvents.StaleConnection:
t.log("client has a stale connection");
break;
default:
t.log(`got an unknown status ${s.type}`);
}
}
})().then();# Asyncio NATS client can be defined a number of event callbacks
async def disconnected_cb():
print("Got disconnected!")
async def reconnected_cb():
# See who we are connected to on reconnect.
print("Got reconnected to {url}".format(url=nc.connected_url.netloc))
async def error_cb(e):
print("There was an error: {}".format(e))
async def closed_cb():
print("Connection is closed")
# Setup callbacks to be notified on disconnects and reconnects
options["disconnected_cb"] = disconnected_cb
options["reconnected_cb"] = reconnected_cb
# Setup callbacks to be notified when there is an error
# or connection is closed.
options["error_cb"] = error_cb
options["closed_cb"] = closed_cb
await nc.connect(**options)
nats stream add --cluster aws-us-east1-c1cluster {
name: aws-us-east1-c1
# etc..
}// Server A
server_tags: ["cloud:aws", "region:us-east1", "az:a"]
jetstream: {
unique_tag: "az"
}
// Server B
server_tags: ["cloud:aws", "region:us-east1", "az:b"]
jetstream: {
unique_tag: "az"
}
// Server C
server_tags: ["cloud:aws", "region:us-east1", "az:c"]
jetstream: {
unique_tag: "az"
}nats stream add --tag region:us-east1nc, err := nats.Connect("demo.nats.io")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Use a WaitGroup to wait for 2 messages to arrive
wg := sync.WaitGroup{}
wg.Add(2)
// Subscribe
if _, err := nc.Subscribe("time.*.east", func(m
log.Printf("%s: %s", m.Subject, m.Data)
wg.Done()
}); err != nil {
log.Fatal(err)
}
// Wait for the 2 messages to come in
wg.Wait()Connection nc = Nats.connect("nats://demo.nats.io:4222");
// Use a latch to wait for 2 messages to arrive
CountDownLatch latch = new CountDownLatch(2);
// Create a dispatcher and inline message handler
Dispatcher d = nc.createDispatcher((msg) -> {
String subject = msg.getSubject();
String str = new String(msg.getData(), StandardCharsets.UTF_8);
System.out.println(subject + ": " + str);
latch.countDown();
});
// Subscribe
d.subscribe("time.*.east");
// Wait for messages to come in
latch.await();
// Close the connection
nc.close();func ExampleJetStreamManager() {
nc, _ := nats.Connect("localhost")
js, _ := nc.JetStream()
// Create a stream
js.AddStream(&nats.StreamConfig{
Name: "example-stream",
Subjects: []string{"example-subject"},
MaxBytes: 1024,
})
// Update a stream
js.UpdateStream(&nats.StreamConfig{
Name: "example-stream",
MaxBytes: 2048,
})
// Create a durable consumer
js.AddConsumer("example-stream", &nats.ConsumerConfig{
Durable: "example-consumer-name",
})
// Get information about all streams (with Context JSOpt)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second
defer cancel()
for info := range js.StreamsInfo(nats.Context(ctx)) {
fmt.Println("stream name: ", info.Config.Name)
}
// Get information about all consumers (with MaxWait JSOpt)
for info := range js.ConsumersInfo("example-stream", nats.MaxWait(
fmt.Println("consumer name: ", info.Name)
}
// Delete a consumer
js.DeleteConsumer("example-stream", "example-consumer-name")
// Delete a stream
js.DeleteStream("example-stream")
}static void
onMsg(natsConnection *conn, natsSubscription *sub, natsMsg *msg, void *closure)
{
printf("Received msg: %s - %.*s\n",
natsMsg_GetSubject(msg),
natsMsg_GetDataLength(msg),
natsMsg_GetData(msg));
// Need to destroy the message!
natsMsg_Destroy(msg);
}
(...)
natsConnection *conn = NULL;
natsSubscription *sub = NULL;
natsStatus s;
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
if (s == NATS_OK)
s = natsConnection_Subscribe(&sub, conn, "time.*.east", onMsg, NULL);
(...)
// Destroy objects that were created
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);nc, err := nats.Connect("demo.nats.io")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Use a WaitGroup to wait for 4 messages to arrive
wg := sync.WaitGroup{}
wg.Add(4)
// Subscribe
if _, err := nc.Subscribe("time.>", func(m *nats.Msg) {
log.Printf("%s: %s", m.Subject, m.Data)
wg.Done()
}); err != nil {
log.Fatal(err)
}
// Wait for the 4 messages to come in
wg.Wait()
// Close the connection
nc.Close()Connection nc = Nats.connect("nats://demo.nats.io:4222");
// Use a latch to wait for 4 messages to arrive
CountDownLatch latch = new CountDownLatch(4);
// Create a dispatcher and inline message handler
Dispatcher d = nc.createDispatcher((msg) -> {
String subject = msg.getSubject();
String str = new String(msg.getData(), StandardCharsets.UTF_8);
System.out.println(subject + ": " + str);
latch.countDown();
});
// Subscribe
d.subscribe("time.>");
// Wait for messages to come in
latch.await();
// Close the connection
nc.close();let nc = NATS.connect({
url: "nats://demo.nats.io:4222"
});
nc.subscribe('time.>', (msg, reply, subject) => {
// converting timezones correctly in node requires a library
// this doesn't take into account *many* things.
let time = "";
switch (subject) {
case 'time.us.east':
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/New_York"});
break;
case 'time.us.central':
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Chicago"});
break;
case 'time.us.mountain':
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Denver"});
break;
case 'time.us.west':
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Los_Angeles"});
break;
default:
time = "I don't know what you are talking about Willis";
}
t.log(subject, time);
});nc = NATS()
await nc.connect(servers=["nats://demo.nats.io:4222"])
# Use queue to wait for 4 messages to arrive
queue = asyncio.Queue()
async def cb(msg):
await queue.put(msg)
await nc.subscribe("time.>", cb=cb)
# Send 2 messages and wait for them to come in
await nc.publish("time.A.east", b'A')
await nc.publish("time.B.east", b'B')
await nc.publish("time.C.west", b'C')
await nc.publish("time.D.west", b'D')
for i in range(0, 4):
msg = await queue.get()
print("Msg:", msg)
await nc.close()// dotnet add package NATS.Net
using NATS.Net;
await using var client = new NatsClient();
var count = 0;
await foreach (var msg in client.SubscribeAsync<string>("time.>"))
{
Console.WriteLine($"Received {++count}: {msg.Subject}: {msg.Data}");
if (count == 4)
{
break;
}
}
Console.WriteLine("Done");
// Output:
// Received 1: time.us.east: 2024-10-21T22:11:24 America/New_York (-04)
// Received 2: time.us.east.atlanta: 2024-10-21T22:11:24 America/New_York (-04)
// Received 3: time.eu.east: 2024-10-22T04:11:24 Europe/Warsaw (+02)
// Received 4: time.eu.east.warsaw: 2024-10-22T04:11:24 Europe/Warsaw (+02)
// Donerequire 'nats/client'
require 'fiber'
NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
Fiber.new do
f = Fiber.current
nc.subscribe("time.>") do |msg, reply|
f.resume Time.now.to_f
end
nc.publish("time.A.east", "A")
nc.publish("time.B.east", "B")
nc.publish("time.C.west", "C")
nc.publish("time.D.west", "D")
# Use the response
4.times do
msg = Fiber.yield
puts "Msg: #{msg}"
end
end.resume
endawait nc.subscribe('time.>', (err, msg) => {
// converting timezones correctly in node requires a library
// this doesn't take into account *many* things.
let time = "";
switch (msg.subject) {
case 'time.us.east':
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/New_York"});
break;
case 'time.us.central':
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Chicago"});
break;
case 'time.us.mountain':
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Denver"});
break;
case 'time.us.west':
time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Los_Angeles"});
break;
default:
time = "I don't know what you are talking about Willis";
}
t.log(msg.subject, time);
});static void
onMsg(natsConnection *conn, natsSubscription *sub, natsMsg *msg, void *closure)
{
printf("Received msg: %s - %.*s\n",
natsMsg_GetSubject(msg),
natsMsg_GetDataLength(msg),
natsMsg_GetData(msg));
// Need to destroy the message!
natsMsg_Destroy(msg);
}
(...)
natsConnection *conn = NULL;
natsSubscription *sub = NULL;
natsStatus s;
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
if (s == NATS_OK)
s = natsConnection_Subscribe(&sub, conn, "time.>", onMsg, NULL);
(...)
// Destroy objects that were created
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);nc, err := nats.Connect("demo.nats.io")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
zoneID, err := time.LoadLocation("America/New_York")
if err != nil {
log.Fatal(err)
}
now := time.Now()
zoneDateTime := now.In(zoneID)
formatted := zoneDateTime.String()
nc.Publish("time.us.east", []byte(formatted))
nc.Publish("time.us.east.atlanta", []byte(formatted))
zoneID, err = time.LoadLocation("Europe/Warsaw")
if err != nil {
log.Fatal(err)
}
zoneDateTime = now.In(zoneID)
formatted = zoneDateTime.String()
nc.Publish("time.eu.east", []byte(formatted))
nc.Publish("time.eu.east.warsaw", []byte(formatted))Connection nc = Nats.connect("nats://demo.nats.io:4222");
ZoneId zoneId = ZoneId.of("America/New_York");
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(Instant.now(), zoneId);
String formatted = zonedDateTime.format(DateTimeFormatter.ISO_ZONED_DATE_TIME);
nc.publish("time.us.east", formatted.getBytes(StandardCharsets.UTF_8));
nc.publish("time.us.east.atlanta", formatted.getBytes(StandardCharsets.UTF_8));
zoneId = ZoneId.of("Europe/Warsaw");
zonedDateTime = ZonedDateTime.ofInstant(Instant.now(), zoneId);
formatted = zonedDateTime.format(DateTimeFormatter.ISO_ZONED_DATE_TIME);
nc.publish("time.eu.east", formatted.getBytes(StandardCharsets.UTF_8));
nc.publish("time.eu.east.warsaw", formatted.getBytes(StandardCharsets.UTF_8));
nc.flush(Duration.ZERO);
nc.close();nc.publish('time.us.east');
nc.publish('time.us.central');
nc.publish('time.us.mountain');
nc.publish('time.us.west');nc = NATS()
await nc.connect(servers=["nats://demo.nats.io:4222"])
await nc.publish("time.us.east", b'...')
await nc.publish("time.us.east.atlanta", b'...')
await nc.publish("time.eu.east", b'...')
await nc.publish("time.eu.east.warsaw", b'...')
await nc.close()// dotnet add package NATS.Net
// dotnet add package NodaTime
using NATS.Net;
using NodaTime;
await using var client = new NatsClient();
Instant now = SystemClock.Instance.GetCurrentInstant();
{
DateTimeZone zone = DateTimeZoneProviders.Tzdb["America/New_York"];
string formatted = now.InZone(zone).ToString();
await client.PublishAsync("time.us.east", formatted);
await client.PublishAsync("time.us.east.atlanta", formatted);
}
{
DateTimeZone zone = DateTimeZoneProviders.Tzdb["Europe/Warsaw"];
string formatted = now.InZone(zone).ToString();
await client.PublishAsync("time.eu.east", formatted);
await client.PublishAsync("time.eu.east.warsaw", formatted);
}NATS.start do |nc|
nc.publish("time.us.east", '...')
nc.publish("time.us.east.atlanta", '...')
nc.publish("time.eu.east", '...')
nc.publish("time.eu.east.warsaw", '...')
nc.drain
endnc.publish('time.us.east');
nc.publish('time.us.central');
nc.publish('time.us.mountain');
nc.publish('time.us.west');nc.subscribe("time.us.*", (_err, msg) => {
// converting timezones correctly in node requires a library
// this doesn't take into account *many* things.
let time;
switch (msg.subject) {
case "time.us.east":
time = new Date().toLocaleTimeString("en-us", {
timeZone: "America/New_York",
});
break;
case "time.us.central":
time = new Date().toLocaleTimeString("en-us", {
timeZone: "America/Chicago",
});
break;
case "time.us.mountain":
time = new Date().toLocaleTimeString("en-us", {
timeZone: "America/Denver",
});
break;
case "time.us.west":
time = new Date().toLocaleTimeString("en-us", {
timeZone: "America/Los_Angeles",
});
break;
default:
time = "I don't know what you are talking about Willis";
}
t.log(subject, time);
});nc = NATS()
await nc.connect(servers=["nats://demo.nats.io:4222"])
# Use queue to wait for 2 messages to arrive
queue = asyncio.Queue()
async def cb(msg):
await queue.put_nowait(msg)
await nc.subscribe("time.*.east", cb=cb)
# Send 2 messages and wait for them to come in
await nc.publish("time.A.east", b'A')
await nc.publish("time.B.east", b'B')
msg_A = await queue.get()
msg_B = await queue.get()
print("Msg A:", msg_A)
print("Msg B:", msg_B)// dotnet add package NATS.Net
using NATS.Net;
await using var client = new NatsClient();
var count = 0;
await foreach (var msg in client.SubscribeAsync<string>("time.*.east"))
{
Console.WriteLine($"Received {++count}: {msg.Subject}: {msg.Data}");
if (count == 2)
{
break;
}
}
Console.WriteLine("Done");
// Output:
// Received 1: time.us.east: 2024-10-21T22:11:24 America/New_York (-04)
// Received 2: time.eu.east: 2024-10-22T04:11:24 Europe/Warsaw (+02)
// Donerequire 'nats/client'
require 'fiber'
NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
Fiber.new do
f = Fiber.current
nc.subscribe("time.*.east") do |msg, reply|
f.resume Time.now
end
nc.publish("time.A.east", "A")
nc.publish("time.B.east", "B")
# Use the response
msg_A = Fiber.yield
puts "Msg A: #{msg_A}"
msg_B = Fiber.yield
puts "Msg B: #{msg_B}"
end.resume
endtry (Connection nc = Nats.connect("localhost")) {
JetStreamManagement jsm = nc.jetStreamManagement();
// Create a stream
StreamInfo si = jsm.addStream(StreamConfiguration.builder()
.name("example-stream")
.subjects("example-subject")
.maxBytes(1024)
.build());
StreamConfiguration config = si.getConfiguration();
System.out.println("stream name: " + config.getName() + ", max_bytes: " + config.getMaxBytes());
// Update a stream
si = jsm.updateStream(StreamConfiguration.builder()
.name("example-stream")
.maxBytes(2048)
.build());
config = si.getConfiguration();
System.out.println("stream name: " + config.getName() + ", max_bytes: " + config.getMaxBytes());
// Create a durable consumer
jsm.createConsumer("example-stream", ConsumerConfiguration.builder()
.durable("example-consumer-name")
.build());
// Get information about all streams
List<StreamInfo> streams = jsm.getStreams();
for (StreamInfo info : streams) {
System.out.println("stream name: " + info.getConfiguration().getName());
}
// Get information about all consumers
List<ConsumerInfo> consumers = jsm.getConsumers("example-stream");
for (ConsumerInfo ci : consumers) {
System.out.println("consumer name: " + ci.getName());
}
// Delete a consumer
jsm.deleteConsumer("example-stream", "example-consumer-name");
// Delete a stream
jsm.deleteStream("example-stream");
}import { AckPolicy, connect, Empty } from "../../src/mod.ts";
const nc = await connect();
const jsm = await nc.jetstreamManager();
// list all the streams, the `next()` function
// retrieves a paged result.
const streams = await jsm.streams.list().next();
streams.forEach((si) => {
console.log(si);
});
// add a stream
const stream = "mystream";
const subj = `mystream.*`;
await jsm.streams.add({ name: stream, subjects: [subj] });
// publish a reg nats message directly to the stream
for (let i = 0; i < 10; i++) {
nc.publish(`${subj}.a`, Empty);
}
// find a stream that stores a specific subject:
const name = await jsm.streams.find("mystream.A");
// retrieve info about the stream by its name
const si = await jsm.streams.info(name);
// update a stream configuration
si.config.subjects?.push("a.b");
await jsm.streams.update(name, si.config);
// get a particular stored message in the stream by sequence
// this is not associated with a consumer
const sm = await jsm.streams.getMessage(stream, { seq: 1 });
console.log(sm.seq);
// delete the 5th message in the stream, securely erasing it
await jsm.streams.deleteMessage(stream, 5);
// purge all messages in the stream, the stream itself
// remains.
await jsm.streams.purge(stream);
// purge all messages with a specific subject (filter can be a wildcard)
await jsm.streams.purge(stream, { filter: "a.b" });
// purge messages with a specific subject keeping some messages
await jsm.streams.purge(stream, { filter: "a.c", keep: 5 });
// purge all messages with upto (not including seq)
await jsm.streams.purge(stream, { seq: 100 });
// purge all messages with upto sequence that have a matching subject
await jsm.streams.purge(stream, { filter: "a.d", seq: 100 });
// list all consumers for a stream:
const consumers = await jsm.consumers.list(stream).next();
consumers.forEach((ci) => {
console.log(ci);
});
// add a new durable pull consumer
await jsm.consumers.add(stream, {
durable_name: "me",
ack_policy: AckPolicy.Explicit,
});
// retrieve a consumer's configuration
const ci = await jsm.consumers.info(stream, "me");
console.log(ci);
// delete a particular consumer
await jsm.consumers.delete(stream, "me");import asyncio
import nats
from nats.errors import TimeoutError
async def main():
nc = await nats.connect("localhost")
# Create JetStream context.
js = nc.jetstream()
# Persist messages on 'foo's subject.
await js.add_stream(name="sample-stream", subjects=["foo"])
await nc.close()
if __name__ == '__main__':
asyncio.run(main()) // dotnet add package NATS.Net
using NATS.Net;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
await using var client = new NatsClient();
INatsJSContext js = client.CreateJetStreamContext();
// Create a stream
var streamConfig = new StreamConfig(name: "example-stream", subjects: ["example-subject"])
{
MaxBytes = 1024,
};
await js.CreateStreamAsync(streamConfig);
// Update the stream
var streamConfigUpdated = streamConfig with { MaxBytes = 2048 };
await js.UpdateStreamAsync(streamConfigUpdated);
// Create a durable consumer
await js.CreateConsumerAsync("example-stream", new ConsumerConfig("example-consumer-name"));
// Get information about all streams
await foreach (var stream in js.ListStreamsAsync())
{
Console.WriteLine($"stream name: {stream.Info.Config.Name}");
}
// Get information about all consumers in a stream
await foreach (var consumer in js.ListConsumersAsync("example-stream"))
{
Console.WriteLine($"consumer name: {consumer.Info.Config.Name}");
}
// Delete a consumer
await js.DeleteConsumerAsync("example-stream", "example-consumer-name");
// Delete a stream
await js.DeleteStreamAsync("example-stream");
// Output:
// stream name: example-stream
// consumer name: example-consumer-name#include "examples.h"
static const char *usage = ""\
"-stream stream name (default is 'foo')\n" \
"-txt text to send (default is 'hello')\n" \
"-count number of messages to send\n" \
"-sync publish synchronously (default is async)\n";
static void
_jsPubErr(jsCtx *js, jsPubAckErr *pae, void *closure)
{
int *errors = (int*) closure;
printf("Error: %u - Code: %u - Text: %s\n", pae->Err, pae->ErrCode, pae->ErrText);
printf("Original message: %.*s\n", natsMsg_GetDataLength(pae->Msg), natsMsg_GetData(pae->Msg));
*errors = (*errors + 1);
// If we wanted to resend the original message, we would do something like that:
//
// js_PublishMsgAsync(js, &(pae->Msg), NULL);
//
// Note that we use `&(pae->Msg)` so that the library set it to NULL if it takes
// ownership, and the library will not destroy the message when this callback returns.
// No need to destroy anything, everything is handled by the library.
}
int main(int argc, char **argv)
{
natsConnection *conn = NULL;
natsStatistics *stats = NULL;
natsOptions *opts = NULL;
jsCtx *js = NULL;
jsOptions jsOpts;
jsErrCode jerr = 0;
natsStatus s;
int dataLen=0;
volatile int errors = 0;
bool delStream = false;
opts = parseArgs(argc, argv, usage);
dataLen = (int) strlen(payload);
s = natsConnection_Connect(&conn, opts);
if (s == NATS_OK)
s = jsOptions_Init(&jsOpts);
if (s == NATS_OK)
{
if (async)
{
jsOpts.PublishAsync.ErrHandler = _jsPubErr;
jsOpts.PublishAsync.ErrHandlerClosure = (void*) &errors;
}
s = natsConnection_JetStream(&js, conn, &jsOpts);
}
if (s == NATS_OK)
{
jsStreamInfo *si = NULL;
// First check if the stream already exists.
s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
if (s == NATS_NOT_FOUND)
{
jsStreamConfig cfg;
// Since we are the one creating this stream, we can delete at the end.
delStream = true;
// Initialize the configuration structure.
jsStreamConfig_Init(&cfg);
cfg.Name = stream;
// Set the subject
cfg.Subjects = (const char*[1]){subj};
cfg.SubjectsLen = 1;
// Make it a memory stream.
cfg.Storage = js_MemoryStorage;
// Add the stream,
s = js_AddStream(&si, js, &cfg, NULL, &jerr);
}
if (s == NATS_OK)
{
printf("Stream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
si->Config->Name, si->State.Msgs, si->State.Bytes);
// Need to destroy the returned stream object.
jsStreamInfo_Destroy(si);
}
}
if (s == NATS_OK)
s = natsStatistics_Create(&stats);
if (s == NATS_OK)
{
printf("\nSending %" PRId64 " messages to subject '%s'\n", total, stream);
start = nats_Now();
}
for (count = 0; (s == NATS_OK) && (count < total); count++)
{
if (async)
s = js_PublishAsync(js, subj, (const void*) payload, dataLen, NULL);
else
{
jsPubAck *pa = NULL;
s = js_Publish(&pa, js, subj, (const void*) payload, dataLen, NULL, &jerr);
if (s == NATS_OK)
{
if (pa->Duplicate)
printf("Got a duplicate message! Sequence=%" PRIu64 "\n", pa->Sequence);
jsPubAck_Destroy(pa);
}
}
}
if ((s == NATS_OK) && async)
{
jsPubOptions jsPubOpts;
jsPubOptions_Init(&jsPubOpts);
// Let's set it to 30 seconds, if getting "Timeout" errors,
// this may need to be increased based on the number of messages
// being sent.
jsPubOpts.MaxWait = 30000;
s = js_PublishAsyncComplete(js, &jsPubOpts);
if (s == NATS_TIMEOUT)
{
// Let's get the list of pending messages. We could resend,
// etc, but for now, just destroy them.
natsMsgList list;
js_PublishAsyncGetPendingList(&list, js);
natsMsgList_Destroy(&list);
}
}
if (s == NATS_OK)
{
jsStreamInfo *si = NULL;
elapsed = nats_Now() - start;
printStats(STATS_OUT, conn, NULL, stats);
printPerf("Sent");
if (errors != 0)
printf("There were %d asynchronous errors\n", errors);
// Let's report some stats after the run
s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
if (s == NATS_OK)
{
printf("\nStream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
si->Config->Name, si->State.Msgs, si->State.Bytes);
jsStreamInfo_Destroy(si);
}
}
if (delStream && (js != NULL))
{
printf("\nDeleting stream %s: ", stream);
s = js_DeleteStream(js, stream, NULL, &jerr);
if (s == NATS_OK)
printf("OK!");
printf("\n");
}
if (s != NATS_OK)
{
printf("Error: %u - %s - jerr=%u\n", s, natsStatus_GetText(s), jerr);
nats_PrintLastErrorStack(stderr);
}
// Destroy all our objects to avoid report of memory leak
jsCtx_Destroy(js);
natsStatistics_Destroy(stats);
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);
// To silence reports of memory still in used with valgrind
nats_Close();
return 0;
}// KeyValue will lookup and bind to an existing KeyValue store.
KeyValue(bucket string) (KeyValue, error)
// CreateKeyValue will create a KeyValue store with the following configuration.
CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error)
// DeleteKeyValue will delete this KeyValue store (JetStream stream).
DeleteKeyValue(bucket string) error/**
* Create a key value store.
* @param config the key value configuration
* @return bucket info
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws IllegalArgumentException the server is not JetStream enabled
*/
KeyValueStatus create(KeyValueConfiguration config) throws IOException, JetStreamApiException;
/**
* Get the list of bucket names.
* @return list of bucket names
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws InterruptedException if the thread is interrupted
*/
List<String> getBucketNames() throws IOException, JetStreamApiException, InterruptedException;
/**
* Gets the info for an existing bucket.
* @param bucketName the bucket name to use
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @return the bucket status object
*/
KeyValueStatus getBucketInfo(String bucketName) throws IOException, JetStreamApiException;
/**
* Deletes an existing bucket. Will throw a JetStreamApiException if the delete fails.
* @param bucketName the stream name to use.
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
void delete(String bucketName) throws IOException, JetStreamApiException;// Get returns the latest value for the key.
Get(key string) (entry KeyValueEntry, err error)
// GetRevision returns a specific revision value for the key.
GetRevision(key string, revision uint64) (entry KeyValueEntry, err error)/**
* Get the entry for a key
* @param key the key
* @return the KvEntry object or null if not found.
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws IllegalArgumentException the server is not JetStream enabled
*/
KeyValueEntry get(String key) throws IOException, JetStreamApiException;
/**
* Get the specific revision of an entry for a key.
* @param key the key
* @param revision the revision
* @return the KvEntry object or null if not found.
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws IllegalArgumentException the server is not JetStream enabled
*/
KeyValueEntry get(String key, long revision) throws IOException, JetStreamApiException; static async create(
js: JetStreamClient,
name: string,
opts: Partial<KvOptions> = {},
): Promise<KV>
static async bind(
js: JetStreamClient,
name: string,
opts: Partial<{ codec: KvCodecs }> = {},
): Promise<KV>
destroy(): Promise<boolean># from the JetStreamContext
async def key_value(self, bucket: str) -> KeyValue:
async def create_key_value(
self,
config: Optional[api.KeyValueConfig] = None,
**params,
) -> KeyValue:
"""
create_key_value takes an api.KeyValueConfig and creates a KV in JetStream.
"""
async def delete_key_value(self, bucket: str) -> bool:
"""
delete_key_value deletes a JetStream KeyValue store by destroying
the associated stream.
""" // dotnet add package NATS.Net
// Create a new Key Value Store or get an existing one
ValueTask<INatsKVStore> CreateStoreAsync(string bucket, CancellationToken cancellationToken = default);
// Get a list of bucket names
IAsyncEnumerable<string> GetBucketNamesAsync(CancellationToken cancellationToken = default);
// Gets the status for all buckets
IAsyncEnumerable<NatsKVStatus> GetStatusesAsync(CancellationToken cancellationToken = default);
// Delete a Key Value Store
ValueTask<bool> DeleteStoreAsync(string bucket, CancellationToken cancellationToken = default);
//NATS_EXTERN natsStatus kvConfig_Init (kvConfig *cfg)
Initializes a KeyValue configuration structure.
NATS_EXTERN natsStatus js_CreateKeyValue (kvStore **new_kv, jsCtx *js, kvConfig *cfg)
Creates a KeyValue store with a given configuration.
NATS_EXTERN natsStatus js_KeyValue (kvStore **new_kv, jsCtx *js, const char *bucket)
Looks-up and binds to an existing KeyValue store.
NATS_EXTERN natsStatus js_DeleteKeyValue (jsCtx *js, const char *bucket)
Deletes a KeyValue store.
NATS_EXTERN void kvStore_Destroy (kvStore *kv)
Destroys a KeyValue store object.async get(k: string): Promise<KvEntry | null>async def get(self, key: str) -> Entry:
"""
get returns the latest value for the key.
"""// dotnet add package NATS.Net
// Get an entry from the bucket using the key
ValueTask<NatsKVEntry<T>> GetEntryAsync<T>(string key, ulong revision = default, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default);
//NATS_EXTERN natsStatus kvStore_Get (kvEntry **new_entry, kvStore *kv, const char *key)
Returns the latest entry for the key.
NATS_EXTERN natsStatus kvStore_GetRevision (kvEntry **new_entry, kvStore *kv, const char *key, uint64_t revision)
Returns the entry at the specific revision for the key.Put(key string, value []byte) (revision uint64, err error)
// PutString will place the string for the key into the store.
PutString(key string, value string) (revision uint64, err error)
// Create will add the key/value pair if it does not exist.
Create(key string, value []byte) (revision uint64, err error)
// Update will update the value if the latest revision matches.
Update(key string, value []byte, last uint64) (revision uint64, err error)/**
* Put a byte[] as the value for a key
* @param key the key
* @param value the bytes of the value
* @return the revision number for the key
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws IllegalArgumentException the server is not JetStream enabled
*/
long put(String key, byte[] value) throws IOException, JetStreamApiException;
/**
* Put a string as the value for a key
* @param key the key
* @param value the UTF-8 string
* @return the revision number for the key
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws IllegalArgumentException the server is not JetStream enabled
*/
long put(String key, String value) throws IOException, JetStreamApiException;
/**
* Put a long as the value for a key
* @param key the key
* @param value the number
* @return the revision number for the key
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws IllegalArgumentException the server is not JetStream enabled
*/
long put(String key, Number value) throws IOException, JetStreamApiException;
/**
* Put as the value for a key iff the key does not exist (there is no history)
* or is deleted (history shows the key is deleted)
* @param key the key
* @param value the bytes of the value
* @return the revision number for the key
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws IllegalArgumentException the server is not JetStream enabled
*/
long create(String key, byte[] value) throws IOException, JetStreamApiException;
/**
* Put as the value for a key iff the key exists and its last revision matches the expected
* @param key the key
* @param value the bytes of the value
* @param expectedRevision the expected last revision
* @return the revision number for the key
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws IllegalArgumentException the server is not JetStream enabled
*/
long update(String key, byte[] value, long expectedRevision) throws IOException, JetStreamApiException; async put(
k: string,
data: Uint8Array,
opts: Partial<KvPutOptions> = {},
): Promise<number>
create(k: string, data: Uint8Array): Promise<number>
update(k: string, data: Uint8Array, version: number): Promise<number>async def put(self, key: str, value: bytes) -> int:
"""
put will place the new value for the key into the store
and return the revision number.
"""
async def update(self, key: str, value: bytes, last: int) -> int:
"""
update will update the value iff the latest revision matches.
""" // dotnet add package NATS.Net
// Put a value into the bucket using the key
// returns revision number
ValueTask<ulong> PutAsync<T>(string key, T value, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default);
//NATS_EXTERN natsStatus kvStore_Put (uint64_t *rev, kvStore *kv, const char *key, const void *data, int len)
Places the new value for the key into the store.
NATS_EXTERN natsStatus kvStore_PutString (uint64_t *rev, kvStore *kv, const char *key, const char *data)
Places the new value (as a string) for the key into the store.
NATS_EXTERN natsStatus kvStore_Create (uint64_t *rev, kvStore *kv, const char *key, const void *data, int len)
Places the value for the key into the store if and only if the key does not exist.
NATS_EXTERN natsStatus kvStore_CreateString (uint64_t *rev, kvStore *kv, const char *key, const char *data)
Places the value (as a string) for the key into the store if and only if the key does not exist.
NATS_EXTERN natsStatus kvStore_Update (uint64_t *rev, kvStore *kv, const char *key, const void *data, int len, uint64_t last)
Updates the value for the key into the store if and only if the latest revision matches.
NATS_EXTERN natsStatus kvStore_UpdateString (uint64_t *rev, kvStore *kv, const char *key, const char *data, uint64_t last)
Updates the value (as a string) for the key into the store if and only if the latest revision matches.// Delete will place a delete marker and leave all revisions.
Delete(key string) error
// Purge will place a delete marker and remove all previous revisions.
Purge(key string) error/**
* Soft deletes the key by placing a delete marker.
* @param key the key
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
void delete(String key) throws IOException, JetStreamApiException;
/**
* Purge all values/history from the specific key
* @param key the key
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
void purge(String key) throws IOException, JetStreamApiException;delete(k: string): Promise<void>
purge(k: string): Promise<void>async def delete(self, key: str) -> bool:
"""
delete will place a delete marker and remove all previous revisions.
"""
async def purge(self, key: str) -> bool:
"""
purge will remove the key and all revisions.
""" // dotnet add package NATS.Net
// Delete an entry from the bucket
ValueTask DeleteAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default);
// Purge an entry from the bucket
ValueTask PurgeAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default);
//NATS_EXTERN natsStatus kvStore_Delete (kvStore *kv, const char *key)
Deletes a key by placing a delete marker and leaving all revisions.
NATS_EXTERN natsStatus kvStore_Purge (kvStore *kv, const char *key, kvPurgeOptions *opts)
Deletes a key by placing a purge marker and removing all revisions.
NATS_EXTERN natsStatus kvStore_PurgeDeletes (kvStore *kv, kvPurgeOptions *opts)
Purge and removes delete markers.// Keys will return all keys.
Keys(opts ...WatchOpt) ([]string, error)/**
* Get a list of the keys in a bucket.
* @return List of keys
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws InterruptedException if the thread is interrupted
*/
List<String> keys() throws IOException, JetStreamApiException, InterruptedException;async keys(k = ">"): Promise<QueuedIterator<string>>// dotnet add package NATS.Net
// Get all the keys in the bucket
IAsyncEnumerable<string> GetKeysAsync(NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);
// Get a filtered set of keys in the bucket
IAsyncEnumerable<string> GetKeysAsync(IEnumerable<string> filters, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);
//NATS_EXTERN natsStatus kvStore_Keys (kvKeysList *list, kvStore *kv, kvWatchOptions *opts)
Returns all keys in the bucket.
NATS_EXTERN void kvKeysList_Destroy (kvKeysList *list)
Destroys this list of KeyValue store key strings.// History will return all historical values for the key.
History(key string, opts ...WatchOpt) ([]KeyValueEntry, error)/**
* Get the history (list of KeyValueEntry) for a key
* @param key the key
* @return List of KvEntry
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws InterruptedException if the thread is interrupted
*/
List<KeyValueEntry> history(String key) throws IOException, JetStreamApiException, InterruptedException;async history(
opts: { key?: string; headers_only?: boolean } = {},
): Promise<QueuedIterator<KvEntry>>// dotnet add package NATS.Net
// Get the history of an entry by key
IAsyncEnumerable<NatsKVEntry<T>> HistoryAsync<T>(string key, INatsDeserialize<T>? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);
//NATS_EXTERN natsStatus kvStore_History (kvEntryList *list, kvStore *kv, const char *key, kvWatchOptions *opts)
Returns all historical entries for the key.
NATS_EXTERN void kvEntryList_Destroy (kvEntryList *list)
Destroys this list of KeyValue store entries.// Watch for any updates to keys that match the keys argument which could include wildcards.
// Watch will send a nil entry when it has received all initial values.
Watch(keys string, opts ...WatchOpt) (KeyWatcher, error)
// WatchAll will invoke the callback for all updates.
WatchAll(opts ...WatchOpt) (KeyWatcher, error)/**
* Watch updates for a specific key
*/
NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException;
/**
* Watch updates for all keys
*/
NatsKeyValueWatchSubscription watchAll(KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException; async watch(
opts: {
key?: string;
headers_only?: boolean;
initializedFn?: callbackFn;
} = {},
): Promise<QueuedIterator<KvEntry>>// dotnet add package NATS.Net
// Start a watcher for specific keys
// Key to watch is subject-based and wildcards may be used
IAsyncEnumerable<NatsKVEntry<T>> WatchAsync<T>(string key, INatsDeserialize<T>? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);
// Start a watcher for specific keys
// Key to watch are subject-based and wildcards may be used
IAsyncEnumerable<NatsKVEntry<T>> WatchAsync<T>(IEnumerable<string> keys, INatsDeserialize<T>? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);
// Start a watcher for all the keys in the bucket
IAsyncEnumerable<NatsKVEntry<T>> WatchAsync<T>(INatsDeserialize<T>? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);
//NATS_EXTERN natsStatus kvStore_Watch (kvWatcher **new_watcher, kvStore *kv, const char *keys, kvWatchOptions *opts)
Returns a watcher for any updates to keys that match the keys argument.
NATS_EXTERN natsStatus kvStore_WatchAll (kvWatcher **new_watcher, kvStore *kv, kvWatchOptions *opts)
Returns a watcher for any updates to any keys of the KeyValue store bucket.wg := sync.WaitGroup{}
wg.Add(1)
errCh := make(chan error, 1)
// To simulate a timeout, you would set the DrainTimeout()
// to a value less than the time spent in the message callback,
// so say: nats.DrainTimeout(10*time.Millisecond).
nc, err := nats.Connect("demo.nats.io",
nats.DrainTimeout(10*time.Second),
nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription
errCh <- err
}),
nats.ClosedHandler(func(_ *nats.Conn) {
wg.Done()
}))
if err != nil {
log.Fatal(err)
}
// Just to not collide using the demo server with other users.
subject := nats.NewInbox()
// Subscribe, but add some delay while processing.
if _, err := nc.Subscribe(subject, func(_ *nats
time.Sleep(200 * time.Millisecond)
}); err != nil {
log.Fatal(err)
}
// Publish a message
if err := nc.Publish(subject, []byte("hello")); err
log.Fatal(err)
}
// Drain the connection, which will close it when done.
if err := nc.Drain(); err != nil {
log.Fatal(err)
}
// Wait for the connection to be closed.
wg.Wait()
// Check if there was an error
select {
case e := <-errCh:
log.Fatal(e)
default:
}nats contextConnection nc = Nats.connect("nats://demo.nats.io:4222");
// Use a latch to wait for a message to arrive
CountDownLatch latch = new CountDownLatch(1);
// Create a dispatcher and inline message handler
Dispatcher d = nc.createDispatcher((msg) -> {
String str = new String(msg.getData(), StandardCharsets.UTF_8);
System.out.println(str);
latch.countDown();
});
// Subscribe
d.subscribe("updates");
// Wait for a message to come in
latch.await();
// Drain the connection, which will close it
CompletableFuture<Boolean> drained = nc.drain(Duration.ofSeconds(10));
// Wait for the drain to complete
drained.get();const nc = await connect({ servers: "demo.nats.io" });
const sub = nc.subscribe(createInbox(), () => {});
nc.publish(sub.getSubject());
await nc.drain();import asyncio
from nats.aio.client import Client as NATS
async def example(loop):
nc = NATS()
await nc.connect("nats://127.0.0.1:4222", loop=loop)
async def handler(msg):
print("[Received] ", msg)
await nc.publish(msg.reply, b'I can help')
# Can check whether client is in draining state
if nc.is_draining:
print("Connection is draining")
await nc.subscribe("help", "workers", cb=handler)
await nc.flush()
requests = []
for i in range(0, 10):
request = nc.request("help", b'help!', timeout=1)
requests.append(request)
# Wait for all the responses
responses = []
responses = await asyncio.gather(*requests)
# Gracefully close the connection.
await nc.drain()
print("Received {} responses".format(len(responses)))// dotnet add package NATS.Net
using NATS.Net;
var client = new NatsClient();
var subject = client.Connection.NewInbox();
// Make sure to use a cancellation token to end all subscriptions
using var cts = new CancellationTokenSource();
var sync = false;
var process = Task.Run(async () =>
{
await foreach (var msg in client.SubscribeAsync<int>(subject, cancellationToken: cts.Token))
{
if (msg.Data == -1)
{
sync = true;
continue;
}
Console.WriteLine($"Received: {msg.Data}");
await Task.Delay(TimeSpan.FromMilliseconds(300));
}
Console.WriteLine("Subscription completed");
});
// Make sure the subscription is ready
while (sync == false)
{
await Task.Delay(TimeSpan.FromMilliseconds(100));
await client.PublishAsync(subject, -1);
}
for (var i = 0; i < 5; i++)
{
await client.PublishAsync(subject, i);
}
Console.WriteLine("Published 5 messages");
// Cancelling the subscription will unsubscribe from the subject
// and messages that are already in the buffer will be processed
await cts.CancelAsync();
Console.WriteLine("Cancelled subscription");
// Ping the server to make sure all in-flight messages are processed
// as a side effect of the ping, the server will respond with a pong
// making sure the connection all previous messages are sent on the wire.
await client.PingAsync();
// Disposing the NATS client will close the connection
await client.DisposeAsync();
Console.WriteLine("Disposed NATS client");
Console.WriteLine("Waiting for all messages to be processed");
await process;
Console.WriteLine("Done");NATS.start(drain_timeout: 1) do |nc|
NATS.subscribe('foo', queue: "workers") do |msg, reply, sub|
nc.publish(reply, "ACK:#{msg}")
end
NATS.subscribe('bar', queue: "workers") do |msg, reply, sub|
nc.publish(reply, "ACK:#{msg}")
end
NATS.subscribe('quux', queue: "workers") do |msg, reply, sub|
nc.publish(reply, "ACK:#{msg}")
end
EM.add_timer(2) do
next if NATS.draining?
# Drain gracefully closes the connection.
NATS.drain do
puts "Done draining. Connection is closed."
end
end
endstatic void
onMsg(natsConnection *conn, natsSubscription *sub, natsMsg *msg, void *closure)
{
printf("Received msg: %s - %.*s\n",
natsMsg_GetSubject(msg),
natsMsg_GetDataLength(msg),
natsMsg_GetData(msg));
// Add some delay while processing
nats_Sleep(200);
// Need to destroy the message!
natsMsg_Destroy(msg);
}
static void
closeHandler(natsConnection *conn, void *closure)
{
cond_variable cv = (cond_variable) closure;
notify_cond_variable(cv);
}
(...)
natsConnection *conn = NULL;
natsOptions *opts = NULL;
natsSubscription *sub = NULL;
natsStatus s = NATS_OK;
cond_variable cv = new_cond_variable(); // some fictuous way to notify between threads.
s = natsOptions_Create(&opts);
if (s == NATS_OK)
// Setup a close handler and pass a reference to our condition variable.
s = natsOptions_SetClosedCB(opts, closeHandler, (void*) cv);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
// Subscribe
if (s == NATS_OK)
s = natsConnection_Subscribe(&sub, conn, "foo", onMsg, NULL);
// Publish a message
if (s == NATS_OK)
s = natsConnection_PublishString(conn, "foo", "hello");
// Drain the connection, which will close it when done.
if (s == NATS_OK)
s = natsConnection_Drain(conn);
// Wait for the connection to be closed
if (s == NATS_OK)
cond_variable_wait(cv);
(...)
// Destroy objects that were created
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);
natsOptions_Destroy(opts); nc, err := nats.Connect("demo.nats.io")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
done := sync.WaitGroup{}
done.Add(1)
count := 0
errCh := make(chan error, 1)
msgAfterDrain := "not this one"
// Just to not collide using the demo server with other users.
subject := nats.NewInbox()
// This callback will process each message slowly
sub, err := nc.Subscribe(subject, func(m *nats.Msg) {
if string(m.Data) == msgAfterDrain {
errCh <- fmt.Errorf("Should not have received this message")
return
}
time.Sleep(100 * time.Millisecond)
count++
if count == 2 {
done.Done()
}
})
// Send 2 messages
for i := 0; i < 2; i++ {
nc.Publish(subject, []byte("hello"))
}
// Call Drain on the subscription. It unsubscribes but
// wait for all pending messages to be processed.
if err := sub.Drain(); err != nil {
log.Fatal(err)
}
// Send one more message, this message should not be received
nc.Publish(subject, []byte(msgAfterDrain))
// Wait for the subscription to have processed the 2 messages.
done.Wait()
// Now check that the 3rd message was not received
select {
case e := <-errCh:
log.Fatal(e)
case <-time.After(200 * time.Millisecond):
// OK!
}Connection nc = Nats.connect("nats://demo.nats.io:4222");
// Use a latch to wait for a message to arrive
CountDownLatch latch = new CountDownLatch(1);
// Create a dispatcher and inline message handler
Dispatcher d = nc.createDispatcher((msg) -> {
String str = new String(msg.getData(), StandardCharsets.UTF_8);
System.out.println(str);
latch.countDown();
});
// Subscribe
d.subscribe("updates");
// Wait for a message to come in
latch.await();
// Messages that have arrived will be processed
CompletableFuture<Boolean> drained = d.drain(Duration.ofSeconds(10));
// Wait for the drain to complete
drained.get();
// Close the connection
nc.close();const sub = nc.subscribe(subj, { callback: (_err, _msg) => {} });
nc.publish(subj);
nc.publish(subj);
nc.publish(subj);
await sub.drain();import asyncio
from nats.aio.client import Client as NATS
async def example(loop):
nc = NATS()
await nc.connect("nats://127.0.0.1:4222", loop=loop)
async def handler(msg):
print("[Received] ", msg)
await nc.publish(msg.reply, b'I can help')
# Can check whether client is in draining state
if nc.is_draining:
print("Connection is draining")
sid = await nc.subscribe("help", "workers", cb=handler)
await nc.flush()
# Gracefully unsubscribe the subscription
await nc.drain(sid)// dotnet add package NATS.Net
using NATS.Net;
await using var client = new NatsClient();
var subject = client.Connection.NewInbox();
// Make sure to use a cancellation token to end the subscription
using var cts = new CancellationTokenSource();
var sync = false;
var process = Task.Run(async () =>
{
await foreach (var msg in client.SubscribeAsync<int>(subject, cancellationToken: cts.Token))
{
if (msg.Data == -1)
{
sync = true;
continue;
}
Console.WriteLine($"Received: {msg.Data}");
await Task.Delay(TimeSpan.FromMilliseconds(300));
}
Console.WriteLine("Subscription completed");
});
// Make sure the subscription is ready
while (sync == false)
{
await Task.Delay(TimeSpan.FromMilliseconds(100));
await client.PublishAsync(subject, -1);
}
for (var i = 0; i < 5; i++)
{
await client.PublishAsync(subject, i);
}
Console.WriteLine("Published 5 messages");
// Cancelling the subscription will unsubscribe from the subject
// and messages that are already in the buffer will be processed
await cts.CancelAsync();
Console.WriteLine("Cancelled subscription");
Console.WriteLine("Waiting for subscription to complete");
await process;
Console.WriteLine("Done");# There is currently no API to drain a single subscription, the whole connection can be drained though via NATS.drainnatsConnection *conn = NULL;
natsSubscription *sub = NULL;
natsStatus s = NATS_OK;
s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
// Subscribe
if (s == NATS_OK)
s = natsConnection_Subscribe(&sub, conn, "foo", onMsg, NULL);
// Publish 2 messages
if (s == NATS_OK)
{
int i;
for (i=0; (s == NATS_OK) && (i<2); i++)
{
s = natsConnection_PublishString(conn, "foo", "hello");
}
}
// Call Drain on the subscription. It unsubscribes but
// wait for all pending messages to be processed.
if (s == NATS_OK)
s = natsSubscription_Drain(sub);
(...)
// Destroy objects that were created
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);nsc add operator MyOperator[ OK ] generated and stored operator key "ODSWWTKZLRDFBPXNMNAY7XB2BIJ45SV756BHUT7GX6JQH6W7AHVAFX6C"
[ OK ] added operator "MyOperator"
[ OK ] When running your own nats-server, make sure they run at least version 2.2.0nsc edit operator --service-url nats://localhost:4222[ OK ] added service url "nats://localhost:4222"
[ OK ] edited operator "MyOperator"nsc add account MyAccount[ OK ] generated and stored account key "AD2M34WBNGQFYK37IDX53DPRG74RLLT7FFWBOBMBUXMAVBCVAU5VKWIY"
[ OK ] added account "MyAccount"nsc add user MyUser[ OK ] generated and stored user key "UAWBXLSZVZHNDIURY52F6WETFCFZLXYUEFJAHRXDW7D2K4445IY4BVXP"
[ OK ] generated user creds file `~/.nkeys/creds/MyOperator/MyAccount/MyUser.creds`
[ OK ] added user "MyUser" to account "MyAccount"tree ~/.nsc/nats/Users/myusername/.nsc/nats
└── MyOperator
├── MyOperator.jwt
└── accounts
└── MyAccount
├── MyAccount.jwt
└── users
└── MyUser.jwttree ~/.nkeys/Users/myusername/.nkeys
├── creds
│ └── MyOperator
│ └── MyAccount
│ └── MyUser.creds
└── keys
├── A
│ └── DE
│ └── ADETPT36WBIBUKM3IBCVM4A5YUSDXFEJPW4M6GGVBYCBW7RRNFTV5NGE.nk
├── O
│ └── AF
│ └── OAFEEYZSYYVI4FXLRXJTMM32PQEI3RGOWZJT7Y3YFM4HB7ACPE4RTJPG.nk
└── U
└── DB
└── UDBD5FNQPSLIO6CDMIS5D4EBNFKYWVDNULQTFTUZJXWFNYLGFF52VZN7.nkcat ~/.nkeys/keys/U/DB/UDBD5FNQPSLIO6CDMIS5D4EBNFKYWVDNULQTFTUZJXWFNYLGFF52VZN7.nk SUAG35IAY2EF5DOZRV6MUSOFDGJ6O2BQCZHSRPLIK6J3GVCX366BFAYSNAcat ~/.nkeys/creds/MyOperator/MyAccount/MyUser.creds-----BEGIN NATS USER JWT-----
eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiI0NUc3MkhIQUVCRFBQV05ZWktMTUhQNUFYWFRSSUVDQlNVQUI2VDZRUjdVM1JZUFZaM05BIiwiaWF0IjoxNjM1Mzc1NTYxLCJpc3MiOiJBRDJNMzRXQk5HUUZZSzM3SURYNTNEUFJHNzRSTExUN0ZGV0JPQk1CVVhNQVZCQ1ZBVTVWS1dJWSIsIm5hbWUiOiJNeVVzZXIiLCJzdWIiOiJVQVdCWExTWlZaSE5ESVVSWTUyRjZXRVRGQ0ZaTFhZVUVGSkFIUlhEVzdEMks0NDQ1SVk0QlZYUCIsIm5hdHMiOnsicHViIjp7fSwic3ViIjp7fSwic3VicyI6LTEsImRhdGEiOi0xLCJwYXlsb2FkIjotMSwidHlwZSI6InVzZXIiLCJ2ZXJzaW9uIjoyfX0.CGymhGYHfdZyhUeucxNs9TthSjy_27LVZikqxvm-pPLili8KNe1xyOVnk_w-xPWdrCx_t3Se2lgXmoy3wBcVCw
------END NATS USER JWT------
************************* IMPORTANT *************************
NKEY Seed printed below can be used to sign and prove identity.
NKEYs are sensitive and should be treated as secrets.
-----BEGIN USER NKEY SEED-----
SUAP2AY6UAWHOXJBWDNRNKJ2DHNC5VA2DFJZTF6C6PMLKUCOS2H2E2BA2E
------END USER NKEY SEED------
*************************************************************nsc list keys+----------------------------------------------------------------------------------------------+
| Keys |
+------------+----------------------------------------------------------+-------------+--------+
| Entity | Key | Signing Key | Stored |
+------------+----------------------------------------------------------+-------------+--------+
| MyOperator | ODSWWTKZLRDFBPXNMNAY7XB2BIJ45SV756BHUT7GX6JQH6W7AHVAFX6C | | * |
| MyAccount | AD2M34WBNGQFYK37IDX53DPRG74RLLT7FFWBOBMBUXMAVBCVAU5VKWIY | | * |
| MyUser | UAWBXLSZVZHNDIURY52F6WETFCFZLXYUEFJAHRXDW7D2K4445IY4BVXP | | * |
+------------+----------------------------------------------------------+-------------+--------+nsc list keys --show-seeds+---------------------------------------------------------------------------------------+
| Seeds Keys |
+------------+------------------------------------------------------------+-------------+
| Entity | Private Key | Signing Key |
+------------+------------------------------------------------------------+-------------+
| MyOperator | SOAJ3JDZBE6JKJO277CQP5RIAA7I7HBI44RDCMTIV3TQRYQX35OTXSMHAE | |
| MyAccount | SAAACXWSQIKJ4L2SEAUZJR3BCNSRCN32V5UJSABCSEP35Q7LQRPV6F4JPI | |
| MyUser | SUAP2AY6UAWHOXJBWDNRNKJ2DHNC5VA2DFJZTF6C6PMLKUCOS2H2E2BA2E | |
+------------+------------------------------------------------------------+-------------+
[ ! ] seed is not stored
[ERR] error reading seednsc describe operator+----------------------------------------------------------------------------------+
| Operator Details |
+-----------------------+----------------------------------------------------------+
| Name | MyOperator |
| Operator ID | ODSWWTKZLRDFBPXNMNAY7XB2BIJ45SV756BHUT7GX6JQH6W7AHVAFX6C |
| Issuer ID | ODSWWTKZLRDFBPXNMNAY7XB2BIJ45SV756BHUT7GX6JQH6W7AHVAFX6C |
| Issued | 2021-10-27 22:58:28 UTC |
| Expires | |
| Operator Service URLs | nats://localhost:4222 |
| Require Signing Keys | false |
+-----------------------+----------------------------------------------------------+{
"typ": "jwt",
"alg": "ed25519"
}
{
"jti": "ZP2X3T2R57SLXD2U5J3OLLYIVW2LFBMTXRPMMGISQ5OF7LANUQPQ",
"iat": 1575468772,
"iss": "OAFEEYZSYYVI4FXLRXJTMM32PQEI3RGOWZJT7Y3YFM4HB7ACPE4RTJPG",
"name": "O",
"sub": "OAFEEYZSYYVI4FXLRXJTMM32PQEI3RGOWZJT7Y3YFM4HB7ACPE4RTJPG",
"type": "operator",
"nats": {
"operator_service_urls": [
"nats://localhost:4222"
]
}
}nsc describe account+--------------------------------------------------------------------------------------+
| Account Details |
+---------------------------+----------------------------------------------------------+
| Name | MyAccount |
| Account ID | AD2M34WBNGQFYK37IDX53DPRG74RLLT7FFWBOBMBUXMAVBCVAU5VKWIY |
| Issuer ID | ODSWWTKZLRDFBPXNMNAY7XB2BIJ45SV756BHUT7GX6JQH6W7AHVAFX6C |
| Issued | 2021-10-27 22:59:01 UTC |
| Expires | |
+---------------------------+----------------------------------------------------------+
| Max Connections | Unlimited |
| Max Leaf Node Connections | Unlimited |
| Max Data | Unlimited |
| Max Exports | Unlimited |
| Max Imports | Unlimited |
| Max Msg Payload | Unlimited |
| Max Subscriptions | Unlimited |
| Exports Allows Wildcards | True |
| Response Permissions | Not Set |
+---------------------------+----------------------------------------------------------+
| Jetstream | Disabled |
+---------------------------+----------------------------------------------------------+
| Imports | None |
| Exports | None |
+---------------------------+----------------------------------------------------------+nsc describe user+---------------------------------------------------------------------------------+
| User |
+----------------------+----------------------------------------------------------+
| Name | MyUser |
| User ID | UAWBXLSZVZHNDIURY52F6WETFCFZLXYUEFJAHRXDW7D2K4445IY4BVXP |
| Issuer ID | AD2M34WBNGQFYK37IDX53DPRG74RLLT7FFWBOBMBUXMAVBCVAU5VKWIY |
| Issued | 2021-10-27 22:59:21 UTC |
| Expires | |
| Bearer Token | No |
| Response Permissions | Not Set |
+----------------------+----------------------------------------------------------+
| Max Msg Payload | Unlimited |
| Max Data | Unlimited |
| Max Subs | Unlimited |
| Network Src | Any |
| Time | Any |
+----------------------+----------------------------------------------------------+go get github.com/nats-io/nats-servernsc generate config --nats-resolver > resolver.confserver_name: servertest
listen: 127.0.0.1:4222
http: 8222
jetstream: enabled
include resolver.confnsc add account -n SYS`
nsc edit operator --system-account SYSnsc push -a MyAccount -u nats://localhostnats sub --creds ~/.nkeys/creds/MyOperator/MyAccount/MyUser.creds ">"nats pub --creds ~/.nkeys/creds/MyOperator/MyAccount/MyUser.creds hello NATS Received on [hello]: ’NATS’nats context add myuser --creds ~/.nkeys/creds/MyOperator/MyAccount/MyUser.credsnsc sub --user MyUser ">"
...
nsc pub --user MyUser hello NATS
...nsc add user s --allow-pub "_INBOX.>" --allow-sub q[ OK ] added pub pub "_INBOX.>"
[ OK ] added sub "q"
[ OK ] generated and stored user key "UDYQFIF75SQU2NU3TG4JXJ7C5LFCWAPXX5SSRB276YQOOFXHFIGHXMEL"
[ OK ] generated user creds file `~/.nkeys/creds/MyOperator/MyAccount/s.creds`
[ OK ] added user "s" to account "MyAccount"nsc describe user s+---------------------------------------------------------------------------------+
| User |
+----------------------+----------------------------------------------------------+
| Name | s |
| User ID | UDYQFIF75SQU2NU3TG4JXJ7C5LFCWAPXX5SSRB276YQOOFXHFIGHXMEL |
| Issuer ID | AD2M34WBNGQFYK37IDX53DPRG74RLLT7FFWBOBMBUXMAVBCVAU5VKWIY |
| Issued | 2021-10-27 23:23:16 UTC |
| Expires | |
| Bearer Token | No |
+----------------------+----------------------------------------------------------+
| Pub Allow | _INBOX.> |
| Sub Allow | q |
| Response Permissions | Not Set |
+----------------------+----------------------------------------------------------+
| Max Msg Payload | Unlimited |
| Max Data | Unlimited |
| Max Subs | Unlimited |
| Network Src | Any |
| Time | Any |
+----------------------+----------------------------------------------------------+nsc add user c --allow-pub q --allow-sub "_INBOX.>"[ OK ] added pub pub "q"
[ OK ] added sub "_INBOX.>"
[ OK ] generated and stored user key "UDIRTIVVHCW2FLLDHTS27ENXLVNP4EO4Z5MR7FZUNXFXWREPGQJ4BRRE"
[ OK ] generated user creds file `~/.nkeys/creds/MyOperator/MyAccount/c.creds`
[ OK ] added user "c" to account "MyAccount"nsc describe user c+---------------------------------------------------------------------------------+
| User |
+----------------------+----------------------------------------------------------+
| Name | c |
| User ID | UDIRTIVVHCW2FLLDHTS27ENXLVNP4EO4Z5MR7FZUNXFXWREPGQJ4BRRE |
| Issuer ID | AD2M34WBNGQFYK37IDX53DPRG74RLLT7FFWBOBMBUXMAVBCVAU5VKWIY |
| Issued | 2021-10-27 23:26:09 UTC |
| Expires | |
| Bearer Token | No |
+----------------------+----------------------------------------------------------+
| Pub Allow | q |
| Sub Allow | _INBOX.> |
| Response Permissions | Not Set |
+----------------------+----------------------------------------------------------+
| Max Msg Payload | Unlimited |
| Max Data | Unlimited |
| Max Subs | Unlimited |
| Network Src | Any |
| Time | Any |
+----------------------+----------------------------------------------------------+nsc env+------------------------------------------------------------------------------------------------------+
| NSC Environment |
+--------------------+-----+---------------------------------------------------------------------------+
| Setting | Set | Effective Value |
+--------------------+-----+---------------------------------------------------------------------------+
| $NSC_CWD_ONLY | No | If set, default operator/account from cwd only |
| $NSC_NO_GIT_IGNORE | No | If set, no .gitignore files written |
| $NKEYS_PATH | No | ~/.nkeys |
| $NSC_HOME | No | ~/.nsc |
| Config | | ~/.nsc/nsc.json |
| $NATS_CA | No | If set, root CAs in the referenced file will be used for nats connections |
| | | If not set, will default to the system trust store |
+--------------------+-----+---------------------------------------------------------------------------+
| From CWD | | No |
| Stores Dir | | ~/.nsc/nats |
| Default Operator | | MyOperator |
| Default Account | | MyAccount |
| Root CAs to trust | | Default: System Trust Store |
+--------------------+-----+---------------------------------------------------------------------------+inProgress()func ExampleJetStream() {
nc, err := nats.Connect("localhost")
if err != nil {
log.Fatal(err)
}
// Use the JetStream context to produce and consumer messages
// that have been persisted.
js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
if err != nil {
log.Fatal(err)
}
js.AddStream(&nats.StreamConfig{
Name: "FOO",
Subjects: []string{"foo"},
})
js.Publish("foo", []byte("Hello JS!"))
// Publish messages asynchronously.
for i := 0; i < 500; i++ {
js.PublishAsync("foo", []byte("Hello JS Async!"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
fmt.Println("Did not resolve in time")
}
// Create Pull based consumer with maximum 128 inflight.
sub, _ := js.PullSubscribe("foo", "wq", nats.PullMaxWaiting(128))
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for {
select {
case <-ctx.Done():
return
default:
}
// Fetch will return as soon as any message is available rather than wait until the full batch size is available, using a batch size of more than 1 allows for higher throughput when needed.
msgs, _ := sub.Fetch(10, nats.Context(ctx))
for _, msg := range msgs {
msg.Ack()
}
}
}package io.nats.examples.jetstream.simple;
import io.nats.client.*;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.examples.jetstream.ResilientPublisher;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import static io.nats.examples.jetstream.NatsJsUtils.createOrReplaceStream;
/**
* This example will demonstrate simplified consume with a handler
*/
public class MessageConsumerExample {
private static final String STREAM = "consume-stream";
private static final String SUBJECT = "consume-subject";
private static final String CONSUMER_NAME = "consume-consumer";
private static final String MESSAGE_PREFIX = "consume";
private static final int STOP_COUNT = 500;
private static final int REPORT_EVERY = 100;
private static final String SERVER = "nats://localhost:4222";
public static void main(String[] args) {
Options options = Options.builder().server(SERVER).build();
try (Connection nc = Nats.connect(options)) {
JetStreamManagement jsm = nc.jetStreamManagement();
createOrReplaceStream(jsm, STREAM, SUBJECT);
//Utility for filling the stream with some messages
System.out.println("Starting publish...");
ResilientPublisher publisher = new ResilientPublisher(nc, jsm, STREAM, SUBJECT).basicDataPrefix(MESSAGE_PREFIX).jitter(10);
Thread pubThread = new Thread(publisher);
pubThread.start();
// get stream context, create consumer and get the consumer context
StreamContext streamContext;
ConsumerContext consumerContext;
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger atomicCount = new AtomicInteger();
long start = System.nanoTime();
streamContext = nc.getStreamContext(STREAM);
streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build());
consumerContext = streamContext.getConsumerContext(CONSUMER_NAME);
MessageHandler handler = msg -> {
msg.ack();
int count = atomicCount.incrementAndGet();
if (count % REPORT_EVERY == 0) {
System.out.println("Handler" + ": Received " + count + " messages in " + (System.nanoTime() - start) / 1_000_000 + "ms.");
}
if (count == STOP_COUNT) {
latch.countDown();
}
};
// create the consumer and install handler
MessageConsumer consumer = consumerContext.consume(handler);
//Waiting for the handler signalling us to stop
latch.await();
// When stop is called, no more pull requests will be made, but messages already requested
// will still come across the wire to the client.
System.out.println("Stopping the consumer...");
consumer.stop();
// wait until the consumer is finished processing backlog
while (!consumer.isFinished()) {
Thread.sleep(10);
}
System.out.println("Final" + ": Received " + atomicCount.get() + " messages in " + (System.nanoTime() - start) / 1_000_000 + "ms.");
publisher.stop(); // otherwise the ConsumerContext background thread will complain when the connection goes away
pubThread.join();
}
catch (JetStreamApiException | IOException e) {
// JetStreamApiException:
// 1. the stream or consumer did not exist
// 2. api calls under the covers theoretically this could fail, but practically it won't.
// IOException:
// likely a connection problem
System.err.println("Exception should not handled, exiting.");
System.exit(-1);
}
catch (Exception e) {
System.err.println("Exception should not handled, exiting.");
System.exit(-1);
}
}
}import { AckPolicy, connect, nanos } from "../../src/mod.ts";
import { nuid } from "../../nats-base-client/nuid.ts";
const nc = await connect();
const stream = nuid.next();
const subj = nuid.next();
const durable = nuid.next();
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: stream, subjects: [subj] });
const js = nc.jetstream();
await js.publish(subj);
await js.publish(subj);
await js.publish(subj);
await js.publish(subj);
const psub = await js.pullSubscribe(subj, {
mack: true,
// artificially low ack_wait, to show some messages
// not getting acked being redelivered
config: {
durable_name: durable,
ack_policy: AckPolicy.Explicit,
ack_wait: nanos(4000),
},
});
(async () => {
for await (const m of psub) {
console.log(
`[${m.seq}] ${
m.redelivered ? `- redelivery ${m.info.redeliveryCount}` : ""
}`
);
if (m.seq % 2 === 0) {
m.ack();
}
}
})();
const fn = () => {
console.log("[PULL]");
psub.pull({ batch: 1000, expires: 10000 });
};
// do the initial pull
fn();
// and now schedule a pull every so often
const interval = setInterval(fn, 10000); // and repeat every 2s
setTimeout(() => {
clearInterval(interval);
nc.drain();
}, 20000);import asyncio
import nats
from nats.errors import TimeoutError
async def main():
nc = await nats.connect("localhost")
# Create JetStream context.
js = nc.jetstream()
# Persist messages on 'foo's subject.
await js.add_stream(name="sample-stream", subjects=["foo"])
for i in range(0, 10):
ack = await js.publish("foo", f"hello world: {i}".encode())
print(ack)
# Create pull based consumer on 'foo'.
psub = await js.pull_subscribe("foo", "psub")
# Fetch and ack messagess from consumer.
for i in range(0, 10):
msgs = await psub.fetch(1)
for msg in msgs:
print(msg)
await nc.close()
if __name__ == '__main__':
asyncio.run(main())// dotnet add package NATS.Net
using NATS.Net;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
await using var client = new NatsClient();
INatsJSContext js = client.CreateJetStreamContext();
// Create a stream
var streamConfig = new StreamConfig(name: "FOO", subjects: ["foo"]);
await js.CreateStreamAsync(streamConfig);
// Publish a message
{
PubAckResponse ack = await js.PublishAsync("foo", "Hello, JetStream!");
ack.EnsureSuccess();
}
// Publish messages concurrently
List<NatsJSPublishConcurrentFuture> futures = new();
for (var i = 0; i < 500; i++)
{
NatsJSPublishConcurrentFuture future
= await js.PublishConcurrentAsync("foo", "Hello, JetStream 1!");
futures.Add(future);
}
foreach (var future in futures)
{
await using (future)
{
PubAckResponse ack = await future.GetResponseAsync();
ack.EnsureSuccess();
}
}
// Create a consumer with a maximum 128 inflight messages
INatsJSConsumer consumer = await js.CreateConsumerAsync("FOO", new ConsumerConfig(name: "foo")
{
MaxWaiting = 128,
});
using CancellationTokenSource cts = new(TimeSpan.FromSeconds(10));
while (cts.IsCancellationRequested == false)
{
var opts = new NatsJSFetchOpts { MaxMsgs = 10 };
await foreach (NatsJSMsg<string> msg in consumer.FetchAsync<string>(opts, cancellationToken: cts.Token))
{
await msg.AckAsync(cancellationToken: cts.Token);
}
}#include "examples.h"
static const char *usage = ""\
"-gd use global message delivery thread pool\n" \
"-sync receive synchronously (default is asynchronous)\n" \
"-pull use pull subscription\n" \
"-fc enable flow control\n" \
"-count number of expected messages\n";
static void
onMsg(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
{
if (print)
printf("Received msg: %s - %.*s\n",
natsMsg_GetSubject(msg),
natsMsg_GetDataLength(msg),
natsMsg_GetData(msg));
if (start == 0)
start = nats_Now();
// We should be using a mutex to protect those variables since
// they are used from the subscription's delivery and the main
// threads. For demo purposes, this is fine.
if (++count == total)
elapsed = nats_Now() - start;
// Since this is auto-ack callback, we don't need to ack here.
natsMsg_Destroy(msg);
}
static void
asyncCb(natsConnection *nc, natsSubscription *sub, natsStatus err, void *closure)
{
printf("Async error: %u - %s\n", err, natsStatus_GetText(err));
natsSubscription_GetDropped(sub, (int64_t*) &dropped);
}
int main(int argc, char **argv)
{
natsConnection *conn = NULL;
natsStatistics *stats = NULL;
natsOptions *opts = NULL;
natsSubscription *sub = NULL;
natsMsg *msg = NULL;
jsCtx *js = NULL;
jsErrCode jerr = 0;
jsOptions jsOpts;
jsSubOptions so;
natsStatus s;
bool delStream = false;
opts = parseArgs(argc, argv, usage);
printf("Created %s subscription on '%s'.\n",
(pull ? "pull" : (async ? "asynchronous" : "synchronous")), subj);
s = natsOptions_SetErrorHandler(opts, asyncCb, NULL);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
if (s == NATS_OK)
s = jsOptions_Init(&jsOpts);
if (s == NATS_OK)
s = jsSubOptions_Init(&so);
if (s == NATS_OK)
{
so.Stream = stream;
so.Consumer = durable;
if (flowctrl)
{
so.Config.FlowControl = true;
so.Config.Heartbeat = (int64_t)1E9;
}
}
if (s == NATS_OK)
s = natsConnection_JetStream(&js, conn, &jsOpts);
if (s == NATS_OK)
{
jsStreamInfo *si = NULL;
// First check if the stream already exists.
s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
if (s == NATS_NOT_FOUND)
{
jsStreamConfig cfg;
// Since we are the one creating this stream, we can delete at the end.
delStream = true;
// Initialize the configuration structure.
jsStreamConfig_Init(&cfg);
cfg.Name = stream;
// Set the subject
cfg.Subjects = (const char*[1]){subj};
cfg.SubjectsLen = 1;
// Make it a memory stream.
cfg.Storage = js_MemoryStorage;
// Add the stream,
s = js_AddStream(&si, js, &cfg, NULL, &jerr);
}
if (s == NATS_OK)
{
printf("Stream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
si->Config->Name, si->State.Msgs, si->State.Bytes);
// Need to destroy the returned stream object.
jsStreamInfo_Destroy(si);
}
}
if (s == NATS_OK)
{
if (pull)
s = js_PullSubscribe(&sub, js, subj, durable, &jsOpts, &so, &jerr);
else if (async)
s = js_Subscribe(&sub, js, subj, onMsg, NULL, &jsOpts, &so, &jerr);
else
s = js_SubscribeSync(&sub, js, subj, &jsOpts, &so, &jerr);
}
if (s == NATS_OK)
s = natsSubscription_SetPendingLimits(sub, -1, -1);
if (s == NATS_OK)
s = natsStatistics_Create(&stats);
if ((s == NATS_OK) && pull)
{
natsMsgList list;
int i;
for (count = 0; (s == NATS_OK) && (count < total); )
{
s = natsSubscription_Fetch(&list, sub, 1024, 5000, &jerr);
if (s != NATS_OK)
break;
if (start == 0)
start = nats_Now();
count += (int64_t) list.Count;
for (i=0; (s == NATS_OK) && (i<list.Count); i++)
s = natsMsg_Ack(list.Msgs[i], &jsOpts);
natsMsgList_Destroy(&list);
}
}
else if ((s == NATS_OK) && async)
{
while (s == NATS_OK)
{
if (count + dropped == total)
break;
nats_Sleep(1000);
}
}
else if (s == NATS_OK)
{
for (count = 0; (s == NATS_OK) && (count < total); count++)
{
s = natsSubscription_NextMsg(&msg, sub, 5000);
if (s != NATS_OK)
break;
if (start == 0)
start = nats_Now();
s = natsMsg_Ack(msg, &jsOpts);
natsMsg_Destroy(msg);
}
}
if (s == NATS_OK)
{
printStats(STATS_IN|STATS_COUNT, conn, sub, stats);
printPerf("Received");
}
if (s == NATS_OK)
{
jsStreamInfo *si = NULL;
// Let's report some stats after the run
s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
if (s == NATS_OK)
{
printf("\nStream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
si->Config->Name, si->State.Msgs, si->State.Bytes);
jsStreamInfo_Destroy(si);
}
if (delStream)
{
printf("\nDeleting stream %s: ", stream);
s = js_DeleteStream(js, stream, NULL, &jerr);
if (s == NATS_OK)
printf("OK!");
printf("\n");
}
}
else
{
printf("Error: %u - %s - jerr=%u\n", s, natsStatus_GetText(s), jerr);
nats_PrintLastErrorStack(stderr);
}
// Destroy all our objects to avoid report of memory leak
jsCtx_Destroy(js);
natsStatistics_Destroy(stats);
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);
// To silence reports of memory still in used with valgrind
nats_Close();
return 0;
}func ExampleJetStream() {
nc, err := nats.Connect("localhost")
if err != nil {
log.Fatal(err)
}
// Use the JetStream context to produce and consumer messages
// that have been persisted.
js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
if err != nil {
log.Fatal(err)
}
js.AddStream(&nats.StreamConfig{
Name: "FOO",
Subjects: []string{"foo"},
})
js.Publish("foo", []byte("Hello JS!"))
// Publish messages asynchronously.
for i := 0; i < 500; i++ {
js.PublishAsync("foo", []byte("Hello JS Async!"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
fmt.Println("Did not resolve in time")
}
// Create async consumer on subject 'foo'. Async subscribers
// ack a message once exiting the callback.
js.Subscribe("foo", func(msg *nats.Msg) {
meta, _ := msg.Metadata()
fmt.Printf("Stream Sequence : %v\n", meta.Sequence.Stream)
fmt.Printf("Consumer Sequence: %v\n", meta.Sequence.Consumer)
})
// Async subscriber with manual acks.
js.Subscribe("foo", func(msg *nats.Msg) {
msg.Ack()
}, nats.ManualAck())
// Async queue subscription where members load balance the
// received messages together.
// If no consumer name is specified, either with nats.Bind()
// or nats.Durable() options, the queue name is used as the
// durable name (that is, as if you were passing the
// nats.Durable(<queue group name>) option.
// It is recommended to use nats.Bind() or nats.Durable()
// and preferably create the JetStream consumer beforehand
// (using js.AddConsumer) so that the JS consumer is not
// deleted on an Unsubscribe() or Drain() when the member
// that created the consumer goes away first.
// Check Godoc for the QueueSubscribe() API for more details.
js.QueueSubscribe("foo", "group", func(msg *nats.Msg) {
msg.Ack()
}, nats.ManualAck())
// Subscriber to consume messages synchronously.
sub, _ := js.SubscribeSync("foo")
msg, _ := sub.NextMsg(2 * time.Second)
msg.Ack()
// We can add a member to the group, with this member using
// the synchronous version of the QueueSubscribe.
sub, _ = js.QueueSubscribeSync("foo", "group")
msg, _ = sub.NextMsg(2 * time.Second)
msg.Ack()
// ChanSubscribe
msgCh := make(chan *nats.Msg, 8192)
sub, _ = js.ChanSubscribe("foo", msgCh)
select {
case msg := <-msgCh:
fmt.Println("[Received]", msg)
case <-time.After(1 * time.Second):
}
}package io.nats.examples.jetstream;
import io.nats.client.*;
import io.nats.client.api.PublishAck;
import io.nats.examples.ExampleArgs;
import io.nats.examples.ExampleUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static io.nats.examples.jetstream.NatsJsUtils.createStreamExitWhenExists;
/**
* This example will demonstrate JetStream push subscribing using a durable consumer and a queue
*/
public class NatsJsPushSubQueueDurable {
static final String usageString =
"\nUsage: java -cp <classpath> NatsJsPushSubQueueDurable [-s server] [-strm stream] [-sub subject] [-q queue] [-dur durable] [-mcnt msgCount] [-scnt subCount]"
+ "\n\nDefault Values:"
+ "\n [-strm stream] qdur-stream"
+ "\n [-sub subject] qdur-subject"
+ "\n [-q queue] qdur-queue"
+ "\n [-dur durable] qdur-durable"
+ "\n [-mcnt msgCount] 100"
+ "\n [-scnt subCount] 5"
+ "\n\nUse tls:// or opentls:// to require tls, via the Default SSLContext\n"
+ "\nSet the environment variable NATS_NKEY to use challenge response authentication by setting a file containing your private key.\n"
+ "\nSet the environment variable NATS_CREDS to use JWT/NKey authentication by setting a file containing your user creds.\n"
+ "\nUse the URL in the -s server parameter for user/pass/token authentication.\n";
public static void main(String[] args) {
ExampleArgs exArgs = ExampleArgs.builder("Push Subscribe, Durable Consumer, Queue", args, usageString)
.defaultStream("qdur-stream")
.defaultSubject("qdur-subject")
.defaultQueue("qdur-queue")
.defaultDurable("qdur-durable")
.defaultMsgCount(100)
.defaultSubCount(5)
.build();
try (Connection nc = Nats.connect(ExampleUtils.createExampleOptions(exArgs.server, true))) {
// Create a JetStreamManagement context.
JetStreamManagement jsm = nc.jetStreamManagement();
// Use the utility to create a stream stored in memory.
createStreamExitWhenExists(jsm, exArgs.stream, exArgs.subject);
// Create our JetStream context
JetStream js = nc.jetStream();
System.out.println();
// Setup the subscribers
// - the PushSubscribeOptions can be re-used since all the subscribers are the same
// - use a concurrent integer to track all the messages received
// - have a list of subscribers and threads so I can track them
PushSubscribeOptions pso = PushSubscribeOptions.builder().durable(exArgs.durable).build();
AtomicInteger allReceived = new AtomicInteger();
List<JsQueueSubscriber> subscribers = new ArrayList<>();
List<Thread> subThreads = new ArrayList<>();
for (int id = 1; id <= exArgs.subCount; id++) {
// setup the subscription
JetStreamSubscription sub = js.subscribe(exArgs.subject, exArgs.queue, pso);
// create and track the runnable
JsQueueSubscriber qs = new JsQueueSubscriber(id, exArgs, js, sub, allReceived);
subscribers.add(qs);
// create, track and start the thread
Thread t = new Thread(qs);
subThreads.add(t);
t.start();
}
nc.flush(Duration.ofSeconds(1)); // flush outgoing communication with/to the server
// create and start the publishing
Thread pubThread = new Thread(new JsPublisher(js, exArgs));
pubThread.start();
// wait for all threads to finish
pubThread.join();
for (Thread t : subThreads) {
t.join();
}
// report
for (JsQueueSubscriber qs : subscribers) {
qs.report();
}
System.out.println();
// delete the stream since we are done with it.
jsm.deleteStream(exArgs.stream);
}
catch (Exception e) {
e.printStackTrace();
}
}
static class JsPublisher implements Runnable {
JetStream js;
ExampleArgs exArgs;
public JsPublisher(JetStream js, ExampleArgs exArgs) {
this.js = js;
this.exArgs = exArgs;
}
@Override
public void run() {
for (int x = 1; x <= exArgs.msgCount; x++) {
try {
PublishAck pa = js.publish(exArgs.subject, ("Data # " + x).getBytes(StandardCharsets.US_ASCII));
} catch (IOException | JetStreamApiException e) {
// something pretty wrong here
e.printStackTrace();
System.exit(-1);
}
}
}
}
static class JsQueueSubscriber implements Runnable {
int id;
int thisReceived;
List<String> datas;
ExampleArgs exArgs;
JetStream js;
JetStreamSubscription sub;
AtomicInteger allReceived;
public JsQueueSubscriber(int id, ExampleArgs exArgs, JetStream js, JetStreamSubscription sub, AtomicInteger allReceived) {
this.id = id;
thisReceived = 0;
datas = new ArrayList<>();
this.exArgs = exArgs;
this.js = js;
this.sub = sub;
this.allReceived = allReceived;
}
public void report() {
System.out.printf("Sub # %d handled %d messages.\n", id, thisReceived);
}
@Override
public void run() {
while (allReceived.get() < exArgs.msgCount) {
try {
Message msg = sub.nextMessage(Duration.ofMillis(500));
while (msg != null) {
thisReceived++;
allReceived.incrementAndGet();
String data = new String(msg.getData(), StandardCharsets.US_ASCII);
datas.add(data);
System.out.printf("QS # %d message # %d %s\n", id, thisReceived, data);
msg.ack();
msg = sub.nextMessage(Duration.ofMillis(500));
}
} catch (InterruptedException e) {
// just try again
}
}
System.out.printf("QS # %d completed.\n", id);
}
}
}import { AckPolicy, connect } from "../../src/mod.ts";
import { nuid } from "../../nats-base-client/nuid.ts";
const nc = await connect();
// create a regular subscription - this is plain nats
const sub = nc.subscribe("my.messages", { max: 5 });
const done = (async () => {
for await (const m of sub) {
console.log(m.subject);
m.respond();
}
})();
const jsm = await nc.jetstreamManager();
const stream = nuid.next();
const subj = nuid.next();
await jsm.streams.add({ name: stream, subjects: [`${subj}.>`] });
// create a consumer that delivers to the subscription
await jsm.consumers.add(stream, {
ack_policy: AckPolicy.Explicit,
deliver_subject: "my.messages",
});
// publish some old nats messages
nc.publish(`${subj}.A`);
nc.publish(`${subj}.B`);
nc.publish(`${subj}.C`);
nc.publish(`${subj}.D.A`);
nc.publish(`${subj}.F.A.B`);
await done;
await nc.close();import asyncio
import nats
from nats.errors import TimeoutError
async def main():
nc = await nats.connect("localhost")
# Create JetStream context.
js = nc.jetstream()
# Persist messages on 'foo's subject.
await js.add_stream(name="sample-stream", subjects=["foo"])
for i in range(0, 10):
ack = await js.publish("foo", f"hello world: {i}".encode())
print(ack)
# Create pull based consumer on 'foo'.
psub = await js.pull_subscribe("foo", "psub")
# Fetch and ack messagess from consumer.
for i in range(0, 10):
msgs = await psub.fetch(1)
for msg in msgs:
print(msg)
# Create single push based subscriber that is durable across restarts.
sub = await js.subscribe("foo", durable="myapp")
msg = await sub.next_msg()
await msg.ack()
# Create deliver group that will be have load balanced messages.
async def qsub_a(msg):
print("QSUB A:", msg)
await msg.ack()
async def qsub_b(msg):
print("QSUB B:", msg)
await msg.ack()
await js.subscribe("foo", "workers", cb=qsub_a)
await js.subscribe("foo", "workers", cb=qsub_b)
for i in range(0, 10):
ack = await js.publish("foo", f"hello world: {i}".encode())
print("\t", ack)
await nc.close()
if __name__ == '__main__':
asyncio.run(main())// NATS .NET doesn't publicly support push consumers and treats all consumers
// as just consumers. The mecahnics of the consuming messages are abstracted
// away from the applications and are handled by the library.#include "examples.h"
static const char *usage = ""\
"-gd use global message delivery thread pool\n" \
"-sync receive synchronously (default is asynchronous)\n" \
"-pull use pull subscription\n" \
"-fc enable flow control\n" \
"-count number of expected messages\n";
static void
onMsg(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
{
if (print)
printf("Received msg: %s - %.*s\n",
natsMsg_GetSubject(msg),
natsMsg_GetDataLength(msg),
natsMsg_GetData(msg));
if (start == 0)
start = nats_Now();
// We should be using a mutex to protect those variables since
// they are used from the subscription's delivery and the main
// threads. For demo purposes, this is fine.
if (++count == total)
elapsed = nats_Now() - start;
// Since this is auto-ack callback, we don't need to ack here.
natsMsg_Destroy(msg);
}
static void
asyncCb(natsConnection *nc, natsSubscription *sub, natsStatus err, void *closure)
{
printf("Async error: %u - %s\n", err, natsStatus_GetText(err));
natsSubscription_GetDropped(sub, (int64_t*) &dropped);
}
int main(int argc, char **argv)
{
natsConnection *conn = NULL;
natsStatistics *stats = NULL;
natsOptions *opts = NULL;
natsSubscription *sub = NULL;
natsMsg *msg = NULL;
jsCtx *js = NULL;
jsErrCode jerr = 0;
jsOptions jsOpts;
jsSubOptions so;
natsStatus s;
bool delStream = false;
opts = parseArgs(argc, argv, usage);
printf("Created %s subscription on '%s'.\n",
(pull ? "pull" : (async ? "asynchronous" : "synchronous")), subj);
s = natsOptions_SetErrorHandler(opts, asyncCb, NULL);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
if (s == NATS_OK)
s = jsOptions_Init(&jsOpts);
if (s == NATS_OK)
s = jsSubOptions_Init(&so);
if (s == NATS_OK)
{
so.Stream = stream;
so.Consumer = durable;
if (flowctrl)
{
so.Config.FlowControl = true;
so.Config.Heartbeat = (int64_t)1E9;
}
}
if (s == NATS_OK)
s = natsConnection_JetStream(&js, conn, &jsOpts);
if (s == NATS_OK)
{
jsStreamInfo *si = NULL;
// First check if the stream already exists.
s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
if (s == NATS_NOT_FOUND)
{
jsStreamConfig cfg;
// Since we are the one creating this stream, we can delete at the end.
delStream = true;
// Initialize the configuration structure.
jsStreamConfig_Init(&cfg);
cfg.Name = stream;
// Set the subject
cfg.Subjects = (const char*[1]){subj};
cfg.SubjectsLen = 1;
// Make it a memory stream.
cfg.Storage = js_MemoryStorage;
// Add the stream,
s = js_AddStream(&si, js, &cfg, NULL, &jerr);
}
if (s == NATS_OK)
{
printf("Stream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
si->Config->Name, si->State.Msgs, si->State.Bytes);
// Need to destroy the returned stream object.
jsStreamInfo_Destroy(si);
}
}
if (s == NATS_OK)
{
if (pull)
s = js_PullSubscribe(&sub, js, subj, durable, &jsOpts, &so, &jerr);
else if (async)
s = js_Subscribe(&sub, js, subj, onMsg, NULL, &jsOpts, &so, &jerr);
else
s = js_SubscribeSync(&sub, js, subj, &jsOpts, &so, &jerr);
}
if (s == NATS_OK)
s = natsSubscription_SetPendingLimits(sub, -1, -1);
if (s == NATS_OK)
s = natsStatistics_Create(&stats);
if ((s == NATS_OK) && pull)
{
natsMsgList list;
int i;
for (count = 0; (s == NATS_OK) && (count < total); )
{
s = natsSubscription_Fetch(&list, sub, 1024, 5000, &jerr);
if (s != NATS_OK)
break;
if (start == 0)
start = nats_Now();
count += (int64_t) list.Count;
for (i=0; (s == NATS_OK) && (i<list.Count); i++)
s = natsMsg_Ack(list.Msgs[i], &jsOpts);
natsMsgList_Destroy(&list);
}
}
else if ((s == NATS_OK) && async)
{
while (s == NATS_OK)
{
if (count + dropped == total)
break;
nats_Sleep(1000);
}
}
else if (s == NATS_OK)
{
for (count = 0; (s == NATS_OK) && (count < total); count++)
{
s = natsSubscription_NextMsg(&msg, sub, 5000);
if (s != NATS_OK)
break;
if (start == 0)
start = nats_Now();
s = natsMsg_Ack(msg, &jsOpts);
natsMsg_Destroy(msg);
}
}
if (s == NATS_OK)
{
printStats(STATS_IN|STATS_COUNT, conn, sub, stats);
printPerf("Received");
}
if (s == NATS_OK)
{
jsStreamInfo *si = NULL;
// Let's report some stats after the run
s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
if (s == NATS_OK)
{
printf("\nStream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
si->Config->Name, si->State.Msgs, si->State.Bytes);
jsStreamInfo_Destroy(si);
}
if (delStream)
{
printf("\nDeleting stream %s: ", stream);
s = js_DeleteStream(js, stream, NULL, &jerr);
if (s == NATS_OK)
printf("OK!");
printf("\n");
}
}
else
{
printf("Error: %u - %s - jerr=%u\n", s, natsStatus_GetText(s), jerr);
nats_PrintLastErrorStack(stderr);
}
// Destroy all our objects to avoid report of memory leak
jsCtx_Destroy(js);
natsStatistics_Destroy(stats);
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);
// To silence reports of memory still in used with valgrind
nats_Close();.
return 0;
}func ExampleJetStream() {
nc, err := nats.Connect("localhost")
if err != nil {
log.Fatal(err)
}
// Use the JetStream context to produce and consumer messages
// that have been persisted.
js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
if err != nil {
log.Fatal(err)
}
js.AddStream(&nats.StreamConfig{
Name: "FOO",
Subjects: []string{"foo"},
})
js.Publish("foo", []byte("Hello JS!"))
// ordered push consumer
js.Subscribe("foo", func(msg *nats.Msg) {
meta, _ := msg.Metadata()
fmt.Printf("Stream Sequence : %v\n", meta.Sequence.Stream)
fmt.Printf("Consumer Sequence: %v\n", meta.Sequence.Consumer)
}, nats.OrderedConsumer())
}package io.nats.examples.jetstream;
import io.nats.client.*;
import io.nats.client.api.PublishAck;
import io.nats.client.impl.NatsMessage;
import io.nats.examples.ExampleArgs;
import io.nats.examples.ExampleUtils;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.temporal.TemporalUnit;
public class myExample {
public static void main(String[] args) {
final String subject = "foo";
try (Connection nc = Nats.connect(ExampleUtils.createExampleOptions("localhost"))) {
// Create a JetStream context. This hangs off the original connection
// allowing us to produce data to streams and consume data from
// JetStream consumers.
JetStream js = nc.jetStream();
// This example assumes there is a stream already created on subject "foo" and some messages already stored in that stream
// create our message handler.
MessageHandler handler = msg -> {
System.out.println("\nMessage Received:");
if (msg.hasHeaders()) {
System.out.println(" Headers:");
for (String key : msg.getHeaders().keySet()) {
for (String value : msg.getHeaders().get(key)) {
System.out.printf(" %s: %s\n", key, value);
}
}
}
System.out.printf(" Subject: %s\n Data: %s\n",
msg.getSubject(), new String(msg.getData(), StandardCharsets.UTF_8));
System.out.println(" " + msg.metaData());
};
Dispatcher dispatcher = nc.createDispatcher();
PushSubscribeOptions pso = PushSubscribeOptions.builder().ordered(true).build();
JetStreamSubscription sub = js.subscribe(subject, dispatcher, handler, false, pso);
Thread.sleep(100);
sub.drain(Duration.ofMillis(100));
nc.drain(Duration.ofMillis(100));
}
catch(Exception e)
{
e.printStackTrace();
}
}
}import { connect, consumerOpts } from "../../src/mod.ts";
const nc = await connect();
const js = nc.jetstream();
// note the consumer is not a durable - so when after the
// subscription ends, the server will auto destroy the
// consumer
const opts = consumerOpts();
opts.manualAck();
opts.maxMessages(2);
opts.deliverTo("xxx");
const sub = await js.subscribe("a.>", opts);
await (async () => {
for await (const m of sub) {
console.log(m.seq, m.subject);
m.ack();
}
})();
await nc.close();import asyncio
import nats
from nats.errors import TimeoutError
async def main():
nc = await nats.connect("localhost")
# Create JetStream context.
js = nc.jetstream()
# Create ordered consumer with flow control and heartbeats
# that auto resumes on failures.
osub = await js.subscribe("foo", ordered_consumer=True)
data = bytearray()
while True:
try:
msg = await osub.next_msg()
data.extend(msg.data)
except TimeoutError:
break
print("All data in stream:", len(data))
await nc.close()
if __name__ == '__main__':
asyncio.run(main())// dotnet add package NATS.Net
using NATS.Net;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
await using var client = new NatsClient();
INatsJSContext js = client.CreateJetStreamContext();
var streamConfig = new StreamConfig(name: "FOO", subjects: ["foo"]);
await js.CreateStreamAsync(streamConfig);
PubAckResponse ack = await js.PublishAsync("foo", "Hello, JetStream!");
ack.EnsureSuccess();
INatsJSConsumer orderedConsumer = await js.CreateOrderedConsumerAsync("FOO");
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
await foreach (NatsJSMsg<string> msg in orderedConsumer.ConsumeAsync<string>(cancellationToken: cts.Token))
{
NatsJSMsgMetadata? meta = msg.Metadata;
Console.WriteLine($"Stream Sequence : {meta?.Sequence.Stream}");
Console.WriteLine($"Consumer Sequence: {meta?.Sequence.Consumer}");
}