Skip to content
14 changes: 14 additions & 0 deletions Sources/ConnectionPoolModule/ConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,17 @@ public struct ConnectionPoolConfiguration: Sendable {
/// pool before it is closed.
public var idleTimeout: Duration

/// Maximum number of in-progress new connection requests to run at any one time
public var maximumConcurrentConnectionRequests: Int

/// initializer
public init() {
self.minimumConnectionCount = 0
self.maximumConnectionSoftLimit = 16
self.maximumConnectionHardLimit = 16
self.circuitBreakerTripAfter = .seconds(60)
self.idleTimeout = .seconds(60)
self.maximumConcurrentConnectionRequests = 20
}
}

Expand Down Expand Up @@ -380,6 +384,15 @@ public final class ConnectionPool<
self.cancelTimers(timers)
self.eventContinuation.yield(.makeConnection(request))

case .makeConnectionsCancelAndScheduleTimers(let requests, let cancelledTimers, let scheduledTimers):
self.cancelTimers(cancelledTimers)
for request in requests {
self.eventContinuation.yield(.makeConnection(request))
}
for timer in scheduledTimers {
self.eventContinuation.yield(.scheduleTimer(timer))
}

case .runKeepAlive(let connection, let cancelContinuation):
cancelContinuation?.resume(returning: ())
self.eventContinuation.yield(.runKeepAlive(connection))
Expand Down Expand Up @@ -581,6 +594,7 @@ extension PoolConfiguration {
self.keepAliveDuration = keepAliveBehavior.keepAliveFrequency
self.idleTimeoutDuration = configuration.idleTimeout
self.circuitBreakerTripAfter = configuration.circuitBreakerTripAfter
self.maximumConcurrentConnectionRequests = configuration.maximumConcurrentConnectionRequests
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ extension PoolStateMachine {
preconditionFailure("Failing a connection we don't have a record of.")
}

self.stats.connecting -= 1
self.connections[index].destroyFailedConnection()
return self.swapForDeletion(index: index)
}
Expand Down
65 changes: 57 additions & 8 deletions Sources/ConnectionPoolModule/PoolStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ struct PoolConfiguration: Sendable {

@usableFromInline
var idleTimeoutDuration: Duration = .seconds(30)

@usableFromInline
var maximumConcurrentConnectionRequests: Int = 20
}

@usableFromInline
Expand Down Expand Up @@ -90,6 +93,7 @@ struct PoolStateMachine<

case scheduleTimers(Max2Sequence<Timer>)
case makeConnection(ConnectionRequest, TinyFastSequence<TimerCancellationToken>)
case makeConnectionsCancelAndScheduleTimers(TinyFastSequence<ConnectionRequest>, TinyFastSequence<TimerCancellationToken>, Max2Sequence<Timer>)
case runKeepAlive(Connection, TimerCancellationToken?)
case cancelTimers(TinyFastSequence<TimerCancellationToken>)
case closeConnection(Connection, Max2Sequence<TimerCancellationToken>)
Expand Down Expand Up @@ -312,6 +316,12 @@ struct PoolStateMachine<
request: requestAction,
connection: .none
)
} else if self.connections.stats.connecting >= self.configuration.maximumConcurrentConnectionRequests {
// We have too many connection requests, lets delay creating any new connections
return .init(
request: requestAction,
connection: .none
)
} else if let request = self.connections.createNewDemandConnectionIfPossible() {
// Can we create a demand connection
return .init(
Expand Down Expand Up @@ -673,9 +683,20 @@ struct PoolStateMachine<
let requests = self.requestQueue.pop(max: availableContext.info.availableStreams)
if !requests.isEmpty {
let leaseResult = self.connections.leaseConnection(at: index, streams: UInt16(requests.count))
let connectionsRequired: Int
if self.requestQueue.count <= self.connections.stats.availableStreams + self.connections.stats.leasedStreams {
connectionsRequired = self.configuration.minimumConnectionCount - Int(self.connections.stats.active)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does active include already connecting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

} else {
connectionsRequired = 1
}
let connectionAction = self.createMultipleConnectionsAction(
connectionsRequired,
cancelledTimers: .init(leaseResult.timersToCancel),
scheduledTimers: []
) ?? .cancelTimers(.init(leaseResult.timersToCancel))
return .init(
request: .leaseConnection(requests, leaseResult.connection),
connection: .cancelTimers(.init(leaseResult.timersToCancel))
connection: connectionAction
)
}

Expand Down Expand Up @@ -704,10 +725,13 @@ struct PoolStateMachine<
}
let timers = self.connections.parkConnection(at: index, hasBecomeIdle: newIdle).map(self.mapTimers)

return .init(
request: .none,
connection: .scheduleTimers(timers)
)
let connectionsRequired = self.configuration.minimumConnectionCount - Int(self.connections.stats.active)
let connectionAction = self.createMultipleConnectionsAction(
connectionsRequired,
cancelledTimers: [],
scheduledTimers: timers
) ?? .scheduleTimers(timers)
return .init(request: .none, connection: connectionAction)
}

case .overflow:
Expand All @@ -723,6 +747,31 @@ struct PoolStateMachine<

}

@inlinable
/* private */ mutating func createMultipleConnectionsAction(
_ connectionCount: Int,
cancelledTimers: TinyFastSequence<TimerCancellationToken>,
scheduledTimers: Max2Sequence<Timer>
) -> ConnectionAction? {
let connectionCountLimitedByNumberOfRequests = min(
connectionCount,
self.configuration.maximumConcurrentConnectionRequests - Int(self.connections.stats.connecting)
)
let connectionCountLimitedByHardLimit = min(
connectionCountLimitedByNumberOfRequests,
self.configuration.maximumConnectionHardLimit - Int(self.connections.stats.active)
)
guard connectionCountLimitedByHardLimit > 0 else { return nil }

var connectionRequests = TinyFastSequence<ConnectionRequest>()
connectionRequests.reserveCapacity(connectionCountLimitedByHardLimit)
for _ in 0..<connectionCountLimitedByHardLimit {
connectionRequests.append(self.connections.createNewConnection())
}
return .makeConnectionsCancelAndScheduleTimers(connectionRequests, cancelledTimers, scheduledTimers)

}

@inlinable
/* private */ func mapTimers(_ connectionTimer: ConnectionTimer) -> Timer {
switch connectionTimer.usecase {
Expand Down Expand Up @@ -796,9 +845,6 @@ extension PoolStateMachine {
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
extension PoolStateMachine.Action: Equatable where TimerCancellationToken: Equatable, Request: Equatable {}

@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
//extension PoolStateMachine.PoolState: Equatable {}

@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
extension PoolStateMachine.ConnectionAction: Equatable where TimerCancellationToken: Equatable {
@usableFromInline
Expand All @@ -808,6 +854,9 @@ extension PoolStateMachine.ConnectionAction: Equatable where TimerCancellationTo
return lhs == rhs
case (.makeConnection(let lhsRequest, let lhsToken), .makeConnection(let rhsRequest, let rhsToken)):
return lhsRequest == rhsRequest && lhsToken == rhsToken
case (.makeConnectionsCancelAndScheduleTimers(let lhsRequests, let lhsTokens, let lhsTimers),
.makeConnectionsCancelAndScheduleTimers(let rhsRequests, let rhsTokens, let rhsTimers)):
return lhsRequests == rhsRequests && lhsTokens == rhsTokens && lhsTimers == rhsTimers
case (.runKeepAlive(let lhsConn, let lhsToken), .runKeepAlive(let rhsConn, let rhsToken)):
return lhsConn === rhsConn && lhsToken == rhsToken
case (.closeConnection(let lhsConn, let lhsTimers), .closeConnection(let rhsConn, let rhsTimers)):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1044,7 +1044,7 @@ import Testing
}

@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
@Test func testConnectionTimeout() async throws {
@Test func testCircuitBreaker() async throws {
struct ConnectionFailedError: Error {}
let clock = MockClock()
let factory = MockConnectionFactory<MockClock>()
Expand Down
Loading
Loading