Skip to content
This repository was archived by the owner on Sep 26, 2018. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 29 additions & 22 deletions provider/sidecar.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package provider

import (
"context"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -186,36 +187,42 @@ func (provider *Sidecar) sidecarWatcher(stop chan bool, pool *safe.Pool) {
}

func (provider *Sidecar) recycleConn(stop chan bool, pool *safe.Pool) {
var err error
var resp *http.Response
var req *http.Request
for {
select {
case <-stop:
return
default:
//use refresh interval to occasionally reconnect to Sidecar in case the stream connection is lost
req, err = http.NewRequest("GET", provider.Endpoint+"/watch?by_service=false", nil)
if err != nil {
log.Errorf("Error creating http request to Sidecar: %s, Error: %s", provider.Endpoint, err)
continue
}
resp, err = watcherHTTPClient.Do(req)
if err != nil {
log.Errorf("Error connecting to Sidecar: %s, Error: %s", provider.Endpoint, err)
time.Sleep(5 * time.Second)
continue
}
// Wrap logic in an anonymous function because the defer statement has function scope
func() {
// Use refresh interval to occasionally reconnect to Sidecar in case the stream connection is lost
req, err := http.NewRequest("GET", provider.Endpoint+"/watch?by_service=false", nil)
if err != nil {
log.Errorf("Error creating watch request for Sidecar instance '%s': %s", provider.Endpoint, err)
time.Sleep(5 * time.Second)
return
}

safe.Go(func() { decodeStream(resp.Body, provider.callbackLoader) })
ctx, cancel := context.WithCancel(context.Background())
// Cancel the infinite timeout request automatically after we reset connTimer
defer cancel()

req = req.WithContext(ctx)

resp, err := watcherHTTPClient.Do(req)
if err != nil {
log.Errorf("Error connecting to Sidecar instance '%s': %s", provider.Endpoint, err)
time.Sleep(5 * time.Second)
return
}
defer resp.Body.Close()

//wait on refresh connection timer. If this expires we haven't seen an update in a
//while and should cancel the request, reset the time, and reconnect just in case
<-provider.connTimer.C
provider.connTimer.Reset(time.Duration(provider.RefreshConn))
safe.Go(func() { decodeStream(resp.Body, provider.callbackLoader) })

//TODO: Deprecated method. Refactor this to use a context.
watcherHTTPTransport.CancelRequest(req)
// Wait on refresh connection timer. If this expires we haven't seen an update in a
// while and should cancel the request, reset the time, and reconnect just in case
<-provider.connTimer.C
provider.connTimer.Reset(time.Duration(provider.RefreshConn))
}()
}
}
}
Expand Down
1 change: 0 additions & 1 deletion provider/sidecar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,6 @@ func TestSidecar(t *testing.T) {

So(configMsg.Configuration.Backends, ShouldContainKey, "api")
So(configMsg.Configuration.Backends["api"].Servers["another-aws-host_9000"].URL, ShouldEqual, "http://169.254.1.1:9000")

})

Reset(func() {
Expand Down