From 11c0c2f7ddc1cc88c8a86300632d0edc444f4959 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 15 Dec 2025 15:43:29 +0000 Subject: [PATCH 1/9] handleAvailableConnection will return create multiple connections if we are below the minimum connection count --- .../ConnectionPoolModule/ConnectionPool.swift | 9 +++ .../PoolStateMachine+ConnectionGroup.swift | 1 + .../PoolStateMachine.swift | 57 ++++++++++++++++--- .../ConnectionPoolTests.swift | 2 +- .../PoolStateMachineTests.swift | 52 +++++++++++++++++ 5 files changed, 111 insertions(+), 10 deletions(-) diff --git a/Sources/ConnectionPoolModule/ConnectionPool.swift b/Sources/ConnectionPoolModule/ConnectionPool.swift index b34c1435..11001f6a 100644 --- a/Sources/ConnectionPoolModule/ConnectionPool.swift +++ b/Sources/ConnectionPoolModule/ConnectionPool.swift @@ -380,6 +380,15 @@ public final class ConnectionPool< self.cancelTimers(timers) self.eventContinuation.yield(.makeConnection(request)) + case .makeConnectionsCancelAndScheduleTimers(let requests, let cancelledTimers, let scheduledTimers): + self.cancelTimers(cancelledTimers) + for request in requests { + self.eventContinuation.yield(.makeConnection(request)) + } + for timer in scheduledTimers { + self.eventContinuation.yield(.scheduleTimer(timer)) + } + case .runKeepAlive(let connection, let cancelContinuation): cancelContinuation?.resume(returning: ()) self.eventContinuation.yield(.runKeepAlive(connection)) diff --git a/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift b/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift index bcc7ca45..0ad2e105 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift @@ -588,6 +588,7 @@ extension PoolStateMachine { preconditionFailure("Failing a connection we don't have a record of.") } + self.stats.connecting -= 1 self.connections[index].destroyFailedConnection() return self.swapForDeletion(index: index) } diff --git a/Sources/ConnectionPoolModule/PoolStateMachine.swift b/Sources/ConnectionPoolModule/PoolStateMachine.swift index fabb672c..e589a0b1 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine.swift @@ -32,6 +32,9 @@ struct PoolConfiguration: Sendable { @usableFromInline var idleTimeoutDuration: Duration = .seconds(30) + + @usableFromInline + var maximumConnectionRequestsAtOneTime: Int = 20 } @usableFromInline @@ -90,6 +93,7 @@ struct PoolStateMachine< case scheduleTimers(Max2Sequence) case makeConnection(ConnectionRequest, TinyFastSequence) + case makeConnectionsCancelAndScheduleTimers(TinyFastSequence, TinyFastSequence, Max2Sequence) case runKeepAlive(Connection, TimerCancellationToken?) case cancelTimers(TinyFastSequence) case closeConnection(Connection, Max2Sequence) @@ -337,7 +341,7 @@ struct PoolStateMachine< guard let (index, context) = self.connections.releaseConnection(connection.id, streams: streams) else { return .none() } - return self.handleAvailableConnection(index: index, availableContext: context) + return self.handleAvailableConnection(index: index, availableContext: context, minimumConnectionCount: self.configuration.minimumConnectionCount) } mutating func cancelRequest(id: RequestID) -> Action { @@ -368,7 +372,7 @@ struct PoolStateMachine< } let (index, context) = self.connections.newConnectionEstablished(connection, maxStreams: maxStreams) - return self.handleAvailableConnection(index: index, availableContext: context) + return self.handleAvailableConnection(index: index, availableContext: context, minimumConnectionCount: self.configuration.minimumConnectionCount) } @inlinable @@ -562,7 +566,7 @@ struct PoolStateMachine< guard let (index, context) = self.connections.keepAliveSucceeded(connection.id) else { return .none() } - return self.handleAvailableConnection(index: index, availableContext: context) + return self.handleAvailableConnection(index: index, availableContext: context, minimumConnectionCount: self.configuration.minimumConnectionCount) } @inlinable @@ -667,15 +671,22 @@ struct PoolStateMachine< @inlinable /*private*/ mutating func handleAvailableConnection( index: Int, - availableContext: ConnectionGroup.AvailableConnectionContext + availableContext: ConnectionGroup.AvailableConnectionContext, + minimumConnectionCount: Int ) -> Action { // this connection was busy before let requests = self.requestQueue.pop(max: availableContext.info.availableStreams) if !requests.isEmpty { let leaseResult = self.connections.leaseConnection(at: index, streams: UInt16(requests.count)) + let connectionsRequired = minimumConnectionCount - Int(self.connections.stats.active) + let connectionAction = self.createMultipleConnectionsAction( + connectionsRequired, + cancelledTimers: .init(leaseResult.timersToCancel), + scheduledTimers: [] + ) ?? .cancelTimers(.init(leaseResult.timersToCancel)) return .init( request: .leaseConnection(requests, leaseResult.connection), - connection: .cancelTimers(.init(leaseResult.timersToCancel)) + connection: connectionAction ) } @@ -704,10 +715,13 @@ struct PoolStateMachine< } let timers = self.connections.parkConnection(at: index, hasBecomeIdle: newIdle).map(self.mapTimers) - return .init( - request: .none, - connection: .scheduleTimers(timers) - ) + let connectionsRequired = minimumConnectionCount - Int(self.connections.stats.active) + let connectionAction = self.createMultipleConnectionsAction( + connectionsRequired, + cancelledTimers: [], + scheduledTimers: timers + ) ?? .scheduleTimers(timers) + return .init(request: .none, connection: connectionAction) } case .overflow: @@ -723,6 +737,28 @@ struct PoolStateMachine< } + @inlinable + /* private */ mutating func createMultipleConnectionsAction( + _ connectionCount: Int, + cancelledTimers: TinyFastSequence, + scheduledTimers: Max2Sequence + ) -> ConnectionAction? { + if connectionCount > 0, + self.connections.stats.connecting < self.configuration.maximumConnectionRequestsAtOneTime { + let connectionCount = min( + connectionCount, + self.configuration.maximumConnectionRequestsAtOneTime - Int(self.connections.stats.connecting) + ) + var connectionRequests = TinyFastSequence() + connectionRequests.reserveCapacity(connectionCount) + for _ in 0.. Timer { switch connectionTimer.usecase { @@ -808,6 +844,9 @@ extension PoolStateMachine.ConnectionAction: Equatable where TimerCancellationTo return lhs == rhs case (.makeConnection(let lhsRequest, let lhsToken), .makeConnection(let rhsRequest, let rhsToken)): return lhsRequest == rhsRequest && lhsToken == rhsToken + case (.makeConnectionsCancelAndScheduleTimers(let lhsRequests, let lhsTokens, let lhsTimers), + .makeConnectionsCancelAndScheduleTimers(let rhsRequests, let rhsTokens, let rhsTimers)): + return lhsRequests == rhsRequests && lhsTokens == rhsTokens && lhsTimers == rhsTimers case (.runKeepAlive(let lhsConn, let lhsToken), .runKeepAlive(let rhsConn, let rhsToken)): return lhsConn === rhsConn && lhsToken == rhsToken case (.closeConnection(let lhsConn, let lhsTimers), .closeConnection(let rhsConn, let rhsTimers)): diff --git a/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift b/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift index 8782c53c..50c82893 100644 --- a/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift +++ b/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift @@ -1044,7 +1044,7 @@ import Testing } @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) - @Test func testConnectionTimeout() async throws { + @Test func testCircuitBreaker() async throws { struct ConnectionFailedError: Error {} let clock = MockClock() let factory = MockConnectionFactory() diff --git a/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift b/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift index c12769ef..ff53d1d5 100644 --- a/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift +++ b/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift @@ -606,6 +606,58 @@ typealias TestPoolStateMachine = PoolStateMachine< } + @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) + @Test func testRefillConnectionPoolAfterConnectionFail() { + + struct ConnectionFailed: Error, Equatable {} + let clock = MockClock() + var configuration = PoolConfiguration() + configuration.minimumConnectionCount = 5 + configuration.maximumConnectionSoftLimit = 10 + configuration.maximumConnectionHardLimit = 10 + configuration.keepAliveDuration = .seconds(2) + configuration.idleTimeoutDuration = .seconds(4) + configuration.maximumConnectionRequestsAtOneTime = 3 + + var stateMachine = TestPoolStateMachine( + configuration: configuration, + generator: .init(), + timerCancellationTokenType: MockTimerCancellationToken.self, + clock: clock + ) + + // refill pool + let requests = stateMachine.refillConnections() + #expect(requests.count == 5) + + _ = stateMachine.connectionEstablishFailed(ConnectionFailed(), for: requests[0]) + _ = stateMachine.connectionEstablishFailed(ConnectionFailed(), for: requests[1]) + _ = stateMachine.connectionEstablishFailed(ConnectionFailed(), for: requests[2]) + _ = stateMachine.connectionEstablishFailed(ConnectionFailed(), for: requests[3]) + _ = stateMachine.connectionEstablishFailed(ConnectionFailed(), for: requests[4]) + + let backOffDone2 = stateMachine.connectionCreationBackoffDone(requests[0].connectionID) + #expect(backOffDone2.request == .none) + #expect(backOffDone2.connection == .makeConnection(requests[0], [])) + + // make connection. Should return request to create 3 new connections + let connection = MockConnection(id: 0) + let createdAction = stateMachine.connectionEstablished(connection, maxStreams: 1) + let newRequests = (5..<8).map { TestPoolStateMachine.ConnectionRequest(connectionID: $0) } + let connectionKeepAliveTimer = TestPoolStateMachine.Timer(.init(timerID: 1, connectionID: 0, usecase: .keepAlive), duration: .seconds(2)) + #expect(createdAction.request == .none) + #expect(createdAction.connection == .makeConnectionsCancelAndScheduleTimers(.init(newRequests), [], .init(connectionKeepAliveTimer))) + + // make connection. Return + let connection2 = MockConnection(id: 5) + let createdAction2 = stateMachine.connectionEstablished(connection2, maxStreams: 1) + let connectionKeepAliveTimer2 = TestPoolStateMachine.Timer(.init(timerID: 0, connectionID: 5, usecase: .keepAlive), duration: .seconds(2)) + #expect(createdAction2.request == .none) + #expect(createdAction2.connection == .makeConnectionsCancelAndScheduleTimers( + .init(element: TestPoolStateMachine.ConnectionRequest(connectionID: 8)), [], .init(connectionKeepAliveTimer2)) + ) + } + @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) @Test func testTriggerForceShutdownWithBackingOffRequest() { struct ConnectionFailed: Error {} From dbc0b760f0b235541cea0adfbf48cd6c52600492 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 15 Dec 2025 16:26:16 +0000 Subject: [PATCH 2/9] Update testBackingOffRequests --- .../PoolStateMachineTests.swift | 66 ++++++++++++------- 1 file changed, 42 insertions(+), 24 deletions(-) diff --git a/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift b/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift index ff53d1d5..1de69959 100644 --- a/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift +++ b/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift @@ -525,7 +525,7 @@ typealias TestPoolStateMachine = PoolStateMachine< struct ConnectionFailed: Error, Equatable {} let clock = MockClock() var configuration = PoolConfiguration() - configuration.minimumConnectionCount = 2 + configuration.minimumConnectionCount = 0 configuration.maximumConnectionSoftLimit = 2 configuration.maximumConnectionHardLimit = 2 configuration.keepAliveDuration = .seconds(2) @@ -538,12 +538,22 @@ typealias TestPoolStateMachine = PoolStateMachine< clock: clock ) - // refill pool - let requests = stateMachine.refillConnections() - #expect(requests.count == 2) + // request two connections + let mockRequest1 = MockRequest(connectionType: MockConnection.self) + let leaseAction1 = stateMachine.leaseConnection(mockRequest1) + guard case .makeConnection(let request1, _) = leaseAction1.connection else { + Issue.record() + return + } + let mockRequest2 = MockRequest(connectionType: MockConnection.self) + let leaseAction2 = stateMachine.leaseConnection(mockRequest2) + guard case .makeConnection(let request2, _) = leaseAction2.connection else { + Issue.record() + return + } // fail connection 1 - let failedAction = stateMachine.connectionEstablishFailed(ConnectionFailed(), for: requests[0]) + let failedAction = stateMachine.connectionEstablishFailed(ConnectionFailed(), for: request1) #expect(failedAction.request == .none) switch failedAction.connection { case .scheduleTimers(let timers): @@ -553,26 +563,33 @@ typealias TestPoolStateMachine = PoolStateMachine< Issue.record() } - let request = MockRequest(connectionType: MockConnection.self) - let leaseAction = stateMachine.leaseConnection(request) + let mockRequest3 = MockRequest(connectionType: MockConnection.self) + let leaseAction = stateMachine.leaseConnection(mockRequest3) #expect(leaseAction.request == .none) #expect(leaseAction.connection == .none) clock.advance(to: clock.now.advanced(by: .seconds(30))) // fail connection 2. Connection request is removed as we already have a failing connection - let failedAction2 = stateMachine.connectionEstablishFailed(ConnectionFailed(), for: requests[1]) + let failedAction2 = stateMachine.connectionEstablishFailed(ConnectionFailed(), for: request2) #expect(failedAction2.request == .none) #expect(failedAction2.connection == .cancelTimers(.init())) #expect(stateMachine.connections.connections.count == 1) - let backOffDone = stateMachine.connectionCreationBackoffDone(requests[0].connectionID) + let backOffDone = stateMachine.connectionCreationBackoffDone(request1.connectionID) #expect(backOffDone.request == .none) - #expect(backOffDone.connection == .makeConnection(requests[0], [])) + #expect(backOffDone.connection == .makeConnection(request1, [])) // fail connection 1 again - let failedAction3 = stateMachine.connectionEstablishFailed(ConnectionFailed(), for: requests[0]) - #expect(failedAction3.request == .failRequests([request], ConnectionPoolError.connectionCreationCircuitBreakerTripped)) + let failedAction3 = stateMachine.connectionEstablishFailed(ConnectionFailed(), for: request1) + print(failedAction3) + switch failedAction3.request { + case .failRequests(let requests, let error): + #expect(Set(requests) == Set([mockRequest1, mockRequest2, mockRequest3])) + #expect(error == ConnectionPoolError.connectionCreationCircuitBreakerTripped) + default: + Issue.record() + } switch failedAction3.connection { case .scheduleTimers(let timers): #expect(timers.count == 1) @@ -582,27 +599,28 @@ typealias TestPoolStateMachine = PoolStateMachine< } // lease fails immediately as we are in circuitBreak state - let request2 = MockRequest(connectionType: MockConnection.self) - let leaseAction2 = stateMachine.leaseConnection(request2) - #expect(leaseAction2.request == .failRequest(request2, ConnectionPoolError.connectionCreationCircuitBreakerTripped)) - #expect(leaseAction2.connection == .none) + let request3 = MockRequest(connectionType: MockConnection.self) + let leaseAction3 = stateMachine.leaseConnection(request3) + #expect(leaseAction3.request == .failRequest(request3, ConnectionPoolError.connectionCreationCircuitBreakerTripped)) + #expect(leaseAction3.connection == .none) - let backOffDone2 = stateMachine.connectionCreationBackoffDone(requests[0].connectionID) + let backOffDone2 = stateMachine.connectionCreationBackoffDone(request1.connectionID) #expect(backOffDone2.request == .none) - #expect(backOffDone2.connection == .makeConnection(requests[0], [])) + #expect(backOffDone2.connection == .makeConnection(request1, [])) // make connection let connection = MockConnection(id: 0) let createdAction = stateMachine.connectionEstablished(connection, maxStreams: 1) - let connection2KeepAliveTimer = TestPoolStateMachine.Timer(.init(timerID: 2, connectionID: 0, usecase: .keepAlive), duration: .seconds(2)) + let connectionKeepAliveTimer = TestPoolStateMachine.Timer(.init(timerID: 2, connectionID: 0, usecase: .keepAlive), duration: .seconds(2)) + let connectionIdleTimer = TestPoolStateMachine.Timer(.init(timerID: 3, connectionID: 0, usecase: .idleTimeout), duration: .seconds(4)) #expect(createdAction.request == .none) - #expect(createdAction.connection == .scheduleTimers([connection2KeepAliveTimer])) + #expect(createdAction.connection == .scheduleTimers([connectionKeepAliveTimer, connectionIdleTimer])) // lease connection (successful) - let request3 = MockRequest(connectionType: MockConnection.self) - let leaseAction3 = stateMachine.leaseConnection(request3) - #expect(leaseAction3.request == .leaseConnection(.init(element: request3), connection)) - #expect(leaseAction3.connection == .none) + let request4 = MockRequest(connectionType: MockConnection.self) + let leaseAction4 = stateMachine.leaseConnection(request4) + #expect(leaseAction4.request == .leaseConnection(.init(element: request4), connection)) + #expect(leaseAction4.connection == .none) } From e72ed8bd85804bee384cad3104937bbe5b94dd76 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 15 Dec 2025 17:37:36 +0000 Subject: [PATCH 3/9] Remove minimumConnectionCount parameter --- .../ConnectionPoolModule/PoolStateMachine.swift | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/Sources/ConnectionPoolModule/PoolStateMachine.swift b/Sources/ConnectionPoolModule/PoolStateMachine.swift index e589a0b1..b8385fea 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine.swift @@ -341,7 +341,7 @@ struct PoolStateMachine< guard let (index, context) = self.connections.releaseConnection(connection.id, streams: streams) else { return .none() } - return self.handleAvailableConnection(index: index, availableContext: context, minimumConnectionCount: self.configuration.minimumConnectionCount) + return self.handleAvailableConnection(index: index, availableContext: context) } mutating func cancelRequest(id: RequestID) -> Action { @@ -372,7 +372,7 @@ struct PoolStateMachine< } let (index, context) = self.connections.newConnectionEstablished(connection, maxStreams: maxStreams) - return self.handleAvailableConnection(index: index, availableContext: context, minimumConnectionCount: self.configuration.minimumConnectionCount) + return self.handleAvailableConnection(index: index, availableContext: context) } @inlinable @@ -566,7 +566,7 @@ struct PoolStateMachine< guard let (index, context) = self.connections.keepAliveSucceeded(connection.id) else { return .none() } - return self.handleAvailableConnection(index: index, availableContext: context, minimumConnectionCount: self.configuration.minimumConnectionCount) + return self.handleAvailableConnection(index: index, availableContext: context) } @inlinable @@ -671,14 +671,14 @@ struct PoolStateMachine< @inlinable /*private*/ mutating func handleAvailableConnection( index: Int, - availableContext: ConnectionGroup.AvailableConnectionContext, - minimumConnectionCount: Int + availableContext: ConnectionGroup.AvailableConnectionContext ) -> Action { // this connection was busy before let requests = self.requestQueue.pop(max: availableContext.info.availableStreams) if !requests.isEmpty { let leaseResult = self.connections.leaseConnection(at: index, streams: UInt16(requests.count)) - let connectionsRequired = minimumConnectionCount - Int(self.connections.stats.active) + //var connectionsRequired = (requests.count + self.configuration. + let connectionsRequired = self.configuration.minimumConnectionCount - Int(self.connections.stats.active) let connectionAction = self.createMultipleConnectionsAction( connectionsRequired, cancelledTimers: .init(leaseResult.timersToCancel), @@ -715,7 +715,7 @@ struct PoolStateMachine< } let timers = self.connections.parkConnection(at: index, hasBecomeIdle: newIdle).map(self.mapTimers) - let connectionsRequired = minimumConnectionCount - Int(self.connections.stats.active) + let connectionsRequired = self.configuration.minimumConnectionCount - Int(self.connections.stats.active) let connectionAction = self.createMultipleConnectionsAction( connectionsRequired, cancelledTimers: [], From 1f27837839bb4704e8dcccbe30ff80e190c01ab8 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 15 Dec 2025 17:54:14 +0000 Subject: [PATCH 4/9] Add stats check to tests --- .../PoolStateMachine.swift | 1 - .../PoolStateMachineTests.swift | 31 ++++++++++++++++++- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/Sources/ConnectionPoolModule/PoolStateMachine.swift b/Sources/ConnectionPoolModule/PoolStateMachine.swift index b8385fea..844dea61 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine.swift @@ -677,7 +677,6 @@ struct PoolStateMachine< let requests = self.requestQueue.pop(max: availableContext.info.availableStreams) if !requests.isEmpty { let leaseResult = self.connections.leaseConnection(at: index, streams: UInt16(requests.count)) - //var connectionsRequired = (requests.count + self.configuration. let connectionsRequired = self.configuration.minimumConnectionCount - Int(self.connections.stats.active) let connectionAction = self.createMultipleConnectionsAction( connectionsRequired, diff --git a/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift b/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift index 1de69959..9eb5d3ec 100644 --- a/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift +++ b/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift @@ -52,6 +52,9 @@ typealias TestPoolStateMachine = PoolStateMachine< #expect(createdAction2.connection == .scheduleTimers([connection2KeepAliveTimer])) #expect(stateMachine.timerScheduled(connection2KeepAliveTimer, cancelContinuation: connection2KeepAliveTimerCancellationToken) == .none) } + + #expect(stateMachine.connections.stats.active == 2) + #expect(stateMachine.connections.stats.idle == 2) } @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) @@ -116,6 +119,9 @@ typealias TestPoolStateMachine = PoolStateMachine< #expect(stateMachine.timerScheduled(connection2IdleTimer, cancelContinuation: connection2IdleTimerCancellationToken) == .none) #expect(stateMachine.timerTriggered(connection2IdleTimer) == .init(request: .none, connection: .closeConnection(connection2, [connection2IdleTimerCancellationToken]))) + + #expect(stateMachine.connections.stats.active == 1) + #expect(stateMachine.connections.stats.leased == 1) } @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) @@ -176,6 +182,8 @@ typealias TestPoolStateMachine = PoolStateMachine< let shutdownAction = stateMachine.triggerForceShutdown() #expect(shutdownAction.request == .failRequests(.init(), .poolShutdown)) #expect(shutdownAction.connection == .initiateShutdown(.init())) + + #expect(stateMachine.connections.stats.active == 0) } @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) @@ -233,6 +241,9 @@ typealias TestPoolStateMachine = PoolStateMachine< // connection 1 is dropped #expect(stateMachine.connectionClosed(connection1) == .init(request: .none, connection: .cancelTimers([connection2IdleTimerCancellationToken]))) + + #expect(stateMachine.connections.stats.active == 1) + #expect(stateMachine.connections.stats.idle == 1) } @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) @@ -276,6 +287,8 @@ typealias TestPoolStateMachine = PoolStateMachine< let releaseRequest1 = stateMachine.releaseConnection(connection1, streams: 1) #expect(releaseRequest1.request == .none) #expect(releaseRequest1.connection == .none) + + #expect(stateMachine.connections.stats.active == 0) } @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) @@ -344,6 +357,8 @@ typealias TestPoolStateMachine = PoolStateMachine< // fail keep alive and cause closed let keepAliveFailed2 = stateMachine.connectionKeepAliveFailed(connection2.id) #expect(keepAliveFailed2.connection == .closeConnection(connection2, [])) + + #expect(stateMachine.connections.stats.active == 0) } @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) @@ -397,6 +412,9 @@ typealias TestPoolStateMachine = PoolStateMachine< } else { Issue.record("Unexpected connection action") } + + #expect(stateMachine.connections.stats.active == 1) + #expect(stateMachine.connections.stats.idle == 1) } @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) @@ -438,6 +456,7 @@ typealias TestPoolStateMachine = PoolStateMachine< #expect(closedAction.connection == .cancelEventStreamAndFinalCleanup([])) #expect(stateMachine.isShutdown) + #expect(stateMachine.connections.stats.active == 0) } @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) @@ -483,6 +502,7 @@ typealias TestPoolStateMachine = PoolStateMachine< #expect(closedAction.connection == .cancelEventStreamAndFinalCleanup([])) #expect(stateMachine.isShutdown) + #expect(stateMachine.connections.stats.active == 0) } @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) @@ -518,6 +538,7 @@ typealias TestPoolStateMachine = PoolStateMachine< #expect(closedAction.connection == .cancelEventStreamAndFinalCleanup([])) #expect(stateMachine.isShutdown) + #expect(stateMachine.connections.stats.active == 0) } @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) @@ -530,6 +551,7 @@ typealias TestPoolStateMachine = PoolStateMachine< configuration.maximumConnectionHardLimit = 2 configuration.keepAliveDuration = .seconds(2) configuration.idleTimeoutDuration = .seconds(4) + configuration.circuitBreakerTripAfter = .seconds(30) var stateMachine = TestPoolStateMachine( configuration: configuration, @@ -568,7 +590,7 @@ typealias TestPoolStateMachine = PoolStateMachine< #expect(leaseAction.request == .none) #expect(leaseAction.connection == .none) - clock.advance(to: clock.now.advanced(by: .seconds(30))) + clock.advance(to: clock.now.advanced(by: .seconds(31))) // fail connection 2. Connection request is removed as we already have a failing connection let failedAction2 = stateMachine.connectionEstablishFailed(ConnectionFailed(), for: request2) @@ -622,6 +644,8 @@ typealias TestPoolStateMachine = PoolStateMachine< #expect(leaseAction4.request == .leaseConnection(.init(element: request4), connection)) #expect(leaseAction4.connection == .none) + #expect(stateMachine.connections.stats.leased == 1) + #expect(stateMachine.connections.stats.active == 1) } @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) @@ -674,6 +698,10 @@ typealias TestPoolStateMachine = PoolStateMachine< #expect(createdAction2.connection == .makeConnectionsCancelAndScheduleTimers( .init(element: TestPoolStateMachine.ConnectionRequest(connectionID: 8)), [], .init(connectionKeepAliveTimer2)) ) + + #expect(stateMachine.connections.stats.active == 5) + #expect(stateMachine.connections.stats.idle == 2) + #expect(stateMachine.connections.stats.connecting == 3) } @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) @@ -729,5 +757,6 @@ typealias TestPoolStateMachine = PoolStateMachine< #expect(closedAction.connection == .cancelEventStreamAndFinalCleanup([])) #expect(stateMachine.isShutdown) + #expect(stateMachine.connections.stats.active == 0) } } From d883b9bae8e33f7ee3b41a24acf818906f4fef5f Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 15 Dec 2025 18:32:24 +0000 Subject: [PATCH 5/9] Add additional connection for every call to handleAvailableConnection if needed --- Sources/ConnectionPoolModule/PoolStateMachine.swift | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/Sources/ConnectionPoolModule/PoolStateMachine.swift b/Sources/ConnectionPoolModule/PoolStateMachine.swift index 844dea61..98208bbc 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine.swift @@ -677,7 +677,12 @@ struct PoolStateMachine< let requests = self.requestQueue.pop(max: availableContext.info.availableStreams) if !requests.isEmpty { let leaseResult = self.connections.leaseConnection(at: index, streams: UInt16(requests.count)) - let connectionsRequired = self.configuration.minimumConnectionCount - Int(self.connections.stats.active) + let connectionsRequired: Int + if requests.count <= self.connections.stats.availableStreams + self.connections.stats.leasedStreams { + connectionsRequired = self.configuration.minimumConnectionCount - Int(self.connections.stats.active) + } else { + connectionsRequired = 1 + } let connectionAction = self.createMultipleConnectionsAction( connectionsRequired, cancelledTimers: .init(leaseResult.timersToCancel), From 4c10e152ead80a81e78dcd37606bc2439c143dec Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Wed, 17 Dec 2025 11:03:32 +0000 Subject: [PATCH 6/9] Changes from review --- .../PoolStateMachine.swift | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/Sources/ConnectionPoolModule/PoolStateMachine.swift b/Sources/ConnectionPoolModule/PoolStateMachine.swift index 98208bbc..e57e9e83 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine.swift @@ -747,20 +747,22 @@ struct PoolStateMachine< cancelledTimers: TinyFastSequence, scheduledTimers: Max2Sequence ) -> ConnectionAction? { - if connectionCount > 0, - self.connections.stats.connecting < self.configuration.maximumConnectionRequestsAtOneTime { - let connectionCount = min( - connectionCount, - self.configuration.maximumConnectionRequestsAtOneTime - Int(self.connections.stats.connecting) - ) - var connectionRequests = TinyFastSequence() - connectionRequests.reserveCapacity(connectionCount) - for _ in 0.. 0, self.connections.stats.connecting < self.configuration.maximumConnectionRequestsAtOneTime else { + return nil + } + let connectionCount = min( + connectionCount, + self.configuration.maximumConnectionRequestsAtOneTime - Int(self.connections.stats.connecting) + ) + var connectionRequests = TinyFastSequence() + connectionRequests.reserveCapacity(connectionCount) + for _ in 0.. Date: Wed, 17 Dec 2025 18:23:04 +0000 Subject: [PATCH 7/9] Don't request new connections on lease if concurrent requests is above limit --- .../ConnectionPoolModule/ConnectionPool.swift | 5 +++++ .../PoolStateMachine.swift | 19 +++++++++++-------- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/Sources/ConnectionPoolModule/ConnectionPool.swift b/Sources/ConnectionPoolModule/ConnectionPool.swift index 11001f6a..c26c03f5 100644 --- a/Sources/ConnectionPoolModule/ConnectionPool.swift +++ b/Sources/ConnectionPoolModule/ConnectionPool.swift @@ -124,6 +124,9 @@ public struct ConnectionPoolConfiguration: Sendable { /// pool before it is closed. public var idleTimeout: Duration + /// Maximum number of in-progress new connection requests to run at any one time + public var maximumConcurrentConnectionRequests: Int + /// initializer public init() { self.minimumConnectionCount = 0 @@ -131,6 +134,7 @@ public struct ConnectionPoolConfiguration: Sendable { self.maximumConnectionHardLimit = 16 self.circuitBreakerTripAfter = .seconds(60) self.idleTimeout = .seconds(60) + self.maximumConcurrentConnectionRequests = 20 } } @@ -590,6 +594,7 @@ extension PoolConfiguration { self.keepAliveDuration = keepAliveBehavior.keepAliveFrequency self.idleTimeoutDuration = configuration.idleTimeout self.circuitBreakerTripAfter = configuration.circuitBreakerTripAfter + self.maximumConcurrentConnectionRequests = configuration.maximumConcurrentConnectionRequests } } diff --git a/Sources/ConnectionPoolModule/PoolStateMachine.swift b/Sources/ConnectionPoolModule/PoolStateMachine.swift index e57e9e83..2dfc12e1 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine.swift @@ -34,7 +34,7 @@ struct PoolConfiguration: Sendable { var idleTimeoutDuration: Duration = .seconds(30) @usableFromInline - var maximumConnectionRequestsAtOneTime: Int = 20 + var maximumConcurrentConnectionRequests: Int = 20 } @usableFromInline @@ -316,6 +316,12 @@ struct PoolStateMachine< request: requestAction, connection: .none ) + } else if self.connections.stats.connecting >= self.configuration.maximumConcurrentConnectionRequests { + // We have too many connection requests, lets delay creating any new connections + return .init( + request: requestAction, + connection: .none + ) } else if let request = self.connections.createNewDemandConnectionIfPossible() { // Can we create a demand connection return .init( @@ -678,7 +684,7 @@ struct PoolStateMachine< if !requests.isEmpty { let leaseResult = self.connections.leaseConnection(at: index, streams: UInt16(requests.count)) let connectionsRequired: Int - if requests.count <= self.connections.stats.availableStreams + self.connections.stats.leasedStreams { + if self.requestQueue.count <= self.connections.stats.availableStreams + self.connections.stats.leasedStreams { connectionsRequired = self.configuration.minimumConnectionCount - Int(self.connections.stats.active) } else { connectionsRequired = 1 @@ -748,13 +754,13 @@ struct PoolStateMachine< scheduledTimers: Max2Sequence ) -> ConnectionAction? { // only create connections if request connections is greater than zero and the number of already connecting - // connections is less than maximumConnectionRequestsAtOneTime - guard connectionCount > 0, self.connections.stats.connecting < self.configuration.maximumConnectionRequestsAtOneTime else { + // connections is less than maximumConcurrentConnectionRequests + guard connectionCount > 0, self.connections.stats.connecting < self.configuration.maximumConcurrentConnectionRequests else { return nil } let connectionCount = min( connectionCount, - self.configuration.maximumConnectionRequestsAtOneTime - Int(self.connections.stats.connecting) + self.configuration.maximumConcurrentConnectionRequests - Int(self.connections.stats.connecting) ) var connectionRequests = TinyFastSequence() connectionRequests.reserveCapacity(connectionCount) @@ -838,9 +844,6 @@ extension PoolStateMachine { @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) extension PoolStateMachine.Action: Equatable where TimerCancellationToken: Equatable, Request: Equatable {} -@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) -//extension PoolStateMachine.PoolState: Equatable {} - @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) extension PoolStateMachine.ConnectionAction: Equatable where TimerCancellationToken: Equatable { @usableFromInline From 1a6a8f56f1a2f2536552d81b52835eb7236507aa Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Wed, 17 Dec 2025 18:23:23 +0000 Subject: [PATCH 8/9] Add test verifying concurrent requests limit --- .../PoolStateMachineTests.swift | 46 ++++++++++++++++++- 1 file changed, 44 insertions(+), 2 deletions(-) diff --git a/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift b/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift index 9eb5d3ec..16c0559c 100644 --- a/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift +++ b/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift @@ -648,9 +648,51 @@ typealias TestPoolStateMachine = PoolStateMachine< #expect(stateMachine.connections.stats.active == 1) } + /// Test that we limit concurrent connection requests and that when connections are established + /// we request new connections @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) - @Test func testRefillConnectionPoolAfterConnectionFail() { + @Test func testConcurrentConnectionRequestsLimit() { + struct ConnectionFailed: Error, Equatable {} + let clock = MockClock() + var configuration = PoolConfiguration() + configuration.minimumConnectionCount = 0 + configuration.maximumConnectionSoftLimit = 10 + configuration.maximumConnectionHardLimit = 10 + configuration.keepAliveDuration = .seconds(2) + configuration.idleTimeoutDuration = .seconds(4) + configuration.maximumConcurrentConnectionRequests = 3 + + var stateMachine = TestPoolStateMachine( + configuration: configuration, + generator: .init(), + timerCancellationTokenType: MockTimerCancellationToken.self, + clock: clock + ) + let requests = (0..<5).map { _ in MockRequest(connectionType: MockConnection.self) } + let leaseRequests = requests.map { stateMachine.leaseConnection($0) } + #expect(leaseRequests[0].connection == .makeConnection(.init(connectionID: 0), [])) + #expect(leaseRequests[1].connection == .makeConnection(.init(connectionID: 1), [])) + #expect(leaseRequests[2].connection == .makeConnection(.init(connectionID: 2), [])) + #expect(leaseRequests[3].connection == .none) + #expect(leaseRequests[4].connection == .none) + for i in 0..<5 { + #expect(leaseRequests[i].request == .none) + } + let connections = (0..<5).map { MockConnection(id: $0) } + let connectedActions = (0..<5).map { stateMachine.connectionEstablished(connections[$0], maxStreams: 1) } + #expect(connectedActions[0].connection == .makeConnectionsCancelAndScheduleTimers(.init(element: .init(connectionID: 3)), [], [])) + #expect(connectedActions[1].connection == .makeConnectionsCancelAndScheduleTimers(.init(element: .init(connectionID: 4)), [], [])) + #expect(connectedActions[2].connection == .cancelTimers([])) + #expect(connectedActions[3].connection == .cancelTimers([])) + #expect(connectedActions[4].connection == .cancelTimers([])) + for i in 0..<5 { + #expect(connectedActions[i].request == .leaseConnection([requests[i]], connections[i])) + } + } + + @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) + @Test func testRefillConnectionPoolAfterConnectionFail() { struct ConnectionFailed: Error, Equatable {} let clock = MockClock() var configuration = PoolConfiguration() @@ -659,7 +701,7 @@ typealias TestPoolStateMachine = PoolStateMachine< configuration.maximumConnectionHardLimit = 10 configuration.keepAliveDuration = .seconds(2) configuration.idleTimeoutDuration = .seconds(4) - configuration.maximumConnectionRequestsAtOneTime = 3 + configuration.maximumConcurrentConnectionRequests = 3 var stateMachine = TestPoolStateMachine( configuration: configuration, From d7bf5f99958379f7be710abd657e8626ca622acc Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Thu, 18 Dec 2025 09:14:35 +0000 Subject: [PATCH 9/9] Use connection hard limit in handleAvailableConnection --- .../PoolStateMachine.swift | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/Sources/ConnectionPoolModule/PoolStateMachine.swift b/Sources/ConnectionPoolModule/PoolStateMachine.swift index 2dfc12e1..33227cfb 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine.swift @@ -753,18 +753,19 @@ struct PoolStateMachine< cancelledTimers: TinyFastSequence, scheduledTimers: Max2Sequence ) -> ConnectionAction? { - // only create connections if request connections is greater than zero and the number of already connecting - // connections is less than maximumConcurrentConnectionRequests - guard connectionCount > 0, self.connections.stats.connecting < self.configuration.maximumConcurrentConnectionRequests else { - return nil - } - let connectionCount = min( - connectionCount, - self.configuration.maximumConcurrentConnectionRequests - Int(self.connections.stats.connecting) + let connectionCountLimitedByNumberOfRequests = min( + connectionCount, + self.configuration.maximumConcurrentConnectionRequests - Int(self.connections.stats.connecting) + ) + let connectionCountLimitedByHardLimit = min( + connectionCountLimitedByNumberOfRequests, + self.configuration.maximumConnectionHardLimit - Int(self.connections.stats.active) ) + guard connectionCountLimitedByHardLimit > 0 else { return nil } + var connectionRequests = TinyFastSequence() - connectionRequests.reserveCapacity(connectionCount) - for _ in 0..