Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,22 @@ extension HTTPConnectionPool {
}
}

mutating func fail() {
enum FailAction {
case removeConnection
case none
}

mutating func fail() -> FailAction {
switch self.state {
case .starting, .backingOff, .idle, .leased:
case .starting:
// If the connection fails while we are starting it, the fail call raced with
// `failedToConnect` (promises are succeeded or failed before channel handler
// callbacks). let's keep the state in `starting`, so that `failedToConnect` can
// create a backoff timer.
return .none
case .backingOff, .idle, .leased:
self.state = .closed
return .removeConnection
case .closed:
preconditionFailure("Invalid state: \(self.state)")
}
Expand Down Expand Up @@ -559,23 +571,28 @@ extension HTTPConnectionPool {
}

let use: ConnectionUse
self.connections[index].fail()
let eventLoop = self.connections[index].eventLoop
let starting: Int
if index < self.overflowIndex {
use = .generalPurpose
starting = self.startingGeneralPurposeConnections
} else {
use = .eventLoop(eventLoop)
starting = self.startingEventLoopConnections(on: eventLoop)
}
switch self.connections[index].fail() {
case .removeConnection:
let eventLoop = self.connections[index].eventLoop
let starting: Int
if index < self.overflowIndex {
use = .generalPurpose
starting = self.startingGeneralPurposeConnections
} else {
use = .eventLoop(eventLoop)
starting = self.startingEventLoopConnections(on: eventLoop)
}

let context = FailedConnectionContext(
eventLoop: eventLoop,
use: use,
connectionsStartingForUseCase: starting
)
return (index, context)
let context = FailedConnectionContext(
eventLoop: eventLoop,
use: use,
connectionsStartingForUseCase: starting
)
return (index, context)

case .none:
return nil
}
}

// MARK: Migration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,11 @@ extension HTTPConnectionPool {
self.failedConsecutiveConnectionAttempts += 1
self.lastConnectFailure = error

// We don't care how many waiting requests we have at this point, we will schedule a
// retry. More tasks, may appear until the backoff has completed. The final
// decision about the retry will be made in `connectionCreationBackoffDone(_:)`
let eventLoop = self.connections.backoffNextConnectionAttempt(connectionID)

switch self.lifecycleState {
case .running:
guard self.retryConnectionEstablishment else {
Expand All @@ -265,10 +270,6 @@ extension HTTPConnectionPool {
connection: .none
)
}
// We don't care how many waiting requests we have at this point, we will schedule a
// retry. More tasks, may appear until the backoff has completed. The final
// decision about the retry will be made in `connectionCreationBackoffDone(_:)`
let eventLoop = self.connections.backoffNextConnectionAttempt(connectionID)

let backoff = calculateBackoff(failedAttempt: self.failedConsecutiveConnectionAttempts)
return .init(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,22 @@ extension HTTPConnectionPool {
}
}

mutating func fail() {
enum FailAction {
case removeConnection
case none
}

mutating func fail() -> FailAction {
switch self.state {
case .starting, .active, .backingOff, .draining:
case .starting:
// If the connection fails while we are starting it, the fail call raced with
// `failedToConnect` (promises are succeeded or failed before channel handler
// callbacks). let's keep the state in `starting`, so that `failedToConnect` can
// create a backoff timer.
return .none
case .active, .backingOff, .draining:
self.state = .closed
return .removeConnection
case .closed:
preconditionFailure("Invalid state: \(self.state)")
}
Expand Down Expand Up @@ -749,10 +761,16 @@ extension HTTPConnectionPool {
// must ignore the event.
return nil
}
self.connections[index].fail()
let eventLoop = self.connections[index].eventLoop
let context = FailedConnectionContext(eventLoop: eventLoop)
return (index, context)

switch self.connections[index].fail() {
case .none:
return nil

case .removeConnection:
let eventLoop = self.connections[index].eventLoop
let context = FailedConnectionContext(eventLoop: eventLoop)
return (index, context)
}
}

mutating func shutdown() -> CleanupContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,20 +226,18 @@ extension HTTPConnectionPool {
) -> EstablishedAction {
self.failedConsecutiveConnectionAttempts = 0
self.lastConnectFailure = nil
if self.connections.hasActiveConnection(for: connection.eventLoop) {
guard let (index, _) = self.connections.failConnection(connection.id) else {
preconditionFailure("we have established a new connection that we know nothing about?")
}
self.connections.removeConnection(at: index)
let doesConnectionExistsForEL = self.connections.hasActiveConnection(for: connection.eventLoop)
let (index, context) = self.connections.newHTTP2ConnectionEstablished(
connection,
maxConcurrentStreams: maxConcurrentStreams
)
if doesConnectionExistsForEL {
let connection = self.connections.closeConnection(at: index)
return .init(
request: .none,
connection: .closeConnection(connection, isShutdown: .no)
)
} else {
let (index, context) = self.connections.newHTTP2ConnectionEstablished(
connection,
maxConcurrentStreams: maxConcurrentStreams
)
return self.nextActionForAvailableConnection(at: index, context: context)
}
}
Expand Down Expand Up @@ -424,6 +422,8 @@ extension HTTPConnectionPool {
self.failedConsecutiveConnectionAttempts += 1
self.lastConnectFailure = error

let eventLoop = self.connections.backoffNextConnectionAttempt(connectionID)

switch self.lifecycleState {
case .running:
guard self.retryConnectionEstablishment else {
Expand All @@ -440,7 +440,6 @@ extension HTTPConnectionPool {
)
}

let eventLoop = self.connections.backoffNextConnectionAttempt(connectionID)
let backoff = calculateBackoff(failedAttempt: self.failedConsecutiveConnectionAttempts)
return .init(
request: .none,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,8 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase {
XCTAssertEqual(connections.closeConnection(at: releaseIndex), lease)
XCTAssertFalse(connections.isEmpty)

let backoffEL = connections.backoffNextConnectionAttempt(startingID)
XCTAssertIdentical(backoffEL, el2)
guard let (failIndex, _) = connections.failConnection(startingID) else {
return XCTFail("Expected that the connection is remembered")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1493,4 +1493,48 @@ class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase {

// We won't bother doing it though, it's enough that it asked.
}

func testFailConnectionRacesAgainstConnectionCreationFailed() {
let elg = EmbeddedEventLoopGroup(loops: 4)
defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) }

var state = HTTPConnectionPool.StateMachine(
idGenerator: .init(),
maximumConcurrentHTTP1Connections: 2,
retryConnectionEstablishment: true,
preferHTTP1: true,
maximumConnectionUses: nil,
preWarmedHTTP1ConnectionCount: 0
)

let mockRequest = MockHTTPScheduableRequest(eventLoop: elg.next())
let request = HTTPConnectionPool.Request(mockRequest)

let executeAction = state.executeRequest(request)
XCTAssertEqual(.scheduleRequestTimeout(for: request, on: mockRequest.eventLoop), executeAction.request)

// 1. connection attempt
guard case .createConnection(let connectionID, on: let connectionEL) = executeAction.connection else {
return XCTFail("Unexpected connection action: \(executeAction.connection)")
}
XCTAssert(connectionEL === mockRequest.eventLoop) // XCTAssertIdentical not available on Linux

// 2. connection fails – first with closed callback

XCTAssertEqual(state.http1ConnectionClosed(connectionID), .none)

// 3. connection fails – with make connection callback

let action = state.failedToCreateNewConnection(
IOError(errnoCode: -1, reason: "Test failure"),
connectionID: connectionID
)
XCTAssertEqual(action.request, .none)
guard case .scheduleBackoffTimer(connectionID, _, on: let backoffTimerEL) = action.connection else {
XCTFail("Unexpected connection action: \(action.connection)")
return
}
XCTAssertIdentical(connectionEL, backoffTimerEL)

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase {
XCTAssertEqual(connections.closeConnection(at: releaseIndex), leasedConn)
XCTAssertFalse(connections.isEmpty)

let backoffEL = connections.backoffNextConnectionAttempt(startingID)
XCTAssertIdentical(el6, backoffEL)
guard let (failIndex, _) = connections.failConnection(startingID) else {
return XCTFail("Expected that the connection is remembered")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1527,6 +1527,50 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase {

XCTAssertEqual(state.http2ConnectionClosed(connection.id), .none)
}

func testFailConnectionRacesAgainstConnectionCreationFailed() {
let elg = EmbeddedEventLoopGroup(loops: 4)
defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) }

var state = HTTPConnectionPool.StateMachine(
idGenerator: .init(),
maximumConcurrentHTTP1Connections: 2,
retryConnectionEstablishment: true,
preferHTTP1: false,
maximumConnectionUses: nil,
preWarmedHTTP1ConnectionCount: 0
)

let mockRequest = MockHTTPScheduableRequest(eventLoop: elg.next())
let request = HTTPConnectionPool.Request(mockRequest)

let executeAction = state.executeRequest(request)
XCTAssertEqual(.scheduleRequestTimeout(for: request, on: mockRequest.eventLoop), executeAction.request)

// 1. connection attempt
guard case .createConnection(let connectionID, on: let connectionEL) = executeAction.connection else {
return XCTFail("Unexpected connection action: \(executeAction.connection)")
}
XCTAssert(connectionEL === mockRequest.eventLoop) // XCTAssertIdentical not available on Linux

// 2. connection fails – first with closed callback

XCTAssertEqual(state.http2ConnectionClosed(connectionID), .none)

// 3. connection fails – with make connection callback

let action = state.failedToCreateNewConnection(
IOError(errnoCode: -1, reason: "Test failure"),
connectionID: connectionID
)
XCTAssertEqual(action.request, .none)
guard case .scheduleBackoffTimer(connectionID, _, on: let backoffTimerEL) = action.connection else {
XCTFail("Unexpected connection action: \(action.connection)")
return
}
XCTAssertIdentical(connectionEL, backoffTimerEL)
}

}

/// Should be used if you have a value of statically unknown type and want to compare its value to another `Equatable` value.
Expand Down