Powered By GitBook
Listen for Connection Events
While the connection status is interesting, it is perhaps more interesting to know when the status changes. Most, if not all, of the NATS client libraries provide a way to listen for events related to the connection and its status.
The actual API for these listeners is language dependent, but the following examples show a few of the more common use cases. See the API documentation for the client library you are using for more specific instructions.
Connection events may include the connection being closed, disconnected or reconnected. Reconnecting involves a disconnect and a connect, but depending on the library implementation it may also include multiple disconnects as the client tries to find a server or while the server is being rebooted.
Go
Java
JavaScript
Python
Ruby
TypeScript
C
1
// There is not a single listener for connection events in the NATS Go Client.
2
// Instead, you can set individual event handlers using:
3
nc, err := nats.Connect("demo.nats.io",
4
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
5
log.Printf("client disconnected: %v", err)
6
}),
7
nats.ReconnectHandler(func(_ *nats.Conn) {
8
log.Printf("client reconnected")
9
}),
10
nats.ClosedHandler(func(_ *nats.Conn) {
11
log.Printf("client closed")
12
}))
13
if err != nil {
14
log.Fatal(err)
15
}
16
defer nc.Close()
17
18
DisconnectHandler(cb ConnHandler)
19
ReconnectHandler(cb ConnHandler)
20
ClosedHandler(cb ConnHandler)
21
DiscoveredServersHandler(cb ConnHandler)
22
ErrorHandler(cb ErrHandler)
Copied!
1
class MyConnectionListener implements ConnectionListener {
2
public void connectionEvent(Connection natsConnection, Events event) {
3
System.out.println("Connection event - "+event);
4
}
5
}
6
7
public class SetConnectionListener {
8
public static void main(String[] args) {
9
10
try {
11
Options options = new Options.Builder().
12
server("nats://demo.nats.io:4222").
13
connectionListener(new MyConnectionListener()). // Set the listener
14
build();
15
Connection nc = Nats.connect(options);
16
17
// Do something with the connection
18
19
nc.close();
20
} catch (Exception e) {
21
e.printStackTrace();
22
}
23
}
24
}
Copied!
1
let nc = NATS.connect("nats://demo.nats.io:4222");
2
3
nc.on('error', (err) => {
4
t.log('error', err);
5
});
6
7
nc.on('connect', () => {
8
t.log('client connected');
9
});
10
11
nc.on('disconnect', () => {
12
t.log('client disconnected');
13
});
14
15
nc.on('reconnecting', () => {
16
t.log('client reconnecting');
17
});
18
19
nc.on('reconnect', () => {
20
t.log('client reconnected');
21
});
22
23
nc.on('close', () => {
24
t.log('client closed');
25
});
26
27
nc.on('permission_error', (err) => {
28
t.log('permission_error', err);
29
});
Copied!
1
# The asyncio NATS client can be configured with a number of event callbacks
2
async def disconnected_cb():
3
print("Got disconnected!")
4
5
async def reconnected_cb():
6
# See who we are connected to on reconnect.
7
print("Got reconnected to {url}".format(url=nc.connected_url.netloc))
8
9
async def error_cb(e):
10
print("There was an error: {}".format(e))
11
12
async def closed_cb():
13
print("Connection is closed")
14
15
# Setup callbacks to be notified on disconnects and reconnects
16
options["disconnected_cb"] = disconnected_cb
17
options["reconnected_cb"] = reconnected_cb
18
19
# Setup callbacks to be notified when there is an error
20
# or connection is closed.
21
options["error_cb"] = error_cb
22
options["closed_cb"] = closed_cb
23
24
await nc.connect(**options)
Copied!
1
# There is not a single listener for connection events in the Ruby NATS Client.
2
# Instead, you can set individual event handlers using:
3
4
NATS.on_disconnect do
5
end
6
7
NATS.on_reconnect do
8
end
9
10
NATS.on_close do
11
end
12
13
NATS.on_error do
14
end
Copied!
1
// connect will happen once - the first connect
2
nc.on('connect', (nc) => {
3
// nc is the connection that connected
4
t.log('client connected');
5
});
6
7
nc.on('disconnect', (url) => {
8
// url is the server the client disconnected from
9
t.log('disconnected from', url);
10
});
11
12
nc.on('reconnecting', (url) => {
13
t.log('reconnecting to', url);
14
});
15
16
nc.on('reconnect', (nc, url) => {
17
// nc is the connection that reconnected
18
t.log('reconnected to', url);
19
});
Copied!
1
static void
2
disconnectedCB(natsConnection *conn, void *closure)
3
{
4
// Do something
5
printf("Connection disconnected\n");
6
}
7
8
static void
9
reconnectedCB(natsConnection *conn, void *closure)
10
{
11
// Do something
12
printf("Connection reconnected\n");
13
}
14
15
static void
16
closedCB(natsConnection *conn, void *closure)
17
{
18
// Do something
19
printf("Connection closed\n");
20
}
21
22
(...)
23
24
natsConnection *conn = NULL;
25
natsOptions *opts = NULL;
26
natsStatus s = NATS_OK;
27
28
s = natsOptions_Create(&opts);
29
if (s == NATS_OK)
30
s = natsOptions_SetDisconnectedCB(opts, disconnectedCB, NULL);
31
if (s == NATS_OK)
32
s = natsOptions_SetReconnectedCB(opts, reconnectedCB, NULL);
33
if (s == NATS_OK)
34
s = natsOptions_SetClosedCB(opts, closedCB, NULL);
35
if (s == NATS_OK)
36
s = natsConnection_Connect(&conn, opts);
37
38
(...)
39
40
// Destroy objects that were created
41
natsConnection_Destroy(conn);
42
natsOptions_Destroy(opts);
Copied!

Listen for New Servers

When working with a cluster, servers may be added or changed. Some of the clients allow you to listen for this notification:
Go
Java
JavaScript
Python
Ruby
TypeScript
C
1
// Be notified if a new server joins the cluster.
2
// Print all the known servers, and then only the ones that were discovered.
3
nc, err := nats.Connect("demo.nats.io",
4
nats.DiscoveredServersHandler(func(nc *nats.Conn) {
5
log.Printf("Known servers: %v\n", nc.Servers())
6
log.Printf("Discovered servers: %v\n", nc.DiscoveredServers())
7
}))
8
if err != nil {
9
log.Fatal(err)
10
}
11
defer nc.Close()
12
13
// Do something with the connection
Copied!
1
class ServersAddedListener implements ConnectionListener {
2
public void connectionEvent(Connection nc, Events event) {
3
if (event == Events.DISCOVERED_SERVERS) {
4
for (String server : nc.getServers()) {
5
System.out.println("Known server: "+server);
6
}
7
}
8
}
9
}
10
11
public class ListenForNewServers {
12
public static void main(String[] args) {
13
14
try {
15
Options options = new Options.Builder().
16
server("nats://demo.nats.io:4222").
17
connectionListener(new ServersAddedListener()). // Set the listener
18
build();
19
Connection nc = Nats.connect(options);
20
21
// Do something with the connection
22
23
nc.close();
24
} catch (Exception e) {
25
e.printStackTrace();
26
}
27
}
28
}
Copied!
1
let nc = NATS.connect("nats://demo.nats.io:4222");
2
nc.on('serversDiscovered', (urls) => {
3
t.log('serversDiscovered', urls);
4
});
Copied!
1
# Asyncio NATS client does not support discovered servers handler right now
Copied!
1
# The Ruby NATS client does not support discovered servers handler right now
Copied!
1
nc.on('serversChanged', (ce) => {
2
t.log('servers changed\n', 'added: ',ce.added, 'removed:', ce.removed);
3
});
Copied!
1
static void
2
discoveredServersCB(natsConnection *conn, void *closure)
3
{
4
natsStatus s = NATS_OK;
5
char **servers = NULL;
6
int count = 0;
7
8
s = natsConnection_GetDiscoveredServers(conn, &servers, &count);
9
if (s == NATS_OK)
10
{
11
int i;
12
13
// Do something...
14
for (i=0; i<count; i++)
15
printf("Discovered server: %s\n", servers[i]);
16
17
// Free allocated memory
18
for (i=0; i<count; i++)
19
free(servers[i]);
20
free(servers);
21
}
22
}
23
24
(...)
25
26
natsConnection *conn = NULL;
27
natsOptions *opts = NULL;
28
natsStatus s = NATS_OK;
29
30
s = natsOptions_Create(&opts);
31
if (s == NATS_OK)
32
s = natsOptions_SetDiscoveredServersCB(opts, discoveredServersCB, NULL);
33
if (s == NATS_OK)
34
s = natsConnection_Connect(&conn, opts);
35
36
(...)
37
38
39
// Destroy objects that were created
40
natsConnection_Destroy(conn);
41
natsOptions_Destroy(opts);
Copied!

Listen for Errors

The client library may separate server-to-client errors from events. Many server events are not handled by application code and result in the connection being closed. Listening for the errors can be very useful for debugging problems.
Go
Java
JavaScript
Python
Ruby
TypeScript
C
1
// Set the callback that will be invoked when an asynchronous error occurs.
2
nc, err := nats.Connect("demo.nats.io",
3
nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
4
log.Printf("Error: %v", err)
5
}))
6
if err != nil {
7
log.Fatal(err)
8
}
9
defer nc.Close()
10
11
// Do something with the connection
Copied!
1
class MyErrorListener implements ErrorListener {
2
public void errorOccurred(Connection conn, String error)
3
{
4
System.out.println("The server notificed the client with: "+error);
5
}
6
7
public void exceptionOccurred(Connection conn, Exception exp) {
8
System.out.println("The connection handled an exception: "+exp.getLocalizedMessage());
9
}
10
11
public void slowConsumerDetected(Connection conn, Consumer consumer) {
12
System.out.println("A slow consumer was detected.");
13
}
14
}
15
16
public class SetErrorListener {
17
public static void main(String[] args) {
18
19
try {
20
Options options = new Options.Builder().
21
server("nats://demo.nats.io:4222").
22
errorListener(new MyErrorListener()). // Set the listener
23
build();
24
Connection nc = Nats.connect(options);
25
26
// Do something with the connection
27
28
nc.close();
29
} catch (Exception e) {
30
e.printStackTrace();
31
}
32
}
33
}
Copied!
1
let nc = NATS.connect("nats://demo.nats.io:4222");
2
3
// on node you *must* register an error listener. If not registered
4
// and the library emits an 'error' event, the node process will exit.
5
nc.on('error', (err) => {
6
t.log('client got an error:', err);
7
});
Copied!
1
nc = NATS()
2
3
async def error_cb(e):
4
print("Error: ", e)
5
6
await nc.connect(
7
servers=["nats://demo.nats.io:4222"],
8
reconnect_time_wait=10,
9
error_cb=error_cb,
10
)
11
12
# Do something with the connection.
Copied!
1
require 'nats/client'
2
3
NATS.start(servers:["nats://demo.nats.io:4222"]) do |nc|
4
nc.on_error do |e|
5
puts "Error: #{e}"
6
end
7
8
nc.close
9
end
Copied!
1
// on node you *must* register an error listener. If not registered
2
// and the library emits an 'error' event, the node process will exit.
3
nc.on('error', (err) => {
4
t.log('client got an out of band error:', err);
5
});
Copied!
1
static void
2
errorCB(natsConnection *conn, natsSubscription *sub, natsStatus s, void *closure)
3
{
4
// Do something
5
printf("Error: %d - %s\n", s, natsStatus_GetText(s));
6
}
7
8
(...)
9
10
natsConnection *conn = NULL;
11
natsOptions *opts = NULL;
12
natsStatus s = NATS_OK;
13
14
s = natsOptions_Create(&opts);
15
if (s == NATS_OK)
16
s = natsOptions_SetErrorHandler(opts, errorCB, NULL);
17
if (s == NATS_OK)
18
s = natsConnection_Connect(&conn, opts);
19
20
(...)
21
22
// Destroy objects that were created
23
natsConnection_Destroy(conn);
24
natsOptions_Destroy(opts);
Copied!
Last modified 1yr ago
Export as PDF
Copy link