Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 46 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,51 @@ for location := range omlox.ReceiveAs[omlox.Location](sub) {
}
```

#### Auto-Reconnection

The client supports automatic WebSocket reconnection with configurable retry policies. This is essential for production deployments where connections may be disrupted due to network issues, server restarts, or high load.

```go
// Create client with auto-reconnection enabled, unlimited retries, and backoff timing
// Dials a Omlox Hub websocket interface, subscribes to
// the location_updates topic and listens to new
// location messages.

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

client, err := omlox.Connect(
ctx,
"localhost:7081/v2",
omlox.WithWSAutoReconnect(true),
omlox.WithWSMaxRetries(-1),
omlox.WithWSRetryWait(time.Second, 30*time.Second),
)
if err != nil {
log.Fatal(err)
}
defer client.Close()

sub, err := client.Subscribe(ctx, omlox.TopicLocationUpdates)
if err != nil {
log.Fatal(err)
}

for location := range omlox.ReceiveAs[omlox.Location](sub) {
_ = location // handle location update
}
```

For high-load scenarios, also configure the HTTP connection pool:

```go
client, err := omlox.New(
"https://localhost:7081/v2",
omlox.WithWSAutoReconnect(true),
omlox.WithConnectionPoolSettings(200, 100, 0), // Prevent pool exhaustion
)
```

### Error Handling

Errors are returned when Omlox Hub responds with an HTTP status code outside of the 200 to 399 range.
Expand Down Expand Up @@ -310,4 +355,4 @@ If you have any trouble getting started, reach out to us by email (see the [MAIN

> [!NOTE]
> The code provided by this library is not certified by Omlox or the Profibus & Profinet International.
> Solutions using this library should go through the [certification process](https://omlox.com/certification) defined by the Omlox™ consortium to be an _"omlox certified solution"_.
> Solutions using this library should go through the [certification process](https://omlox.com/certification) defined by the Omlox™ consortium to be an _"omlox certified solution"_.
10 changes: 7 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"net/url"
"sync"

"golang.org/x/sync/errgroup"
"nhooyr.io/websocket"
)

Expand All @@ -33,13 +32,18 @@ type Client struct {

// websockets client fields

errg *errgroup.Group
cancel context.CancelFunc
lifecycleWg sync.WaitGroup
cancel context.CancelFunc

// websockets connection
conn *websocket.Conn
closed bool

// reconnection support
reconnectCtx context.Context
reconnectCancel context.CancelFunc
reconnecting bool

// subscriptions
subs map[int]*Subcription

Expand Down
169 changes: 169 additions & 0 deletions client_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,59 @@
package omlox

import (
"context"
"fmt"
"math"
"net/http"
"time"

"github.com/hashicorp/go-cleanhttp"
"golang.org/x/time/rate"
"nhooyr.io/websocket"
)

// WSCheckRetry specifies a policy for handling WebSocket connection retries.
// It is called following each connection attempt with the response (if any) and error values.
// If CheckRetry returns false, the Client stops retrying and returns the error to the caller.
// If CheckRetry returns an error, that error value is returned in lieu of the error from the connection attempt.
type WSCheckRetry func(ctx context.Context, attemptNum int, err error) (bool, error)

// WSBackoff specifies a policy for how long to wait between connection retry attempts.
// It is called after a failing connection attempt to determine the amount of time
// that should pass before trying again.
type WSBackoff func(min, max time.Duration, attemptNum int) time.Duration

// DefaultWSRetryPolicy provides a default callback for WebSocket connection retries,
// which will retry on connection errors and WebSocket close errors (except normal closure).
func DefaultWSRetryPolicy(ctx context.Context, attemptNum int, err error) (bool, error) {
// Do not retry on context.Canceled or context.DeadlineExceeded
if ctx.Err() != nil {
return false, ctx.Err()
}

if err == nil {
return false, nil
}

if websocket.CloseStatus(err) == websocket.StatusNormalClosure ||
websocket.CloseStatus(err) == websocket.StatusGoingAway {
return false, err
}

return true, nil
}

// DefaultWSBackoff provides exponential backoff based on the attempt number and limited
// by the provided minimum and maximum durations.
func DefaultWSBackoff(min, max time.Duration, attemptNum int) time.Duration {
mult := math.Pow(2, float64(attemptNum)) * float64(min)
sleep := time.Duration(mult)
if sleep > max {
sleep = max
}
return sleep
}

// GetDefaultOptions returns default configuration options for the client.
func DefaultConfiguration() ClientConfiguration {
// Use cleanhttp, which has the same default values as net/http client, but
Expand All @@ -21,6 +66,12 @@ func DefaultConfiguration() ClientConfiguration {
return ClientConfiguration{
HTTPClient: defaultClient,
RequestTimeout: 60 * time.Second,

WSMaxRetries: 2,
WSMinRetryWait: time.Second,
WSMaxRetryWait: 30 * time.Second,
WSBackoff: DefaultWSBackoff,
WSCheckRetry: DefaultWSRetryPolicy,
}
}

Expand All @@ -45,6 +96,39 @@ type ClientConfiguration struct {

// UserAgent sets a name for the http client User-Agent header.
UserAgent string

// WSMaxRetries sets the maximum number of reconnection attempts for WebSocket connections.
// Set to -1 for unlimited retries, 0 to disable retries.
//
// Default: 2
WSMaxRetries int

// WSMinRetryWait sets the minimum time to wait before retrying a WebSocket connection.
//
// Default: 1s
WSMinRetryWait time.Duration

// WSMaxRetryWait sets the maximum time to wait before retrying a WebSocket connection.
//
// Default: 30s
WSMaxRetryWait time.Duration

// WSBackoff specifies the policy for how long to wait between WebSocket reconnection attempts.
//
// Default: DefaultWSBackoff (exponential backoff)
WSBackoff WSBackoff

// WSCheckRetry specifies the policy for handling WebSocket reconnection retries.
//
// Default: DefaultWSRetryPolicy
WSCheckRetry WSCheckRetry

// WSAutoReconnect enables automatic reconnection when WebSocket connection is lost.
// When true, the client will automatically attempt to reconnect and resubscribe
// to all active subscriptions.
//
// Default: false (for backward compatibility, but recommended to enable)
WSAutoReconnect bool
}

// ClientOption is a configuration option to initialize a client.
Expand Down Expand Up @@ -84,3 +168,88 @@ func WithRateLimiter(limiter *rate.Limiter) ClientOption {
return nil
}
}

// WithWSAutoReconnect enables or disables automatic WebSocket reconnection.
// When enabled, the client will automatically attempt to reconnect and resubscribe
// to all active subscriptions when the connection is lost.
//
// Default: false (for backward compatibility)
func WithWSAutoReconnect(enabled bool) ClientOption {
return func(c *ClientConfiguration) error {
c.WSAutoReconnect = enabled
return nil
}
}

// WithWSMaxRetries sets the maximum number of WebSocket reconnection attempts.
// Set to -1 for unlimited retries, 0 to disable retries.
//
// Default: 2
func WithWSMaxRetries(retries int) ClientOption {
return func(c *ClientConfiguration) error {
if retries < -1 {
return fmt.Errorf("retries must not be less than -1")
}
c.WSMaxRetries = retries
return nil
}
}

// WithWSRetryWait sets the minimum and maximum time to wait between WebSocket reconnection attempts.
//
// Default: min=1s, max=30s
func WithWSRetryWait(min, max time.Duration) ClientOption {
return func(c *ClientConfiguration) error {
if min < 0 {
return fmt.Errorf("min retry wait must not be negative")
}
if max < min {
return fmt.Errorf("max retry wait must be greater than or equal to min retry wait")
}
c.WSMinRetryWait = min
c.WSMaxRetryWait = max
return nil
}
}

// WithWSBackoff sets a custom backoff policy for WebSocket reconnection attempts.
//
// Default: DefaultWSBackoff (exponential backoff)
func WithWSBackoff(backoff WSBackoff) ClientOption {
return func(c *ClientConfiguration) error {
c.WSBackoff = backoff
return nil
}
}

// WithWSCheckRetry sets a custom retry policy for WebSocket reconnection attempts.
//
// Default: DefaultWSRetryPolicy
func WithWSCheckRetry(checkRetry WSCheckRetry) ClientOption {
return func(c *ClientConfiguration) error {
c.WSCheckRetry = checkRetry
return nil
}
}

// WithConnectionPoolSettings configures HTTP connection pool settings for better
// performance under high load. This adjusts MaxIdleConns, MaxIdleConnsPerHost,
// and MaxConnsPerHost on the transport.
func WithConnectionPoolSettings(maxIdleConns, maxIdleConnsPerHost, maxConnsPerHost int) ClientOption {
return func(c *ClientConfiguration) error {
if c.HTTPClient == nil {
c.HTTPClient = cleanhttp.DefaultPooledClient()
}

transport, ok := c.HTTPClient.Transport.(*http.Transport)
if !ok {
return fmt.Errorf("HTTPClient transport must be *http.Transport to configure connection pool")
}

transport.MaxIdleConns = maxIdleConns
transport.MaxIdleConnsPerHost = maxIdleConnsPerHost
transport.MaxConnsPerHost = maxConnsPerHost

return nil
}
}
Loading
Loading