Powered By GitBook
Advanced Connect and Custom Dialer in Go
The Go NATS client features a CustomDialer option which allows you to customize the connection logic against the NATS server without having to modify the internals of the client. For example, let's say that you want the client to dial with DialContext, using the context package, so that connecting to NATS can be cancelled altogether or bounded by a deadline. You could then define a Dialer implementation as follows:
1
package main
2
3
import (
4
"context"
5
"log"
6
"net"
7
"time"
8
9
"github.com/nats-io/nats.go"
10
)
11
12
type customDialer struct {
13
ctx context.Context
14
nc *nats.Conn
15
connectTimeout time.Duration
16
connectTimeWait time.Duration
17
}
18
19
func (cd *customDialer) Dial(network, address string) (net.Conn, error) {
20
ctx, cancel := context.WithTimeout(cd.ctx, cd.connectTimeout)
21
defer cancel()
22
23
for {
24
log.Println("Attempting to connect to", address)
25
if ctx.Err() != nil {
26
return nil, ctx.Err()
27
}
28
29
select {
30
case <-cd.ctx.Done():
31
return nil, cd.ctx.Err()
32
default:
33
d := &net.Dialer{}
34
if conn, err := d.DialContext(ctx, network, address); err == nil {
35
log.Println("Connected to NATS successfully")
36
return conn, nil
37
} else {
38
time.Sleep(cd.connectTimeWait)
39
}
40
}
41
}
42
}
Copied!
With the dialer implementation above, the NATS client will keep retrying to connect to the NATS server until the context is cancelled or its connect deadline expires:
1
func main() {
2
// Parent context cancels connecting/reconnecting altogether.
3
ctx, cancel := context.WithCancel(context.Background())
4
defer cancel()
5
6
var err error
7
var nc *nats.Conn
8
cd := &customDialer{
9
ctx: ctx,
10
connectTimeout: 10 * time.Second,
11
connectTimeWait: 1 * time.Second,
12
}
13
opts := []nats.Option{
14
nats.SetCustomDialer(cd),
15
nats.ReconnectWait(2 * time.Second),
16
nats.ReconnectHandler(func(c *nats.Conn) {
17
log.Println("Reconnected to", c.ConnectedUrl())
18
}),
19
nats.DisconnectHandler(func(c *nats.Conn) {
20
log.Println("Disconnected from NATS")
21
}),
22
nats.ClosedHandler(func(c *nats.Conn) {
23
log.Println("NATS connection is closed.")
24
}),
25
nats.NoReconnect(),
26
}
27
go func() {
28
nc, err = nats.Connect("127.0.0.1:4222", opts...)
29
}()
30
31
WaitForEstablishedConnection:
32
for {
33
if err != nil {
34
log.Fatal(err)
35
}
36
37
// Wait for context to be canceled either by timeout
38
// or because of establishing a connection...
39
select {
40
case <-ctx.Done():
41
break WaitForEstablishedConnection
42
default:
43
}
44
45
if nc == nil || !nc.IsConnected() {
46
log.Println("Connection not ready")
47
time.Sleep(200 * time.Millisecond)
48
continue
49
}
50
break WaitForEstablishedConnection
51
}
52
if ctx.Err() != nil {
53
log.Fatal(ctx.Err())
54
}
55
56
for {
57
if nc.IsClosed() {
58
break
59
}
60
if err := nc.Publish("hello", []byte("world")); err != nil {
61
log.Println(err)
62
time.Sleep(1 * time.Second)
63
continue
64
}
65
log.Println("Published message")
66
time.Sleep(1 * time.Second)
67
}
68
69
// Disconnect and flush pending messages
70
if err := nc.Drain(); err != nil {
71
log.Println(err)
72
}
73
log.Println("Disconnected")
74
}
Copied!
Last modified 2yr ago
Export as PDF
Copy link