Skip to content
Merged
9 changes: 9 additions & 0 deletions Sources/ConnectionPoolModule/ConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,15 @@ public final class ConnectionPool<
self.cancelTimers(timers)
self.eventContinuation.yield(.makeConnection(request))

case .makeConnectionsCancelAndScheduleTimers(let requests, let cancelledTimers, let scheduledTimers):
self.cancelTimers(cancelledTimers)
for request in requests {
self.eventContinuation.yield(.makeConnection(request))
}
for timer in scheduledTimers {
self.eventContinuation.yield(.scheduleTimer(timer))
}

case .runKeepAlive(let connection, let cancelContinuation):
cancelContinuation?.resume(returning: ())
self.eventContinuation.yield(.runKeepAlive(connection))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ extension PoolStateMachine {
preconditionFailure("Failing a connection we don't have a record of.")
}

self.stats.connecting -= 1
self.connections[index].destroyFailedConnection()
return self.swapForDeletion(index: index)
}
Expand Down
53 changes: 48 additions & 5 deletions Sources/ConnectionPoolModule/PoolStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ struct PoolConfiguration: Sendable {

@usableFromInline
var idleTimeoutDuration: Duration = .seconds(30)

@usableFromInline
var maximumConnectionRequestsAtOneTime: Int = 20
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should add a follow up pr that allows us to configure this after we have landed this pr.

}

@usableFromInline
Expand Down Expand Up @@ -90,6 +93,7 @@ struct PoolStateMachine<

case scheduleTimers(Max2Sequence<Timer>)
case makeConnection(ConnectionRequest, TinyFastSequence<TimerCancellationToken>)
case makeConnectionsCancelAndScheduleTimers(TinyFastSequence<ConnectionRequest>, TinyFastSequence<TimerCancellationToken>, Max2Sequence<Timer>)
case runKeepAlive(Connection, TimerCancellationToken?)
case cancelTimers(TinyFastSequence<TimerCancellationToken>)
case closeConnection(Connection, Max2Sequence<TimerCancellationToken>)
Expand Down Expand Up @@ -673,9 +677,20 @@ struct PoolStateMachine<
let requests = self.requestQueue.pop(max: availableContext.info.availableStreams)
if !requests.isEmpty {
let leaseResult = self.connections.leaseConnection(at: index, streams: UInt16(requests.count))
let connectionsRequired: Int
if requests.count <= self.connections.stats.availableStreams + self.connections.stats.leasedStreams {
connectionsRequired = self.configuration.minimumConnectionCount - Int(self.connections.stats.active)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does active include already connecting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

} else {
connectionsRequired = 1
}
let connectionAction = self.createMultipleConnectionsAction(
connectionsRequired,
cancelledTimers: .init(leaseResult.timersToCancel),
scheduledTimers: []
) ?? .cancelTimers(.init(leaseResult.timersToCancel))
return .init(
request: .leaseConnection(requests, leaseResult.connection),
connection: .cancelTimers(.init(leaseResult.timersToCancel))
connection: connectionAction
)
}

Expand Down Expand Up @@ -704,10 +719,13 @@ struct PoolStateMachine<
}
let timers = self.connections.parkConnection(at: index, hasBecomeIdle: newIdle).map(self.mapTimers)

return .init(
request: .none,
connection: .scheduleTimers(timers)
)
let connectionsRequired = self.configuration.minimumConnectionCount - Int(self.connections.stats.active)
let connectionAction = self.createMultipleConnectionsAction(
connectionsRequired,
cancelledTimers: [],
scheduledTimers: timers
) ?? .scheduleTimers(timers)
return .init(request: .none, connection: connectionAction)
}

case .overflow:
Expand All @@ -723,6 +741,28 @@ struct PoolStateMachine<

}

@inlinable
/* private */ mutating func createMultipleConnectionsAction(
_ connectionCount: Int,
cancelledTimers: TinyFastSequence<TimerCancellationToken>,
scheduledTimers: Max2Sequence<Timer>
) -> ConnectionAction? {
if connectionCount > 0,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets use a guard here, instead.

self.connections.stats.connecting < self.configuration.maximumConnectionRequestsAtOneTime {
let connectionCount = min(
connectionCount,
self.configuration.maximumConnectionRequestsAtOneTime - Int(self.connections.stats.connecting)
)
var connectionRequests = TinyFastSequence<ConnectionRequest>()
connectionRequests.reserveCapacity(connectionCount)
for _ in 0..<connectionCount {
connectionRequests.append(self.connections.createNewConnection())
}
return .makeConnectionsCancelAndScheduleTimers(connectionRequests, cancelledTimers, scheduledTimers)
}
return nil
}

@inlinable
/* private */ func mapTimers(_ connectionTimer: ConnectionTimer) -> Timer {
switch connectionTimer.usecase {
Expand Down Expand Up @@ -808,6 +848,9 @@ extension PoolStateMachine.ConnectionAction: Equatable where TimerCancellationTo
return lhs == rhs
case (.makeConnection(let lhsRequest, let lhsToken), .makeConnection(let rhsRequest, let rhsToken)):
return lhsRequest == rhsRequest && lhsToken == rhsToken
case (.makeConnectionsCancelAndScheduleTimers(let lhsRequests, let lhsTokens, let lhsTimers),
.makeConnectionsCancelAndScheduleTimers(let rhsRequests, let rhsTokens, let rhsTimers)):
return lhsRequests == rhsRequests && lhsTokens == rhsTokens && lhsTimers == rhsTimers
case (.runKeepAlive(let lhsConn, let lhsToken), .runKeepAlive(let rhsConn, let rhsToken)):
return lhsConn === rhsConn && lhsToken == rhsToken
case (.closeConnection(let lhsConn, let lhsTimers), .closeConnection(let rhsConn, let rhsTimers)):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1044,7 +1044,7 @@ import Testing
}

@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
@Test func testConnectionTimeout() async throws {
@Test func testCircuitBreaker() async throws {
struct ConnectionFailedError: Error {}
let clock = MockClock()
let factory = MockConnectionFactory<MockClock>()
Expand Down
Loading