-
-
Notifications
You must be signed in to change notification settings - Fork 89
Re-instate minimum connections after failed connection #612
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
11c0c2f
dbc0b76
e72ed8b
1f27837
d883b9b
4c10e15
296dc67
1a6a8f5
d7bf5f9
604954f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,9 @@ struct PoolConfiguration: Sendable { | |
|
|
||
| @usableFromInline | ||
| var idleTimeoutDuration: Duration = .seconds(30) | ||
|
|
||
| @usableFromInline | ||
| var maximumConnectionRequestsAtOneTime: Int = 20 | ||
| } | ||
|
|
||
| @usableFromInline | ||
|
|
@@ -90,6 +93,7 @@ struct PoolStateMachine< | |
|
|
||
| case scheduleTimers(Max2Sequence<Timer>) | ||
| case makeConnection(ConnectionRequest, TinyFastSequence<TimerCancellationToken>) | ||
| case makeConnectionsCancelAndScheduleTimers(TinyFastSequence<ConnectionRequest>, TinyFastSequence<TimerCancellationToken>, Max2Sequence<Timer>) | ||
| case runKeepAlive(Connection, TimerCancellationToken?) | ||
| case cancelTimers(TinyFastSequence<TimerCancellationToken>) | ||
| case closeConnection(Connection, Max2Sequence<TimerCancellationToken>) | ||
|
|
@@ -673,9 +677,20 @@ struct PoolStateMachine< | |
| let requests = self.requestQueue.pop(max: availableContext.info.availableStreams) | ||
| if !requests.isEmpty { | ||
adam-fowler marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| let leaseResult = self.connections.leaseConnection(at: index, streams: UInt16(requests.count)) | ||
| let connectionsRequired: Int | ||
| if requests.count <= self.connections.stats.availableStreams + self.connections.stats.leasedStreams { | ||
| connectionsRequired = self.configuration.minimumConnectionCount - Int(self.connections.stats.active) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does active include already connecting?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes |
||
| } else { | ||
| connectionsRequired = 1 | ||
| } | ||
| let connectionAction = self.createMultipleConnectionsAction( | ||
| connectionsRequired, | ||
| cancelledTimers: .init(leaseResult.timersToCancel), | ||
| scheduledTimers: [] | ||
| ) ?? .cancelTimers(.init(leaseResult.timersToCancel)) | ||
| return .init( | ||
| request: .leaseConnection(requests, leaseResult.connection), | ||
| connection: .cancelTimers(.init(leaseResult.timersToCancel)) | ||
| connection: connectionAction | ||
| ) | ||
| } | ||
|
|
||
|
|
@@ -704,10 +719,13 @@ struct PoolStateMachine< | |
| } | ||
| let timers = self.connections.parkConnection(at: index, hasBecomeIdle: newIdle).map(self.mapTimers) | ||
|
|
||
| return .init( | ||
| request: .none, | ||
| connection: .scheduleTimers(timers) | ||
| ) | ||
| let connectionsRequired = self.configuration.minimumConnectionCount - Int(self.connections.stats.active) | ||
| let connectionAction = self.createMultipleConnectionsAction( | ||
| connectionsRequired, | ||
| cancelledTimers: [], | ||
| scheduledTimers: timers | ||
| ) ?? .scheduleTimers(timers) | ||
| return .init(request: .none, connection: connectionAction) | ||
| } | ||
|
|
||
| case .overflow: | ||
|
|
@@ -723,6 +741,28 @@ struct PoolStateMachine< | |
|
|
||
| } | ||
|
|
||
| @inlinable | ||
| /* private */ mutating func createMultipleConnectionsAction( | ||
| _ connectionCount: Int, | ||
| cancelledTimers: TinyFastSequence<TimerCancellationToken>, | ||
| scheduledTimers: Max2Sequence<Timer> | ||
| ) -> ConnectionAction? { | ||
| if connectionCount > 0, | ||
|
||
| self.connections.stats.connecting < self.configuration.maximumConnectionRequestsAtOneTime { | ||
| let connectionCount = min( | ||
| connectionCount, | ||
| self.configuration.maximumConnectionRequestsAtOneTime - Int(self.connections.stats.connecting) | ||
| ) | ||
| var connectionRequests = TinyFastSequence<ConnectionRequest>() | ||
| connectionRequests.reserveCapacity(connectionCount) | ||
| for _ in 0..<connectionCount { | ||
| connectionRequests.append(self.connections.createNewConnection()) | ||
| } | ||
| return .makeConnectionsCancelAndScheduleTimers(connectionRequests, cancelledTimers, scheduledTimers) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| @inlinable | ||
| /* private */ func mapTimers(_ connectionTimer: ConnectionTimer) -> Timer { | ||
| switch connectionTimer.usecase { | ||
|
|
@@ -808,6 +848,9 @@ extension PoolStateMachine.ConnectionAction: Equatable where TimerCancellationTo | |
| return lhs == rhs | ||
| case (.makeConnection(let lhsRequest, let lhsToken), .makeConnection(let rhsRequest, let rhsToken)): | ||
| return lhsRequest == rhsRequest && lhsToken == rhsToken | ||
| case (.makeConnectionsCancelAndScheduleTimers(let lhsRequests, let lhsTokens, let lhsTimers), | ||
| .makeConnectionsCancelAndScheduleTimers(let rhsRequests, let rhsTokens, let rhsTimers)): | ||
| return lhsRequests == rhsRequests && lhsTokens == rhsTokens && lhsTimers == rhsTimers | ||
| case (.runKeepAlive(let lhsConn, let lhsToken), .runKeepAlive(let rhsConn, let rhsToken)): | ||
| return lhsConn === rhsConn && lhsToken == rhsToken | ||
| case (.closeConnection(let lhsConn, let lhsTimers), .closeConnection(let rhsConn, let rhsTimers)): | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should add a follow up pr that allows us to configure this after we have landed this pr.