44 "container/ring"
55 "fmt"
66 "log/slog"
7+ "math"
78 "net"
89 "sync"
9- "sync/atomic"
1010 "time"
1111)
1212
@@ -28,22 +28,22 @@ func newConnPool(config Config, metrics *metrics, quit chan struct{}) *connPool
2828 metrics : metrics ,
2929 quit : quit ,
3030 }
31-
31+
3232 // Initialize connection states
3333 for i := 0 ; i < config .MaxConnections ; i ++ {
3434 pool .connections [i ] = & connectionState {
3535 active : false ,
3636 index : i ,
3737 }
3838 }
39-
39+
4040 // Initialize ring for round-robin selection
4141 pool .connRing = ring .New (config .MaxConnections )
4242 for i := 0 ; i < config .MaxConnections ; i ++ {
4343 pool .connRing .Value = i
4444 pool .connRing = pool .connRing .Next ()
4545 }
46-
46+
4747 return pool
4848}
4949
@@ -52,7 +52,7 @@ func (p *connPool) start() {
5252 for i := 0 ; i < p .config .MaxConnections ; i ++ {
5353 p .createConnection (i )
5454 }
55-
55+
5656 // Start health check routine
5757 go p .healthCheckLoop ()
5858}
@@ -61,7 +61,7 @@ func (p *connPool) start() {
6161func (p * connPool ) healthCheckLoop () {
6262 ticker := time .NewTicker (p .config .HealthCheckInterval )
6363 defer ticker .Stop ()
64-
64+
6565 for {
6666 select {
6767 case <- p .quit :
@@ -76,30 +76,30 @@ func (p *connPool) healthCheckLoop() {
7676func (p * connPool ) performHealthCheck () {
7777 p .connMutex .RLock ()
7878 connsToCheck := make ([]int , 0 , len (p .connections ))
79-
79+
8080 // Build list of connections to check under read lock (minimizing lock time)
8181 for i , conn := range p .connections {
8282 if conn == nil || ! conn .active {
8383 continue
8484 }
85-
85+
8686 // Only check idle connections or connections idle for too long
8787 if ! conn .processing .Load () && time .Since (conn .lastUsed ) > p .config .HealthCheckInterval {
8888 connsToCheck = append (connsToCheck , i )
8989 }
9090 }
9191 p .connMutex .RUnlock ()
92-
92+
9393 // Check connections outside the lock
9494 for _ , idx := range connsToCheck {
9595 p .connMutex .RLock ()
9696 conn := p .connections [idx ].conn
9797 p .connMutex .RUnlock ()
98-
98+
9999 if conn == nil {
100100 continue
101101 }
102-
102+
103103 // Simple health check: set a short deadline and try to write
104104 if err := conn .SetWriteDeadline (time .Now ().Add (500 * time .Millisecond )); err != nil {
105105 slog .Warn (fmt .Sprintf ("Health check failed for connection %d: %v" , idx , err ))
@@ -120,12 +120,12 @@ func (p *connPool) getConnection() (int, error) {
120120 p .connMutex .RLock ()
121121 initial := p .connRing
122122 current := initial
123-
123+
124124 // Do one round to find an available connection
125125 for i := 0 ; i < p .config .MaxConnections ; i ++ {
126126 idx := current .Value .(int )
127127 current = current .Next ()
128-
128+
129129 if p .connections [idx ].active && ! p .connections [idx ].processing .Load () {
130130 // Move the ring forward for next selection
131131 p .connRing = current
@@ -134,13 +134,13 @@ func (p *connPool) getConnection() (int, error) {
134134 }
135135 }
136136 p .connMutex .RUnlock ()
137-
137+
138138 // If no active connection found, try to find the first inactive one to activate
139139 p .connMutex .RLock ()
140140 for i , conn := range p .connections {
141141 if conn != nil && ! conn .active {
142142 p .connMutex .RUnlock ()
143-
143+
144144 // Try to create this connection
145145 if p .createConnection (i ) {
146146 return i , nil
@@ -149,35 +149,35 @@ func (p *connPool) getConnection() (int, error) {
149149 }
150150 }
151151 p .connMutex .RUnlock ()
152-
152+
153153 // Last resort: poll for an available connection
154154 maxWaitTime := 3 * time .Second
155155 pollInterval := 50 * time .Millisecond
156156 endTime := time .Now ().Add (maxWaitTime )
157-
157+
158158 for time .Now ().Before (endTime ) {
159159 for i := 0 ; i < p .config .MaxConnections ; i ++ {
160160 p .connMutex .RLock ()
161161 connExists := p .connections [i ] != nil && p .connections [i ].active
162162 p .connMutex .RUnlock ()
163-
163+
164164 if connExists {
165165 // Try to acquire this connection atomically
166166 if p .connections [i ].processing .CompareAndSwap (false , true ) {
167167 p .connMutex .RLock ()
168168 stillActive := p .connections [i ].active
169169 p .connMutex .RUnlock ()
170-
170+
171171 if stillActive {
172172 return i , nil
173173 }
174-
174+
175175 // Release if not actually active
176176 p .connections [i ].processing .Store (false )
177177 }
178178 }
179179 }
180-
180+
181181 // Wait before polling again
182182 select {
183183 case <- p .quit :
@@ -186,7 +186,7 @@ func (p *connPool) getConnection() (int, error) {
186186 // Continue polling
187187 }
188188 }
189-
189+
190190 return - 1 , fmt .Errorf ("no target connections available after %.1f seconds" , maxWaitTime .Seconds ())
191191}
192192
@@ -195,28 +195,29 @@ func (p *connPool) createConnection(index int) bool {
195195 if index < 0 || index >= len (p .connections ) {
196196 return false
197197 }
198-
198+
199199 // Get fail count atomically
200200 p .connMutex .RLock ()
201201 failCount := p .connections [index ].failCount
202202 p .connMutex .RUnlock ()
203-
203+
204204 // Calculate backoff with exponential increase
205205 backoff := p .config .ReconnectBackoff
206206 if failCount > 0 {
207207 maxBackoff := 30 * time .Second
208- backoffSeconds := float64 (p .config .ReconnectBackoff ) * (1 << uint (failCount - 1 ))
208+ // Fix: Use math.Pow instead of bit shift for exponential calculation
209+ backoffSeconds := float64 (p .config .ReconnectBackoff ) * math .Pow (2 , float64 (failCount - 1 ))
209210 if backoffSeconds > float64 (maxBackoff ) {
210211 backoffSeconds = float64 (maxBackoff )
211212 }
212213 backoff = time .Duration (backoffSeconds )
213214 }
214-
215- slog .Info (fmt .Sprintf ("Creating target connection %d (attempt #%d, backoff: %v)" ,
215+
216+ slog .Info (fmt .Sprintf ("Creating target connection %d (attempt #%d, backoff: %v)" ,
216217 index , failCount + 1 , backoff ))
217-
218+
218219 p .metrics .reconnectionAttempts .Add (1 )
219-
220+
220221 // Wait before attempting to connect (for non-first attempts)
221222 if failCount > 0 {
222223 select {
@@ -226,13 +227,13 @@ func (p *connPool) createConnection(index int) bool {
226227 // Continue after backoff
227228 }
228229 }
229-
230+
230231 // Set deadline for connection establishment
231232 dialer := & net.Dialer {
232- Timeout : 10 * time .Second ,
233+ Timeout : 10 * time .Second ,
233234 KeepAlive : 30 * time .Second ,
234235 }
235-
236+
236237 // Try to establish connection
237238 conn , err := dialer .Dial ("tcp" , p .config .TargetServer )
238239 if err != nil {
@@ -246,10 +247,10 @@ func (p *connPool) createConnection(index int) bool {
246247 p .connections [index ].conn = nil
247248 }
248249 p .connMutex .Unlock ()
249-
250- slog .Error (fmt .Sprintf ("Failed to connect to target server %s (attempt #%d): %v" ,
250+
251+ slog .Error (fmt .Sprintf ("Failed to connect to target server %s (attempt #%d): %v" ,
251252 p .config .TargetServer , failCount + 1 , err ))
252-
253+
253254 // Schedule next attempt
254255 select {
255256 case <- p .quit :
@@ -259,26 +260,26 @@ func (p *connPool) createConnection(index int) bool {
259260 return false
260261 }
261262 }
262-
263+
263264 // Configure TCP connection for optimal performance
264265 if tcpConn , ok := conn .(* net.TCPConn ); ok {
265266 _ = tcpConn .SetKeepAlive (true )
266267 _ = tcpConn .SetKeepAlivePeriod (30 * time .Second )
267268 _ = tcpConn .SetNoDelay (true ) // Disable Nagle's algorithm for lower latency
268269 }
269-
270- slog .Info (fmt .Sprintf ("New target connection %d: %v <-> %v" ,
270+
271+ slog .Info (fmt .Sprintf ("New target connection %d: %v <-> %v" ,
271272 index , conn .LocalAddr (), conn .RemoteAddr ()))
272-
273+
273274 // Initial delay if configured
274275 if p .config .InitialDelay > 0 {
275276 time .Sleep (p .config .InitialDelay )
276277 }
277-
278+
278279 // Update connection state
279280 p .connMutex .Lock ()
280281 oldConn := p .connections [index ].conn
281-
282+
282283 // Check if we're shutting down
283284 select {
284285 case <- p .quit :
@@ -290,19 +291,19 @@ func (p *connPool) createConnection(index int) bool {
290291 if oldConn != nil {
291292 _ = oldConn .Close ()
292293 }
293-
294+
294295 // Update connection state
295296 p .connections [index ].conn = conn
296297 p .connections [index ].active = true
297298 p .connections [index ].lastUsed = time .Now ()
298299 p .connections [index ].failCount = 0
299300 p .connections [index ].processing .Store (false )
300-
301+
301302 // Update metrics
302303 if oldConn == nil {
303304 p .metrics .activeConnections .Add (1 )
304305 }
305-
306+
306307 p .connMutex .Unlock ()
307308 return true
308309 }
@@ -312,33 +313,33 @@ func (p *connPool) createConnection(index int) bool {
312313func (p * connPool ) closeConnection (index int ) {
313314 p .connMutex .Lock ()
314315 defer p .connMutex .Unlock ()
315-
316+
316317 if index < 0 || index >= len (p .connections ) || ! p .connections [index ].active {
317318 return
318319 }
319-
320+
320321 if p .connections [index ].conn != nil {
321322 _ = p .connections [index ].conn .Close ()
322323 p .connections [index ].conn = nil
323324 }
324-
325+
325326 if p .connections [index ].active {
326327 p .metrics .activeConnections .Add (- 1 )
327328 }
328-
329+
329330 p .connections [index ].active = false
330331}
331332
332333// close shuts down all connections in the pool
333334func (p * connPool ) close () {
334335 p .connMutex .Lock ()
335336 defer p .connMutex .Unlock ()
336-
337+
337338 for i , conn := range p .connections {
338339 if conn != nil && conn .conn != nil {
339340 _ = conn .conn .Close ()
340341 p .connections [i ].conn = nil
341342 p .connections [i ].active = false
342343 }
343344 }
344- }
345+ }
0 commit comments