@@ -66,7 +66,7 @@ public class HTTPClient {
6666 public let eventLoopGroup : EventLoopGroup
6767 let eventLoopGroupProvider : EventLoopGroupProvider
6868 let configuration : Configuration
69- let pool : ConnectionPool
69+ let poolManager : HTTPConnectionPool . Manager
7070 var state : State
7171 private let stateLock = Lock ( )
7272
@@ -108,14 +108,20 @@ public class HTTPClient {
108108 #endif
109109 }
110110 self . configuration = configuration
111- self . pool = ConnectionPool ( configuration: configuration,
112- backgroundActivityLogger: backgroundActivityLogger)
111+ self . poolManager = HTTPConnectionPool . Manager (
112+ eventLoopGroup: self . eventLoopGroup,
113+ configuration: self . configuration,
114+ backgroundActivityLogger: backgroundActivityLogger
115+ )
113116 self . state = . upAndRunning
117+
118+ self . poolManager. delegate = self
114119 }
115120
116121 deinit {
117- assert ( self . pool. count == 0 )
118- assert ( self . state == . shutDown, " Client not shut down before the deinit. Please call client.syncShutdown() when no longer needed. " )
122+ guard case . shutDown = self . state else {
123+ preconditionFailure ( " Client not shut down before the deinit. Please call client.syncShutdown() when no longer needed. " )
124+ }
119125 }
120126
121127 /// Shuts down the client and `EventLoopGroup` if it was created by the client.
@@ -189,36 +195,17 @@ public class HTTPClient {
189195 private func shutdown( requiresCleanClose: Bool , queue: DispatchQueue , _ callback: @escaping ( Error ? ) -> Void ) {
190196 do {
191197 try self . stateLock. withLock {
192- if self . state != . upAndRunning {
198+ guard case . upAndRunning = self . state else {
193199 throw HTTPClientError . alreadyShutdown
194200 }
195- self . state = . shuttingDown
201+ self . state = . shuttingDown( requiresCleanClose : requiresCleanClose , callback : callback )
196202 }
197203 } catch {
198204 callback ( error)
199205 return
200206 }
201207
202- self . pool. close ( on: self . eventLoopGroup. next ( ) ) . whenComplete { result in
203- var closeError : Error ?
204- switch result {
205- case . failure( let error) :
206- closeError = error
207- case . success( let cleanShutdown) :
208- if !cleanShutdown, requiresCleanClose {
209- closeError = HTTPClientError . uncleanShutdown
210- }
211-
212- self . shutdownEventLoop ( queue: queue) { eventLoopError in
213- // we prioritise .uncleanShutdown here
214- if let error = closeError {
215- callback ( error)
216- } else {
217- callback ( eventLoopError)
218- }
219- }
220- }
221- }
208+ self . poolManager. shutdown ( )
222209 }
223210
224211 /// Execute `GET` request using specified URL.
@@ -490,7 +477,7 @@ public class HTTPClient {
490477 let taskEL : EventLoop
491478 switch eventLoopPreference. preference {
492479 case . indifferent:
493- taskEL = self . pool . associatedEventLoop ( for : ConnectionPool . Key ( request ) ) ?? self . eventLoopGroup. next ( )
480+ taskEL = self . eventLoopGroup. next ( )
494481 case . delegate( on: let eventLoop) :
495482 precondition ( self . eventLoopGroup. makeIterator ( ) . contains { $0 === eventLoop } , " Provided EventLoop must be part of clients EventLoopGroup. " )
496483 taskEL = eventLoop
@@ -538,77 +525,30 @@ public class HTTPClient {
538525 }
539526
540527 let task = Task < Delegate . Response > ( eventLoop: taskEL, logger: logger)
541- let setupComplete = taskEL. makePromise ( of: Void . self)
542- let connection = self . pool. getConnection ( request,
543- preference: eventLoopPreference,
544- taskEventLoop: taskEL,
545- deadline: deadline,
546- setupComplete: setupComplete. futureResult,
547- logger: logger)
548-
549- let taskHandler = TaskHandler ( task: task,
550- kind: request. kind,
551- delegate: delegate,
552- redirectHandler: redirectHandler,
553- ignoreUncleanSSLShutdown: self . configuration. ignoreUncleanSSLShutdown,
554- logger: logger)
555-
556- connection. flatMap { connection -> EventLoopFuture < Void > in
557- logger. debug ( " got connection for request " ,
558- metadata: [ " ahc-connection " : " \( connection) " ,
559- " ahc-request " : " \( request. method) \( request. url) " ,
560- " ahc-channel-el " : " \( connection. channel. eventLoop) " ,
561- " ahc-task-el " : " \( taskEL) " ] )
562-
563- let channel = connection. channel
564-
565- func prepareChannelForTask0( ) -> EventLoopFuture < Void > {
566- do {
567- let syncPipelineOperations = channel. pipeline. syncOperations
568-
569- if let timeout = self . resolve ( timeout: self . configuration. timeout. read, deadline: deadline) {
570- try syncPipelineOperations. addHandler ( IdleStateHandler ( readTimeout: timeout) )
571- }
572-
573- try syncPipelineOperations. addHandler ( taskHandler)
574- } catch {
575- connection. release ( closing: true , logger: logger)
576- return channel. eventLoop. makeFailedFuture ( error)
577- }
578-
579- task. setConnection ( connection)
580528
581- let isCancelled = task. lock. withLock {
582- task. cancelled
583- }
584-
585- if !isCancelled {
586- return channel. writeAndFlush ( request) . flatMapError { _ in
587- // At this point the `TaskHandler` will already be present
588- // to handle the failure and pass it to the `promise`
589- channel. eventLoop. makeSucceededVoidFuture ( )
590- }
591- } else {
592- return channel. eventLoop. makeSucceededVoidFuture ( )
593- }
529+ let requestBag = RequestBag (
530+ request: request,
531+ eventLoopPreference: eventLoopPreference,
532+ task: task,
533+ redirectHandler: redirectHandler,
534+ connectionDeadline: . now( ) + ( self . configuration. timeout. connect ?? . seconds( 10 ) ) ,
535+ idleReadTimeout: self . configuration. timeout. read,
536+ delegate: delegate
537+ )
538+
539+ var deadlineSchedule : Scheduled < Void > ?
540+ if let deadline = deadline {
541+ deadlineSchedule = taskEL. scheduleTask ( deadline: deadline) {
542+ requestBag. fail ( HTTPClientError . deadlineExceeded)
594543 }
595544
596- if channel. eventLoop. inEventLoop {
597- return prepareChannelForTask0 ( )
598- } else {
599- return channel. eventLoop. flatSubmit {
600- return prepareChannelForTask0 ( )
601- }
602- }
603- } . always { _ in
604- setupComplete. succeed ( ( ) )
605- } . whenFailure { error in
606- taskHandler. callOutToDelegateFireAndForget { task in
607- delegate. didReceiveError ( task: task, error)
545+ task. promise. futureResult. whenComplete { _ in
546+ deadlineSchedule? . cancel ( )
608547 }
609- task. promise. fail ( error)
610548 }
611549
550+ self . poolManager. execute ( request: requestBag)
551+
612552 return task
613553 }
614554
@@ -815,7 +755,7 @@ public class HTTPClient {
815755
816756 enum State {
817757 case upAndRunning
818- case shuttingDown
758+ case shuttingDown( requiresCleanClose : Bool , callback : ( Error ? ) -> Void )
819759 case shutDown
820760 }
821761}
@@ -882,6 +822,22 @@ extension HTTPClient.Configuration {
882822 }
883823}
884824
825+ extension HTTPClient : HTTPConnectionPoolManagerDelegate {
826+ func httpConnectionPoolManagerDidShutdown( _: HTTPConnectionPool . Manager , unclean: Bool ) {
827+ let ( callback, error) = self . stateLock. withLock { ( ) -> ( ( Error ? ) -> Void , Error ? ) in
828+ guard case . shuttingDown( let requiresClean, callback: let callback) = self . state else {
829+ preconditionFailure ( " Why did the pool manager shut down, if it was not instructed to " )
830+ }
831+
832+ self . state = . shutDown
833+ let error : Error ? = ( requiresClean && unclean) ? HTTPClientError . uncleanShutdown : nil
834+ return ( callback, error)
835+ }
836+
837+ callback ( error)
838+ }
839+ }
840+
885841/// Possible client errors.
886842public struct HTTPClientError : Error , Equatable , CustomStringConvertible {
887843 private enum Code : Equatable {
@@ -909,6 +865,7 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
909865 case incompatibleHeaders
910866 case connectTimeout
911867 case getConnectionFromPoolTimeout
868+ case deadlineExceeded
912869 }
913870
914871 private var code : Code
@@ -973,4 +930,6 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
973930 /// - A connection could not be created within the timout period.
974931 /// - Tasks are not processed fast enough on the existing connections, to process all waiters in time
975932 public static let getConnectionFromPoolTimeout = HTTPClientError ( code: . getConnectionFromPoolTimeout)
933+ /// The request deadline was exceeded. The request was cancelled because of this.
934+ public static let deadlineExceeded = HTTPClientError ( code: . deadlineExceeded)
976935}
0 commit comments