diff --git a/Tests/AsyncHTTPClientTests/HTTP2ClientTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTP2ClientTests+XCTest.swift index ffe9c14a1..753cf9e37 100644 --- a/Tests/AsyncHTTPClientTests/HTTP2ClientTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTP2ClientTests+XCTest.swift @@ -29,6 +29,10 @@ extension HTTP2ClientTests { ("testConcurrentRequests", testConcurrentRequests), ("testConcurrentRequestsFromDifferentThreads", testConcurrentRequestsFromDifferentThreads), ("testConcurrentRequestsWorkWithRequiredEventLoop", testConcurrentRequestsWorkWithRequiredEventLoop), + ("testUncleanShutdownCancelsExecutingAndQueuedTasks", testUncleanShutdownCancelsExecutingAndQueuedTasks), + ("testCancelingRunningRequest", testCancelingRunningRequest), + ("testStressCancelingRunningRequestFromDifferentThreads", testStressCancelingRunningRequestFromDifferentThreads), + ("testPlatformConnectErrorIsForwardedOnTimeout", testPlatformConnectErrorIsForwardedOnTimeout), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTP2ClientTests.swift b/Tests/AsyncHTTPClientTests/HTTP2ClientTests.swift index bef964577..34076f75c 100644 --- a/Tests/AsyncHTTPClientTests/HTTP2ClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTP2ClientTests.swift @@ -19,16 +19,19 @@ #endif import Logging import NIOCore +import NIOHTTP1 import NIOPosix import NIOSSL import XCTest class HTTP2ClientTests: XCTestCase { - func makeDefaultHTTPClient() -> HTTPClient { + func makeDefaultHTTPClient( + eventLoopGroupProvider: HTTPClient.EventLoopGroupProvider = .createNew + ) -> HTTPClient { var tlsConfig = TLSConfiguration.makeClientConfiguration() tlsConfig.certificateVerification = .none return HTTPClient( - eventLoopGroupProvider: .createNew, + eventLoopGroupProvider: eventLoopGroupProvider, configuration: HTTPClient.Configuration( tlsConfiguration: tlsConfig, httpVersion: .automatic @@ -37,6 +40,18 @@ class HTTP2ClientTests: XCTestCase { ) } + func makeClientWithActiveHTTP2Connection( + to bin: HTTPBin, + eventLoopGroupProvider: HTTPClient.EventLoopGroupProvider = .createNew + ) -> HTTPClient { + let client = self.makeDefaultHTTPClient(eventLoopGroupProvider: eventLoopGroupProvider) + var response: HTTPClient.Response? + XCTAssertNoThrow(response = try client.get(url: "https://localhost:\(bin.port)/get").wait()) + XCTAssertEqual(.ok, response?.status) + XCTAssertEqual(response?.version, .http2) + return client + } + func testSimpleGet() { let bin = HTTPBin(.http2(compress: false)) defer { XCTAssertNoThrow(try bin.shutdown()) } @@ -92,7 +107,7 @@ class HTTP2ClientTests: XCTestCase { for _ in 0..] = [] + XCTAssertNoThrow(results = try EventLoopFuture + .whenAllComplete(responses, on: clientGroup.next()) + .timeout(after: .seconds(2)) + .wait()) + + for result in results { + switch result { + case .success: + XCTFail("Shouldn't succeed") + case .failure(let error): + XCTAssertEqual(error as? HTTPClientError, .cancelled) + } + } + } + + func testCancelingRunningRequest() { + let bin = HTTPBin(.http2(compress: false)) { _ in SendHeaderAndWaitChannelHandler() } + defer { XCTAssertNoThrow(try bin.shutdown()) } + let client = self.makeDefaultHTTPClient() + defer { XCTAssertNoThrow(try client.syncShutdown()) } + + var maybeRequest: HTTPClient.Request? + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "https://localhost:\(bin.port)")) + guard let request = maybeRequest else { return } + + var task: HTTPClient.Task! + let delegate = HeadReceivedCallback { _ in + // request is definitely running because we just received a head from the server + task.cancel() + } + task = client.execute( + request: request, + delegate: delegate + ) + + XCTAssertThrowsError(try task.futureResult.timeout(after: .seconds(2)).wait()) { + XCTAssertEqual($0 as? HTTPClientError, .cancelled) + } + } + + func testStressCancelingRunningRequestFromDifferentThreads() { + let bin = HTTPBin(.http2(compress: false)) { _ in SendHeaderAndWaitChannelHandler() } + defer { XCTAssertNoThrow(try bin.shutdown()) } + let client = self.makeDefaultHTTPClient() + defer { XCTAssertNoThrow(try client.syncShutdown()) } + let cancelPool = MultiThreadedEventLoopGroup(numberOfThreads: 10) + defer { XCTAssertNoThrow(try cancelPool.syncShutdownGracefully()) } + + var maybeRequest: HTTPClient.Request? + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "https://localhost:\(bin.port)")) + guard let request = maybeRequest else { return } + + let tasks = (0..<100).map { _ -> HTTPClient.Task in + var task: HTTPClient.Task! + let delegate = HeadReceivedCallback { _ in + // request is definitely running because we just received a head from the server + cancelPool.next().execute { + // canceling from a different thread + task.cancel() + } + } + task = client.execute( + request: request, + delegate: delegate + ) + return task + } + + for task in tasks { + XCTAssertThrowsError(try task.futureResult.timeout(after: .seconds(2)).wait()) { + XCTAssertEqual($0 as? HTTPClientError, .cancelled) + } + } + } + + func testPlatformConnectErrorIsForwardedOnTimeout() { + let bin = HTTPBin(.http2(compress: false)) + let clientGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2) + let el1 = clientGroup.next() + let el2 = clientGroup.next() + defer { XCTAssertNoThrow(try clientGroup.syncShutdownGracefully()) } + var tlsConfig = TLSConfiguration.makeClientConfiguration() + tlsConfig.certificateVerification = .none + let client = HTTPClient( + eventLoopGroupProvider: .shared(clientGroup), + configuration: HTTPClient.Configuration( + tlsConfiguration: tlsConfig, + timeout: .init(connect: .milliseconds(1000)), + httpVersion: .automatic + ), + backgroundActivityLogger: Logger(label: "HTTPClient", factory: StreamLogHandler.standardOutput(label:)) + ) + defer { XCTAssertNoThrow(try client.syncShutdown()) } + + var maybeRequest1: HTTPClient.Request? + XCTAssertNoThrow(maybeRequest1 = try HTTPClient.Request(url: "https://localhost:\(bin.port)/get")) + guard let request1 = maybeRequest1 else { return } + + let task1 = client.execute(request: request1, delegate: ResponseAccumulator(request: request1), eventLoop: .delegateAndChannel(on: el1)) + var response1: ResponseAccumulator.Response? + XCTAssertNoThrow(response1 = try task1.wait()) + + XCTAssertEqual(.ok, response1?.status) + XCTAssertEqual(response1?.version, .http2) + let serverPort = bin.port + XCTAssertNoThrow(try bin.shutdown()) + // client is now in HTTP/2 state and the HTTPBin is closed + // start a new server on the old port which closes all connections immediately + let serverGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { XCTAssertNoThrow(try serverGroup.syncShutdownGracefully()) } + var maybeServer: Channel? + XCTAssertNoThrow(maybeServer = try ServerBootstrap(group: serverGroup) + .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) + .childChannelInitializer { channel in + channel.close() + } + .childChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) + .bind(host: "0.0.0.0", port: serverPort) + .wait()) + guard let server = maybeServer else { return } + defer { XCTAssertNoThrow(try server.close().wait()) } + + var maybeRequest2: HTTPClient.Request? + XCTAssertNoThrow(maybeRequest2 = try HTTPClient.Request(url: "https://localhost:\(serverPort)/")) + guard let request2 = maybeRequest2 else { return } + + let task2 = client.execute(request: request2, delegate: ResponseAccumulator(request: request2), eventLoop: .delegateAndChannel(on: el2)) + XCTAssertThrowsError(try task2.wait()) { error in + XCTAssertNil( + error as? HTTPClientError, + "error should be some platform specific error that the connection is closed/reset by the other side" + ) + } + } +} + +private final class HeadReceivedCallback: HTTPClientResponseDelegate { + typealias Response = Void + private let didReceiveHeadCallback: (HTTPResponseHead) -> Void + init(didReceiveHead: @escaping (HTTPResponseHead) -> Void) { + self.didReceiveHeadCallback = didReceiveHead + } + + func didReceiveHead(task: HTTPClient.Task, _ head: HTTPResponseHead) -> EventLoopFuture { + self.didReceiveHeadCallback(head) + return task.eventLoop.makeSucceededVoidFuture() + } + + func didFinishRequest(task: HTTPClient.Task) throws {} +} + +/// sends some headers and waits indefinitely afterwards +private final class SendHeaderAndWaitChannelHandler: ChannelInboundHandler { + typealias InboundIn = HTTPServerRequestPart + typealias OutboundOut = HTTPServerResponsePart + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + let requestPart = self.unwrapInboundIn(data) + switch requestPart { + case .head: + context.writeAndFlush(self.wrapOutboundOut(.head(HTTPResponseHead( + version: HTTPVersion(major: 1, minor: 1), + status: .ok + )) + ), promise: nil) + case .body, .end: + return + } + } }