Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Octo-proxy or `octo` is simple TCP & TLS Proxy with mutual authentication and tr
- Support for multiple targets, accessed in random order (load balancer)
- Reload configuration or certificate without dropping connection
- Expose metrics that can be consumed by prometheus
- Monitor CNAME records to force reconnection achieving DNS failover with long lived connections

### Usage
#### Run octo with ad-hoc command
Expand Down
5 changes: 4 additions & 1 deletion cmd/octo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ Flags:
Specify target backend which traffic will be forwarded
-metrics
Specify address and port to run the metrics server
-monitor
Specify a CNAME record to monitor and reset connections on changes
-debug
Enable debug log messages
-version
Expand Down Expand Up @@ -77,6 +79,7 @@ func runMain() error {
listener = flag.String("listener", "127.0.0.1:5000", "Specify listener for running octo-proxy")
target = flag.String("target", "", "Specify comma-separated list of targets for running octo-proxy")
metrics = flag.String("metrics", "0.0.0.0:9123", "Address and port to run the metrics server on")
monitor = flag.String("monitor", "", "Specify a CNAME record to monitor and reset connections on changes")
debug = flag.Bool("debug", false, "Enable debug messages")
)

Expand All @@ -93,7 +96,7 @@ func runMain() error {
if *target != "" {
targets := strings.Split(*target, ",")

c, err := config.GenerateConfig(*listener, targets, *metrics)
c, err := config.GenerateConfig(*listener, targets, *metrics, *monitor)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
| listener | [`Hostconfig`](#hostconfig) | Set of listener related configuration. All of the incoming request to octo-proxy will be handled by this listener. | yes |
| targets | [`Hostconfig[]`](#hostconfig) | Set of target related configurations. These targets are backends which octo-proxy will forward all incoming traffic accepted by the listener. | yes |
| mirror | [`Hostconfig`](#hostconfig) | Set of mirror related configuration. If this configuration is enabled, all incoming requests will also be forwarded to this mirror. Unlike the `target`, in a `mirror` setup, we implement 'fire and forget,' where every request is only forwarded, and the response is ignored. | no |
| monitor | `<string>` | A CNAME to monitor for changes. When the CNAME changes the connections are closed forcing reconnection. When unset the monitoring feature is disabled. | no |

## Hostconfig
| Field | Type | Description | Required |
Expand Down
4 changes: 3 additions & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type ServerConfig struct {
Listener HostConfig `yaml:"listener"`
Targets []HostConfig `yaml:"targets"`
Mirror HostConfig `yaml:"mirror"`
Monitor string `yaml:"monitor"`
}

type HostConfig struct {
Expand Down Expand Up @@ -115,7 +116,7 @@ func readConfig(r io.Reader) (*Config, error) {
return config, nil
}

func GenerateConfig(listener string, targets []string, metrics string) (*Config, error) {
func GenerateConfig(listener string, targets []string, metrics string, monitor string) (*Config, error) {
l := strings.Split(listener, ":")

if len(l) != 2 {
Expand All @@ -131,6 +132,7 @@ func GenerateConfig(listener string, targets []string, metrics string) (*Config,
Port: l[1],
},
Targets: []HostConfig{},
Monitor: monitor,
},
},
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func TestGenerateConfig(t *testing.T) {
Listener string
Targets []string
Metrics string
Monitor string
expectedConfig *Config
expectedError string
}{
Expand All @@ -122,6 +123,7 @@ func TestGenerateConfig(t *testing.T) {
Listener: "127.0.0.1:8080",
Targets: []string{"127.0.0.1:80"},
Metrics: "127.0.0.1:9123",
Monitor: "octo.example.com",
expectedConfig: &Config{
ServerConfigs: []ServerConfig{
{
Expand All @@ -142,6 +144,7 @@ func TestGenerateConfig(t *testing.T) {
},
},
},
Monitor: "octo.example.com",
},
},
MetricsConfig: HostConfig{
Expand Down Expand Up @@ -212,7 +215,7 @@ func TestGenerateConfig(t *testing.T) {

for _, tt := range tests {
t.Run(tt.Name, func(t *testing.T) {
c, err := GenerateConfig(tt.Listener, tt.Targets, tt.Metrics)
c, err := GenerateConfig(tt.Listener, tt.Targets, tt.Metrics, tt.Monitor)
if err != nil {
if !strings.Contains(err.Error(), tt.expectedError) {
t.Fatalf("got %v, want %s", err, tt.expectedError)
Expand Down
64 changes: 64 additions & 0 deletions pkg/proxy/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package proxy

import (
"context"
"net"
"time"

"github.com/rs/zerolog/log"
)

type Resolver interface {
LookupCNAME(targetCname string) (string, error)
}

type goNetResolver struct{}

func (goNetResolver) LookupCNAME(targetCname string) (string, error) {
return net.LookupCNAME(targetCname)
}

func GoNetResolver() goNetResolver {
return goNetResolver{}
}

type Monitor struct {
targetCname string
lastData string
resolver Resolver
interval time.Duration
}

func NewMonitor(resolver Resolver, targetCname string, interval time.Duration) *Monitor {
return &Monitor{
targetCname: targetCname,
resolver: resolver,
interval: interval,
}
}

func (m *Monitor) Run(ctx context.Context, callback func()) {
for {
select {
case <-ctx.Done():
return
default:
}

cname, err := m.resolver.LookupCNAME(m.targetCname)
if err != nil {
log.Error().Err(err).Msg("failed to lookup CNAME")
cname = m.lastData
}
if m.lastData != "" && m.lastData != cname {
log.Info().Msg("CNAME changed, running callback")
callback()
}
m.lastData = cname

// Go net doesn't provide us with the TTL, therefor we poll frequently and
// expect the appropriate levels of caching to be in place, in order for this
// not to cause undue load on the DNS servers.
time.Sleep(m.interval)
}
}
94 changes: 94 additions & 0 deletions pkg/proxy/monitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package proxy

import (
"context"
"errors"
"testing"
"time"
)

type constantResolver struct {
value string
}

func (c constantResolver) LookupCNAME(targetCname string) (string, error) {
return c.value, nil
}

func TestMonitorConstantResolver(t *testing.T) {
monitorInterval := 100 * time.Millisecond
m := NewMonitor(constantResolver{"test"}, "test", monitorInterval)
ctx, cancel := context.WithCancel(context.Background())

var called bool
go m.Run(ctx, func() {
called = true
})

time.Sleep(2 * monitorInterval)
cancel()

if called {
t.Error("expected callback not to be called")
}
}

type changingResolver struct {
values []string
errs []error
index int
}

func (c *changingResolver) LookupCNAME(targetCname string) (string, error) {
if c.index >= len(c.values) {
c.index = 0
}
value := c.values[c.index]
err := c.errs[c.index]
c.index++
return value, err
}

func TestMonitorChangingResolver(t *testing.T) {
monitorInterval := 100 * time.Millisecond
m := NewMonitor(&changingResolver{
values: []string{"test1", "test2"},
errs: []error{nil, nil},
index: 0,
}, "test", monitorInterval)
ctx, cancel := context.WithCancel(context.Background())

var called bool
go m.Run(ctx, func() {
called = true
})

time.Sleep(2 * monitorInterval)
cancel()

if !called {
t.Error("expected callback to be called")
}
}

func TestMonitorContinuesOnError(t *testing.T) {
monitorInterval := 100 * time.Millisecond
m := NewMonitor(&changingResolver{
values: []string{"test1", "", "test2"},
errs: []error{nil, errors.New("test"), nil},
index: 0,
}, "test", monitorInterval)
ctx, cancel := context.WithCancel(context.Background())

var called bool
go m.Run(ctx, func() {
called = true
})

time.Sleep(2 * monitorInterval)
cancel()

if !called {
t.Error("expected callback to be called")
}
}
16 changes: 16 additions & 0 deletions pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,26 @@ func (p *Proxy) handleConn(ctx context.Context, c config.ServerConfig) {
continue
}

connectionCtx, connectionCancel := context.WithCancel(ctx)
if c.Monitor != "" {
p.Wg.Add(1)
go func() {
monitor := NewMonitor(goNetResolver{}, c.Monitor, 5*time.Second)
monitor.Run(connectionCtx, func() {
log.Info().Msg("CNAME changed, closing connection")
if err := srcConn.Close(); err != nil {
log.Error().Err(err).Msg("error closing connection")
}
})
p.Wg.Done()
}()
}

p.Wg.Add(1)
go func() {
p.forwardConn(ctx, c, srcConn)
p.Wg.Done()
connectionCancel()
downstreamConnActive.With(prometheus.Labels{"name": p.Name}).Dec()
}()
}
Expand Down
Loading