@@ -278,10 +278,6 @@ extension HTTPConnectionPool {
278278 self . overflowIndex < self . maximumConcurrentConnections
279279 }
280280
281- private var maximumAdditionalGeneralPurposeConnections : Int {
282- self . maximumConcurrentConnections - ( self . overflowIndex - 1 )
283- }
284-
285281 var startingGeneralPurposeConnections : Int {
286282 var connecting = 0
287283 for connectionState in self . connections [ 0 ..< self . overflowIndex] {
@@ -545,66 +541,84 @@ extension HTTPConnectionPool {
545541
546542 /// We only handle starting and backing off connection here.
547543 /// All already running connections must be handled by the enclosing state machine.
548- /// We will also create new connections for `requiredEventLoopsOfPendingRequests`
549- /// if we do not already have a connection that can or will be able to execute requests on the given event loop.
550- /// In addition, we also create more general purpose connections if we do not have enough to execute
551- /// all requests on the given `preferredEventLoopsOfPendingGeneralPurposeRequests`
552- /// until we reach `maximumConcurrentConnections`.
553544 /// - Parameters:
554545 /// - starting: starting HTTP connections from previous state machine
555546 /// - backingOff: backing off HTTP connections from previous state machine
556- /// - requiredEventLoopsForPendingRequests: event loops for which we have requests with a required event loop. Duplicates are not allowed.
557- /// - preferredEventLoopsOfPendingGeneralPurposeRequests: the preferred event loop of all pending requests without a required event loop
558- /// - Returns: new connections that need to be created
559- mutating func migrateFromHTTP2< PreferredEventLoopSequence> (
547+ mutating func migrateFromHTTP2(
560548 starting: [ ( Connection . ID , EventLoop ) ] ,
561- backingOff: [ ( Connection . ID , EventLoop ) ] ,
562- requiredEventLoopsOfPendingRequests: [ EventLoop ] ,
563- preferredEventLoopsOfPendingGeneralPurposeRequests: PreferredEventLoopSequence
564- ) -> [ ( Connection . ID , EventLoop ) ] where PreferredEventLoopSequence: Sequence , PreferredEventLoopSequence. Element == EventLoop {
549+ backingOff: [ ( Connection . ID , EventLoop ) ]
550+ ) {
565551 for (connectionID, eventLoop) in starting {
566552 let newConnection = HTTP1ConnectionState ( connectionID: connectionID, eventLoop: eventLoop)
567553 self . connections. insert ( newConnection, at: self . overflowIndex)
568- self . overflowIndex = self . connections. index ( after: self . overflowIndex)
554+ /// If we can grow, we mark the connection as a general purpose connection.
555+ /// Otherwise, it will be an overflow connection which is only used once for requests with a required event loop
556+ if self . canGrow {
557+ self . overflowIndex = self . connections. index ( after: self . overflowIndex)
558+ }
569559 }
570560
571561 for (connectionID, eventLoop) in backingOff {
572562 var backingOffConnection = HTTP1ConnectionState ( connectionID: connectionID, eventLoop: eventLoop)
573563 // TODO: Maybe we want to add a static init for backing off connections to HTTP1ConnectionState
574564 backingOffConnection. failedToConnect ( )
575565 self . connections. insert ( backingOffConnection, at: self . overflowIndex)
576- self . overflowIndex = self . connections. index ( after: self . overflowIndex)
566+ /// If we can grow, we mark the connection as a general purpose connection.
567+ /// Otherwise, it will be an overflow connection which is only used once for requests with a required event loop
568+ if self . canGrow {
569+ self . overflowIndex = self . connections. index ( after: self . overflowIndex)
570+ }
577571 }
572+ }
578573
579- // create new connections for requests with a required event loop
580- let eventLoopsWithConnectionThatCanOrWillBeAbleToExecuteRequests = Set (
581- self . connections. lazy
582- . filter {
583- $0. canOrWillBeAbleToExecuteRequests
584- } . map {
585- $0. eventLoop. id
586- }
574+ /// We will create new connections for each `requiredEventLoopOfPendingRequests`
575+ /// In addition, we also create more general purpose connections if we do not have enough to execute
576+ /// all requests on the given `preferredEventLoopsOfPendingGeneralPurposeRequests`
577+ /// until we reach `maximumConcurrentConnections
578+ /// - Parameters:
579+ /// - requiredEventLoopsForPendingRequests: event loops for which we have requests with a required event loop. Duplicates are not allowed.
580+ /// - generalPurposeRequestCountPerPreferredEventLoop: request count with no required event loop, grouped by preferred event loop and ordered descending by number of requests
581+ /// - Returns: new connections that need to be created
582+ mutating func createConnectionsAfterMigrationIfNeeded(
583+ requiredEventLoopOfPendingRequests: [ ( EventLoop , Int ) ] ,
584+ generalPurposeRequestCountGroupedByPreferredEventLoop: [ ( EventLoop , Int ) ]
585+ ) -> [ ( Connection . ID , EventLoop ) ] {
586+ /// create new connections for requests with a required event loop
587+
588+ /// we may already start connections for those requests and do not want to start to many
589+ let startingRequiredEventLoopConnectionCount = Dictionary (
590+ self . connections [ self . overflowIndex..< self . connections. endIndex] . lazy. map {
591+ ( $0. eventLoop. id, 1 )
592+ } ,
593+ uniquingKeysWith: +
587594 )
588- var createConnections = requiredEventLoopsOfPendingRequests. compactMap { eventLoop -> ( Connection . ID , EventLoop ) ? in
589- guard !eventLoopsWithConnectionThatCanOrWillBeAbleToExecuteRequests. contains ( eventLoop. id)
590- else { return nil }
591- let connectionID = self . createNewOverflowConnection ( on: eventLoop)
592- return ( connectionID, eventLoop)
593- }
594-
595- // create new connections for requests without a required event loop
596- createConnections. append (
597- contentsOf: preferredEventLoopsOfPendingGeneralPurposeRequests. lazy
598- /// we do not want to start additional connection for requests for which we already have starting or backing off connections
599- . dropFirst ( self . startingGeneralPurposeConnections)
600- /// if we have additional requests, we will start more connections while respecting the user defined `maximumConcurrentConnections`
601- . prefix ( self . maximumAdditionalGeneralPurposeConnections)
602- . map { eventLoop in
603- ( self . createNewConnection ( on: eventLoop) , eventLoop)
595+
596+ var connectionToCreate = requiredEventLoopOfPendingRequests
597+ . flatMap { ( eventLoop, requestCount) -> [ ( Connection . ID , EventLoop ) ] in
598+ let connectionsToStart = max ( requestCount - startingRequiredEventLoopConnectionCount[ eventLoop. id, default: 0 ] , 0 )
599+ return ( 0 ..< connectionsToStart) . lazy. map { _ in
600+ ( self . createNewOverflowConnection ( on: eventLoop) , eventLoop)
604601 }
605- )
602+ }
603+
604+ /// create new connections for requests without a required event loop
605+
606+ /// we may already start connections for those requests and do not want to start to many
607+ /// this integer keeps track of the number of connections which do not yet have any requests assigned
608+ var unusedGeneralPurposeConnections = self . startingGeneralPurposeConnections
609+
610+ outerLoop: for (eventLoop, requestCount) in generalPurposeRequestCountGroupedByPreferredEventLoop {
611+ let connectionsToStart = max ( requestCount - unusedGeneralPurposeConnections, 0 )
612+ unusedGeneralPurposeConnections -= min ( requestCount, unusedGeneralPurposeConnections)
613+ for _ in 0 ..< connectionsToStart {
614+ guard self . canGrow else {
615+ break outerLoop
616+ }
617+ connectionToCreate. append ( ( self . createNewConnection ( on: eventLoop) , eventLoop) )
618+ }
619+ }
606620
607- return createConnections
621+ return connectionToCreate
608622 }
609623
610624 // MARK: Shutdown
0 commit comments