diff --git a/README.md b/README.md
index dc63819..45b0d47 100644
--- a/README.md
+++ b/README.md
@@ -96,6 +96,51 @@ for location := range omlox.ReceiveAs[omlox.Location](sub) {
 }
 ```
 
+#### Auto-Reconnection
+
+The client supports automatic WebSocket reconnection with configurable retry policies. This is essential for production deployments where connections may be disrupted due to network issues, server restarts, or high load.
+
+```go
+// Create a client with auto-reconnection enabled, unlimited retries, and backoff timing.
+// It dials an Omlox Hub websocket interface, subscribes to
+// the location_updates topic and listens for new
+// location messages.
+
+ctx, cancel := context.WithCancel(context.Background())
+defer cancel()
+
+client, err := omlox.Connect(
+	ctx,
+	"localhost:7081/v2",
+	omlox.WithWSAutoReconnect(true),
+	omlox.WithWSMaxRetries(-1),
+	omlox.WithWSRetryWait(time.Second, 30*time.Second),
+)
+if err != nil {
+	log.Fatal(err)
+}
+defer client.Close()
+
+sub, err := client.Subscribe(ctx, omlox.TopicLocationUpdates)
+if err != nil {
+	log.Fatal(err)
+}
+
+for location := range omlox.ReceiveAs[omlox.Location](sub) {
+	_ = location // handle location update
+}
+```
+
+For high-load scenarios, also configure the HTTP connection pool:
+
+```go
+client, err := omlox.New(
+	"https://localhost:7081/v2",
+	omlox.WithWSAutoReconnect(true),
+	omlox.WithConnectionPoolSettings(200, 100, 0), // Prevent pool exhaustion
+)
+```
+
 ### Error Handling
 
 Errors are returned when Omlox Hub responds with an HTTP status code outside of the 200 to 399 range.
@@ -310,4 +355,4 @@ If you have any trouble getting started, reach out to us by email (see the [MAIN
 
 > [!NOTE]
 > The code provided by this library is not certified by Omlox or the Profibus & Profinet International.
-> Solutions using this library should go through the [certification process](https://omlox.com/certification) defined by the Omlox™ consortium to be an _"omlox certified solution"_.
+> Solutions using this library should go through the [certification process](https://omlox.com/certification) defined by the Omlox™ consortium to be an _"omlox certified solution"_.
\ No newline at end of file
diff --git a/client.go b/client.go
index 4a9da63..de3b228 100644
--- a/client.go
+++ b/client.go
@@ -13,7 +13,6 @@ import (
 	"net/url"
 	"sync"
 
-	"golang.org/x/sync/errgroup"
 	"nhooyr.io/websocket"
 )
 
@@ -33,13 +32,18 @@ type Client struct {
 
 	// websockets client fields
 
-	errg   *errgroup.Group
-	cancel context.CancelFunc
+	lifecycleWg sync.WaitGroup
+	cancel      context.CancelFunc
 
 	// websockets connection
 	conn   *websocket.Conn
 	closed bool
 
+	// reconnection support
+	reconnectCtx    context.Context
+	reconnectCancel context.CancelFunc
+	reconnecting    bool
+
 	// subscriptions
 	subs map[int]*Subcription
 
diff --git a/client_configuration.go b/client_configuration.go
index 5b813e5..ecb3ef4 100644
--- a/client_configuration.go
+++ b/client_configuration.go
@@ -4,14 +4,59 @@ package omlox
 
 import (
+	"context"
 	"fmt"
+	"math"
 	"net/http"
 	"time"
 
 	"github.com/hashicorp/go-cleanhttp"
 	"golang.org/x/time/rate"
+	"nhooyr.io/websocket"
 )
 
+// WSCheckRetry specifies a policy for handling WebSocket connection retries.
+// It is called following each connection attempt with the attempt number and the error value.
+// If it returns false, the Client stops retrying and returns the error to the caller.
+// If it returns an error, that error value is returned in lieu of the error from the connection attempt.
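+//
+// As an illustration, a custom policy could cap the number of attempts and otherwise
+// defer to the default behaviour (sketch only; the limit of 5 is an arbitrary example):
+//
+//	func myCheckRetry(ctx context.Context, attemptNum int, err error) (bool, error) {
+//		if attemptNum >= 5 {
+//			return false, err
+//		}
+//		return omlox.DefaultWSRetryPolicy(ctx, attemptNum, err)
+//	}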
+type WSCheckRetry func(ctx context.Context, attemptNum int, err error) (bool, error)
+
+// WSBackoff specifies a policy for how long to wait between connection retry attempts.
+// It is called after a failing connection attempt to determine the amount of time
+// that should pass before trying again.
+type WSBackoff func(min, max time.Duration, attemptNum int) time.Duration
+
+// DefaultWSRetryPolicy provides a default callback for WebSocket connection retries,
+// which will retry on connection errors and WebSocket close errors (except normal closure and going away).
+func DefaultWSRetryPolicy(ctx context.Context, attemptNum int, err error) (bool, error) {
+	// Do not retry on context.Canceled or context.DeadlineExceeded
+	if ctx.Err() != nil {
+		return false, ctx.Err()
+	}
+
+	if err == nil {
+		return false, nil
+	}
+
+	if websocket.CloseStatus(err) == websocket.StatusNormalClosure ||
+		websocket.CloseStatus(err) == websocket.StatusGoingAway {
+		return false, err
+	}
+
+	return true, nil
+}
+
+// DefaultWSBackoff provides exponential backoff based on the attempt number and limited
+// by the provided minimum and maximum durations.
+func DefaultWSBackoff(min, max time.Duration, attemptNum int) time.Duration {
+	mult := math.Pow(2, float64(attemptNum)) * float64(min)
+	sleep := time.Duration(mult)
+	if sleep > max {
+		sleep = max
+	}
+	return sleep
+}
+
 // GetDefaultOptions returns default configuration options for the client.
 func DefaultConfiguration() ClientConfiguration {
 	// Use cleanhttp, which has the same default values as net/http client, but
@@ -21,6 +66,12 @@ func DefaultConfiguration() ClientConfiguration {
 	return ClientConfiguration{
 		HTTPClient:     defaultClient,
 		RequestTimeout: 60 * time.Second,
+
+		WSMaxRetries:   2,
+		WSMinRetryWait: time.Second,
+		WSMaxRetryWait: 30 * time.Second,
+		WSBackoff:      DefaultWSBackoff,
+		WSCheckRetry:   DefaultWSRetryPolicy,
 	}
 }
 
@@ -45,6 +96,39 @@ type ClientConfiguration struct {
 
 	// UserAgent sets a name for the http client User-Agent header.
 	UserAgent string
+
+	// WSMaxRetries sets the maximum number of reconnection attempts for WebSocket connections.
+	// Set to -1 for unlimited retries, 0 to disable retries.
+	//
+	// Default: 2
+	WSMaxRetries int
+
+	// WSMinRetryWait sets the minimum time to wait before retrying a WebSocket connection.
+	//
+	// Default: 1s
+	WSMinRetryWait time.Duration
+
+	// WSMaxRetryWait sets the maximum time to wait before retrying a WebSocket connection.
+	//
+	// Default: 30s
+	WSMaxRetryWait time.Duration
+
+	// WSBackoff specifies the policy for how long to wait between WebSocket reconnection attempts.
+	//
+	// Default: DefaultWSBackoff (exponential backoff)
+	WSBackoff WSBackoff
+
+	// WSCheckRetry specifies the policy for handling WebSocket reconnection retries.
+	//
+	// Default: DefaultWSRetryPolicy
+	WSCheckRetry WSCheckRetry
+
+	// WSAutoReconnect enables automatic reconnection when WebSocket connection is lost.
+	// When true, the client will automatically attempt to reconnect and resubscribe
+	// to all active subscriptions.
+	//
+	// Default: false (for backward compatibility, but recommended to enable)
+	WSAutoReconnect bool
 }
 
 // ClientOption is a configuration option to initialize a client.
@@ -84,3 +168,88 @@ func WithRateLimiter(limiter *rate.Limiter) ClientOption {
 		return nil
 	}
 }
+
+// WithWSAutoReconnect enables or disables automatic WebSocket reconnection.
+// When enabled, the client will automatically attempt to reconnect and resubscribe
+// to all active subscriptions when the connection is lost.
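+//
+// Reconnection behaviour is governed by the WSCheckRetry and WSBackoff policies.
+// A typical configuration combines this option with the retry options, for example
+// (illustrative values only):
+//
+//	client, err := omlox.Connect(ctx, "localhost:7081/v2",
+//		omlox.WithWSAutoReconnect(true),
+//		omlox.WithWSMaxRetries(-1),
+//		omlox.WithWSRetryWait(time.Second, 30*time.Second),
+//	)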
+//
+// Default: false (for backward compatibility)
+func WithWSAutoReconnect(enabled bool) ClientOption {
+	return func(c *ClientConfiguration) error {
+		c.WSAutoReconnect = enabled
+		return nil
+	}
+}
+
+// WithWSMaxRetries sets the maximum number of WebSocket reconnection attempts.
+// Set to -1 for unlimited retries, 0 to disable retries.
+//
+// Default: 2
+func WithWSMaxRetries(retries int) ClientOption {
+	return func(c *ClientConfiguration) error {
+		if retries < -1 {
+			return fmt.Errorf("retries must not be less than -1")
+		}
+		c.WSMaxRetries = retries
+		return nil
+	}
+}
+
+// WithWSRetryWait sets the minimum and maximum time to wait between WebSocket reconnection attempts.
+//
+// Default: min=1s, max=30s
+func WithWSRetryWait(min, max time.Duration) ClientOption {
+	return func(c *ClientConfiguration) error {
+		if min < 0 {
+			return fmt.Errorf("min retry wait must not be negative")
+		}
+		if max < min {
+			return fmt.Errorf("max retry wait must be greater than or equal to min retry wait")
+		}
+		c.WSMinRetryWait = min
+		c.WSMaxRetryWait = max
+		return nil
+	}
+}
+
+// WithWSBackoff sets a custom backoff policy for WebSocket reconnection attempts.
+//
+// Default: DefaultWSBackoff (exponential backoff)
+func WithWSBackoff(backoff WSBackoff) ClientOption {
+	return func(c *ClientConfiguration) error {
+		c.WSBackoff = backoff
+		return nil
+	}
+}
+
+// WithWSCheckRetry sets a custom retry policy for WebSocket reconnection attempts.
+//
+// Default: DefaultWSRetryPolicy
+func WithWSCheckRetry(checkRetry WSCheckRetry) ClientOption {
+	return func(c *ClientConfiguration) error {
+		c.WSCheckRetry = checkRetry
+		return nil
+	}
+}
+
+// WithConnectionPoolSettings configures HTTP connection pool settings for better
+// performance under high load. This adjusts MaxIdleConns, MaxIdleConnsPerHost,
+// and MaxConnsPerHost on the transport.
+func WithConnectionPoolSettings(maxIdleConns, maxIdleConnsPerHost, maxConnsPerHost int) ClientOption {
+	return func(c *ClientConfiguration) error {
+		if c.HTTPClient == nil {
+			c.HTTPClient = cleanhttp.DefaultPooledClient()
+		}
+
+		transport, ok := c.HTTPClient.Transport.(*http.Transport)
+		if !ok {
+			return fmt.Errorf("HTTPClient transport must be *http.Transport to configure connection pool")
+		}
+
+		transport.MaxIdleConns = maxIdleConns
+		transport.MaxIdleConnsPerHost = maxIdleConnsPerHost
+		transport.MaxConnsPerHost = maxConnsPerHost
+
+		return nil
+	}
+}
diff --git a/client_websockets.go b/client_websockets.go
index 7131fff..d474036 100644
--- a/client_websockets.go
+++ b/client_websockets.go
@@ -14,7 +14,6 @@ import (
 	"net/url"
 	"time"
 
-	"golang.org/x/sync/errgroup"
 	"nhooyr.io/websocket"
 	"nhooyr.io/websocket/wsjson"
 )
@@ -69,8 +68,68 @@ func Connect(ctx context.Context, addr string, options ...ClientOption) (*Client
 	return c, nil
 }
 
-// Connect dials the Omlox™ Hub websockets interface.
+// Connect dials the Omlox™ Hub websockets interface with automatic retry support.
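+// Failed attempts are passed to the configured WSCheckRetry policy; when a retry is
+// allowed, the wait before the next attempt is WSBackoff(WSMinRetryWait, WSMaxRetryWait,
+// attempt). With the defaults this is roughly 1s, 2s, 4s, ... capped at WSMaxRetryWait
+// and limited to WSMaxRetries attempts (an approximate description, not a guarantee).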
 func (c *Client) Connect(ctx context.Context) error {
+	var err error
+	var shouldRetry bool
+	var attempt int
+
+	if c.configuration.WSAutoReconnect {
+		c.mu.Lock()
+		c.reconnectCtx, c.reconnectCancel = context.WithCancel(context.Background())
+		c.mu.Unlock()
+	}
+
+	for attempt = 0; ; attempt++ {
+		err = c.connect(ctx)
+		if err == nil {
+			return nil
+		}
+
+		shouldRetry, err = c.configuration.WSCheckRetry(ctx, attempt, err)
+		if !shouldRetry {
+			break
+		}
+
+		remain := c.configuration.WSMaxRetries - attempt
+		if c.configuration.WSMaxRetries >= 0 && remain <= 0 {
+			break
+		}
+
+		wait := c.configuration.WSBackoff(
+			c.configuration.WSMinRetryWait,
+			c.configuration.WSMaxRetryWait,
+			attempt,
+		)
+
+		slog.LogAttrs(
+			ctx,
+			slog.LevelDebug,
+			"retrying connection",
+			slog.Int("attempt", attempt+1),
+			slog.Duration("wait", wait),
+			slog.Int("remaining", remain),
+			slog.Any("error", err),
+		)
+
+		timer := time.NewTimer(wait)
+		select {
+		case <-ctx.Done():
+			timer.Stop()
+			return ctx.Err()
+		case <-timer.C:
+		}
+	}
+
+	// all retries exhausted
+	if err == nil {
+		return fmt.Errorf("connection failed after %d attempt(s)", attempt)
+	}
+	return fmt.Errorf("connection failed after %d attempt(s): %w", attempt, err)
+}
+
+// connect performs a single connection attempt to the Omlox™ Hub websockets interface.
+func (c *Client) connect(ctx context.Context) error {
 	if !c.isClosed() {
 		// close the connection if it happens to be open
 		if err := c.Close(); err != nil {
@@ -85,7 +144,6 @@ func (c *Client) Connect(ctx context.Context) error {
 	}
 
 	ctx, cancel := context.WithCancel(ctx)
-	errg, ctx := errgroup.WithContext(ctx)
 
 	conn, _, err := websocket.Dial(ctx, wsURL.String(), &websocket.DialOptions{
 		HTTPClient: c.client,
@@ -107,21 +165,86 @@ func (c *Client) Connect(ctx context.Context) error {
 	c.mu.Lock()
 	c.conn = conn
 	c.closed = false
-	c.errg = errg
+	c.lifecycleWg.Add(2)
 	c.cancel = cancel
 	c.mu.Unlock()
 
-	c.errg.Go(func() error {
-		return c.readLoop(ctx)
-	})
+	go func() {
+		defer c.lifecycleWg.Done()
+		if err := c.readLoop(ctx, conn); err != nil {
+			c.handleConnectionLoss(err)
+		}
+	}()
 
-	c.errg.Go(func() error {
-		return c.pingLoop(ctx)
-	})
+	go func() {
+		defer c.lifecycleWg.Done()
+		if err := c.pingLoop(ctx, conn); err != nil {
+			c.handleConnectionLoss(err)
+		}
+	}()
 
 	return nil
 }
 
+// handleConnectionLoss handles unexpected connection failures and triggers reconnection.
+// This is called by readLoop or pingLoop when they detect a connection error.
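+// When WSAutoReconnect is disabled the loss is only logged; otherwise the client retries
+// with the configured backoff via reconnect and, on success, re-establishes the active
+// subscriptions through restoreSubscriptions.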
+func (c *Client) handleConnectionLoss(err error) {
+	c.mu.RLock()
+	reconnectCtx := c.reconnectCtx
+	autoReconnect := c.configuration.WSAutoReconnect
+	c.mu.RUnlock()
+
+	if !autoReconnect {
+		slog.LogAttrs(
+			context.Background(),
+			slog.LevelWarn,
+			"connection lost, auto-reconnect disabled",
+			slog.Any("error", err),
+		)
+		return
+	}
+
+	// client is shutting down, do not attempt reconnect
+	if reconnectCtx.Err() != nil {
+		return
+	}
+
+	slog.LogAttrs(
+		context.Background(),
+		slog.LevelWarn,
+		"connection lost, attempting to reconnect",
+		slog.Any("error", err),
+	)
+
+	c.mu.Lock()
+	c.reconnecting = true
+	c.mu.Unlock()
+
+	// attempt to reconnect with backoff
+	if err := c.reconnect(reconnectCtx); err != nil {
+		slog.LogAttrs(
+			context.Background(),
+			slog.LevelError,
+			"reconnection failed",
+			slog.Any("error", err),
+		)
+		c.mu.Lock()
+		c.reconnecting = false
+		c.mu.Unlock()
+		return
+	}
+
+	c.mu.Lock()
+	c.reconnecting = false
+	c.mu.Unlock()
+
+	slog.LogAttrs(
+		context.Background(),
+		slog.LevelInfo,
+		"successfully reconnected",
+	)
+}
+
 // Publish a message to the Omlox Hub.
 func (c *Client) Publish(ctx context.Context, topic Topic, payload ...json.RawMessage) error {
 	if topic == "" {
@@ -141,7 +264,11 @@ func (c *Client) publish(ctx context.Context, wrObj *WrapperObject) (err error)
 	// TODO @dvcorreia: maybe this log should be a metric instead.
 	defer slog.LogAttrs(context.Background(), slog.LevelDebug, "published", slog.Any("err", err), slog.Any("event", wrObj))
 
-	if c.isClosed() {
+	c.mu.RLock()
+	closed := c.closed
+	c.mu.RUnlock()
+
+	if closed {
 		return net.ErrClosed
 	}
 
@@ -158,7 +285,25 @@ func (c *Client) Subscribe(ctx context.Context, topic Topic, params ...Parameter
 		}
 	}
 
-	return c.subscribe(ctx, topic, parameters)
+	// perform the subscription handshake
+	sid, err := c.subscribe(ctx, topic, parameters)
+	if err != nil {
+		return nil, err
+	}
+
+	sub := &Subcription{
+		sid:    sid, // BUG: deephub doesn't return the sid in subsequent messages (NEEDS FIX!)
+		topic:  topic,
+		params: parameters,
+		mch:    make(chan *WrapperObject, receiveChanSize),
+	}
+
+	// promote a pending subscription
+	c.mu.Lock()
+	c.subs[sub.sid] = sub
+	c.mu.Unlock()
+
+	return sub, nil
 }
 
 // Sends a subscription message and handles the confirmation from the server.
@@ -167,17 +312,16 @@ func (c *Client) Subscribe(ctx context.Context, topic Topic, params ...Parameter
 // There can only be one pending subscription at each time.
 // Subsequent subscriptions will wait while the pending one is waiting for an ID from the server.
 // Since each subscription on a topic can have a distinct parameters, we must synchronisly wait to match each one to its ID.
-func (c *Client) subscribe(ctx context.Context, topic Topic, params Parameters) (*Subcription, error) {
+func (c *Client) subscribe(ctx context.Context, topic Topic, params Parameters) (int, error) {
 	// channel to await subscription confirmation
 	await := make(chan struct {
 		sid int
 		err error
 	})
-	defer close(await)
 
 	select {
 	case <-ctx.Done():
-		return nil, ctx.Err()
+		return 0, ctx.Err()
 	// lock for pending subscription confirmation.
 	// the pending will be freed by the subribed message handler.
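+	// Sending await on c.pending claims the single pending-subscription slot;
+	// the EventSubscribed handler pops it and completes it with the
+	// server-assigned subscription ID.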
 	case c.pending <- await:
@@ -191,8 +335,11 @@ func (c *Client) subscribe(ctx context.Context, topic Topic, params Parameters)
 	}
 
 	if err := c.publish(ctx, wrObj); err != nil {
-		<-c.pending // clear pending subscription
-		return nil, err
+		select {
+		case <-c.pending:
+		default:
+		}
+		return 0, err
 	}
 
 	// wait for subcription ID
@@ -202,32 +349,19 @@ func (c *Client) subscribe(ctx context.Context, topic Topic, params Parameters)
 	}
 	select {
 	case <-ctx.Done():
-		return nil, ctx.Err()
+		return 0, ctx.Err()
 	case r = <-await:
 	}
 
 	if r.err != nil {
-		return nil, r.err
-	}
-
-	sub := &Subcription{
-		// sid: r.sid,
-		sid:    0, // BUG: deephub doesn't return the sid in subsequent messages (NEEDS FIX!)
-		topic:  topic,
-		params: params,
-		mch:    make(chan *WrapperObject, 1),
+		return 0, r.err
 	}
 
-	// promote a pending subcription
-	c.mu.Lock()
-	c.subs[sub.sid] = sub
-	c.mu.Unlock()
-
-	return sub, nil
+	return 0, nil // BUG: deephub doesn't return the sid in subsequent messages (NEEDS FIX!)
 }
 
 // ping pong loop that manages the websocket connection health.
-func (c *Client) pingLoop(ctx context.Context) error {
+func (c *Client) pingLoop(ctx context.Context, conn *websocket.Conn) error {
 	t := time.NewTicker(pingPeriod)
 	defer t.Stop()
 
@@ -242,7 +376,7 @@ func (c *Client) pingLoop(ctx context.Context) error {
 		defer cancel()
 
 		begin := time.Now()
-		err := c.conn.Ping(ctx)
+		err := conn.Ping(ctx)
 
 		if err != nil {
 			// context was exceded and the client should close
@@ -264,18 +398,19 @@ func (c *Client) pingLoop(ctx context.Context) error {
 }
 
 // readLoop that will handle incomming data.
-func (c *Client) readLoop(ctx context.Context) error {
-	defer c.clearSubs()
-
-	// set the client to closed state
+func (c *Client) readLoop(ctx context.Context, conn *websocket.Conn) error {
+	// set the client to closed state when this goroutine exits
 	defer func() {
 		c.mu.Lock()
 		defer c.mu.Unlock()
-		c.closed = true
+		// mark closed if it is the active connection
+		if c.conn == conn {
+			c.closed = true
+		}
 	}()
 
 	for {
-		msgType, r, err := c.conn.Reader(ctx)
+		msgType, r, err := conn.Reader(ctx)
 		if err != nil {
 
 			if errors.Is(err, context.Canceled) {
@@ -322,6 +457,121 @@ func (c *Client) readLoop(ctx context.Context) error {
 	}
 }
 
+// reconnect attempts to re-establish the WebSocket connection with retry logic.
+func (c *Client) reconnect(ctx context.Context) error {
+	var err error
+	var shouldRetry bool
+	var attempt int
+
+	for attempt = 0; ; attempt++ {
+		err = c.connect(ctx)
+		if err == nil {
+			// connection successful, restore subscriptions
+			return c.restoreSubscriptions(ctx)
+		}
+
+		shouldRetry, err = c.configuration.WSCheckRetry(ctx, attempt, err)
+		if !shouldRetry {
+			break
+		}
+
+		remain := c.configuration.WSMaxRetries - attempt
+		if c.configuration.WSMaxRetries >= 0 && remain <= 0 {
+			break
+		}
+
+		wait := c.configuration.WSBackoff(
+			c.configuration.WSMinRetryWait,
+			c.configuration.WSMaxRetryWait,
+			attempt,
+		)
+
+		slog.LogAttrs(
+			ctx,
+			slog.LevelDebug,
+			"retrying reconnection",
+			slog.Int("attempt", attempt+1),
+			slog.Duration("wait", wait),
+			slog.Int("remaining", remain),
+			slog.Any("error", err),
+		)
+
+		timer := time.NewTimer(wait)
+		select {
+		case <-ctx.Done():
+			timer.Stop()
+			return ctx.Err()
+		case <-timer.C:
+		}
+	}
+
+	// all retries exhausted
+	if err == nil {
+		return fmt.Errorf("reconnection failed after %d attempt(s)", attempt)
+	}
+	return fmt.Errorf("reconnection failed after %d attempt(s): %w", attempt, err)
+}
+
+// restoreSubscriptions re-establishes all active subscriptions after reconnection.
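+// The existing Subcription values and their message channels are reused and re-registered
+// under the new server-assigned IDs, so readers keep receiving on the same channel;
+// subscriptions that cannot be re-established are logged and skipped.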
+func (c *Client) restoreSubscriptions(ctx context.Context) error {
+	c.mu.RLock()
+	// snapshot the subscriptions to avoid holding the lock during network calls
+	subsToRestore := make([]*Subcription, 0, len(c.subs))
+	for _, sub := range c.subs {
+		subsToRestore = append(subsToRestore, sub)
+	}
+	c.mu.RUnlock()
+
+	slog.LogAttrs(
+		ctx,
+		slog.LevelInfo,
+		"restoring subscriptions",
+		slog.Int("count", len(subsToRestore)),
+	)
+
+	// clear old subscriptions map since ids will change.
+	// subscriptions will be repopulated in resubscription
+	c.mu.Lock()
+	c.subs = make(map[int]*Subcription)
+	c.mu.Unlock()
+
+	for _, sub := range subsToRestore {
+		newSid, err := c.subscribe(ctx, sub.topic, sub.params)
+		if err != nil {
+			slog.LogAttrs(
+				ctx,
+				slog.LevelError,
+				"failed to restore subscription",
+				slog.Int("old sid", sub.sid),
+				slog.String("topic", string(sub.topic)),
+				slog.Any("error", err),
+			)
+			continue
+		}
+
+		// update the existing subscription object with the new sid
+		// the user keeps reading from the same channel
+		oldSid := sub.sid
+		sub.sid = newSid
+
+		// re-register in the subscriptions with the new sid
+		c.mu.Lock()
+		c.subs[newSid] = sub
+		c.mu.Unlock()
+
+		slog.LogAttrs(
+			ctx,
+			slog.LevelDebug,
+			"subscription restored",
+			slog.Int("old sid", oldSid),
+			slog.Int("new sid", newSid),
+			slog.String("topic", string(sub.topic)),
+		)
+	}
+
+	return nil
+}
+
 // handleMessage received from the Omlox Hub server.
 func (c *Client) handleMessage(ctx context.Context, msg *wrapperObject) {
 	switch msg.Event {
@@ -329,13 +579,17 @@ func (c *Client) handleMessage(ctx context.Context, msg *wrapperObject) {
 		c.handleError(ctx, msg)
 	case EventSubscribed:
 		// pop pending subscription and assign subscription ID
-		pendingc := <-c.pending
-		chsend(ctx, pendingc, struct {
-			sid int
-			err error
-		}{
-			sid: msg.SubscriptionID,
-		})
+		select {
+		case pendingc := <-c.pending:
+			chsend(ctx, pendingc, struct {
+				sid int
+				err error
+			}{
+				sid: msg.SubscriptionID,
+			})
+		default:
+			slog.Warn("received subscription confirmation but no pending subscription found")
+		}
 		return
 	case EventUnsubscribed:
 		// TODO @dvcorreia: close subscription
@@ -416,6 +670,13 @@ func (c *Client) clearSubs() {
 // Close releases any resources held by the client,
 // such as connections, memory and goroutines.
 func (c *Client) Close() error {
+	// stop automatic reconnection
+	c.mu.Lock()
+	if c.reconnectCancel != nil {
+		c.reconnectCancel()
+	}
+	c.mu.Unlock()
+
 	if !c.isClosed() {
 		err := c.conn.Close(websocket.StatusNormalClosure, "")
 		if err != nil {
@@ -426,7 +687,18 @@ func (c *Client) Close() error {
 	// close the client context
 	c.cancel()
 
-	return c.errg.Wait()
+	// cancel context to stop goroutines
+	if c.cancel != nil {
+		c.cancel()
+	}
+
+	// wait for all goroutines to finish
+	c.lifecycleWg.Wait()
+
+	// clear subscriptions after all goroutines have stopped
+	c.clearSubs()
+
+	return nil
 }
 
 // isClosed reports if the client closed.
@@ -436,6 +708,21 @@ func (c *Client) isClosed() bool {
 	return c.closed
 }
 
+// IsConnected reports if the client is currently connected to the WebSocket.
+// Returns true if connected, false if closed or reconnecting.
+func (c *Client) IsConnected() bool {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	return !c.closed && !c.reconnecting
+}
+
+// IsReconnecting reports if the client is currently attempting to reconnect.
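+//
+// Together with IsConnected it can be used to gate work while the connection is being
+// re-established, e.g. (illustrative sketch only):
+//
+//	if client.IsReconnecting() {
+//		// skip or queue publishes until IsConnected reports true again
+//	}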
+func (c *Client) IsReconnecting() bool {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	return c.reconnecting
+}
+
 func upgradeToWebsocketScheme(u *url.URL) error {
 	switch u.Scheme {
 	case httpScheme:
diff --git a/go.mod b/go.mod
index a47b6c6..f2b754f 100644
--- a/go.mod
+++ b/go.mod
@@ -28,7 +28,4 @@ require (
 	gopkg.in/yaml.v3 v3.0.1 // indirect
 )
 
-require (
-	github.com/josharian/intern v1.0.0 // indirect
-	golang.org/x/sync v0.2.0
-)
+require github.com/josharian/intern v1.0.0 // indirect
diff --git a/go.sum b/go.sum
index 6b47ae6..7edb753 100644
--- a/go.sum
+++ b/go.sum
@@ -38,8 +38,6 @@ github.com/tidwall/rtree v1.3.1 h1:xu3vJPKJrmGce7YJcFUCoqLrp9DTUEJBnVgdPSXHgHs=
 github.com/tidwall/rtree v1.3.1/go.mod h1:S+JSsqPTI8LfWA4xHBo5eXzie8WJLVFeppAutSegl6M=
 github.com/tidwall/sjson v1.2.4 h1:cuiLzLnaMeBhRmEv00Lpk3tkYrcxpmbU81tAY4Dw0tc=
 github.com/tidwall/sjson v1.2.4/go.mod h1:098SZ494YoMWPmMO6ct4dcFnqxwj9r/gF0Etp19pSNM=
-golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI=
-golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/time v0.4.0 h1:Z81tqI5ddIoXDPvVQ7/7CC9TnLM7ubaFG2qXYd5BbYY=
 golang.org/x/time v0.4.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
diff --git a/omlox_test.go b/omlox_test.go
index 9be00a2..fdd5962 100644
--- a/omlox_test.go
+++ b/omlox_test.go
@@ -6,6 +6,7 @@ package omlox_test
 import (
 	"context"
 	"log"
+	"time"
 
 	"github.com/wavecomtech/omlox-client-go"
 )
@@ -32,7 +33,13 @@ func ExampleConnect() {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	client, err := omlox.Connect(ctx, "localhost:7081/v2")
+	client, err := omlox.Connect(
+		ctx,
+		"localhost:7081/v2",
+		omlox.WithWSAutoReconnect(true),
+		omlox.WithWSMaxRetries(-1),                         // Unlimited retries
+		omlox.WithWSRetryWait(time.Second, 30*time.Second), // Backoff timing
+	)
 	if err != nil {
 		log.Fatal(err)
 	}