@@ -66,7 +66,7 @@ final class RequestBagTests: XCTestCase {
6666 )
6767 XCTAssert ( bag. task. eventLoop === embeddedEventLoop)
6868
69- let executor = MockRequestExecutor ( )
69+ let executor = MockRequestExecutor ( pauseRequestBodyPartStreamAfterASingleWrite : true )
7070
7171 bag. willExecuteRequest ( executor)
7272
@@ -294,6 +294,71 @@ final class RequestBagTests: XCTestCase {
294294 XCTAssertEqual ( $0 as? HTTPClientError , . cancelled)
295295 }
296296 }
297+
298+ func testHTTPUploadIsCancelledEvenThoughRequestSucceeds( ) {
299+ let embeddedEventLoop = EmbeddedEventLoop ( )
300+ defer { XCTAssertNoThrow ( try embeddedEventLoop. syncShutdownGracefully ( ) ) }
301+ let logger = Logger ( label: " test " )
302+
303+ var maybeRequest : HTTPClient . Request ?
304+ let writeSecondPartPromise = embeddedEventLoop. makePromise ( of: Void . self)
305+
306+ XCTAssertNoThrow ( maybeRequest = try HTTPClient . Request (
307+ url: " https://swift.org " ,
308+ method: . POST,
309+ headers: [ " content-length " : " 12 " ] ,
310+ body: . stream( length: 12 ) { writer -> EventLoopFuture < Void > in
311+ var firstWriteSuccess = false
312+ return writer. write ( . byteBuffer( . init( bytes: 0 ... 3 ) ) ) . flatMap { _ in
313+ firstWriteSuccess = true
314+
315+ return writeSecondPartPromise. futureResult
316+ } . flatMap {
317+ return writer. write ( . byteBuffer( . init( bytes: 4 ... 7 ) ) )
318+ } . always { result in
319+ XCTAssertTrue ( firstWriteSuccess)
320+
321+ guard case . failure( let error) = result else {
322+ return XCTFail ( " Expected the second write to fail " )
323+ }
324+ XCTAssertEqual ( error as? HTTPClientError , . requestStreamCancelled)
325+ }
326+ }
327+ ) )
328+ guard let request = maybeRequest else { return XCTFail ( " Expected to have a request " ) }
329+
330+ let delegate = UploadCountingDelegate ( eventLoop: embeddedEventLoop)
331+ let bag = RequestBag (
332+ request: request,
333+ eventLoopPreference: . delegate( on: embeddedEventLoop) ,
334+ task: . init( eventLoop: embeddedEventLoop, logger: logger) ,
335+ redirectHandler: nil ,
336+ connectionDeadline: . now( ) + . seconds( 30 ) ,
337+ idleReadTimeout: nil ,
338+ delegate: delegate
339+ )
340+
341+ let executor = MockRequestExecutor ( )
342+ bag. willExecuteRequest ( executor)
343+
344+ XCTAssertEqual ( delegate. hitDidSendRequestHead, 0 )
345+ XCTAssertEqual ( delegate. hitDidSendRequest, 0 )
346+ bag. requestHeadSent ( )
347+ XCTAssertEqual ( delegate. hitDidSendRequestHead, 1 )
348+ XCTAssertEqual ( delegate. hitDidSendRequest, 0 )
349+
350+ bag. resumeRequestBodyStream ( )
351+ XCTAssertEqual ( executor. nextBodyPart ( ) , . body( . byteBuffer( . init( bytes: 0 ... 3 ) ) ) )
352+ // receive a 301 response immediately.
353+ bag. receiveResponseHead ( . init( version: . http1_1, status: . movedPermanently) )
354+ bag. succeedRequest ( . init( ) )
355+
356+ // if we now write our second part of the response this should fail the backpressure promise
357+ writeSecondPartPromise. succeed ( ( ) )
358+
359+ XCTAssertEqual ( delegate. receivedHead? . status, . movedPermanently)
360+ XCTAssertNoThrow ( try bag. task. futureResult. wait ( ) )
361+ }
297362}
298363
299364class MockRequestExecutor : HTTPRequestExecutor {
@@ -302,11 +367,15 @@ class MockRequestExecutor: HTTPRequestExecutor {
302367 case endOfStream
303368 }
304369
370+ let pauseRequestBodyPartStreamAfterASingleWrite : Bool
371+
305372 private( set) var requestBodyParts = CircularBuffer < RequestParts > ( )
306373 private( set) var isCancelled : Bool = false
307374 private( set) var signalledDemandForResponseBody : Bool = false
308375
309- init ( ) { }
376+ init ( pauseRequestBodyPartStreamAfterASingleWrite: Bool = false ) {
377+ self . pauseRequestBodyPartStreamAfterASingleWrite = pauseRequestBodyPartStreamAfterASingleWrite
378+ }
310379
311380 func nextBodyPart( ) -> RequestParts ? {
312381 guard !self . requestBodyParts. isEmpty else { return nil }
@@ -321,7 +390,7 @@ class MockRequestExecutor: HTTPRequestExecutor {
321390 // data is already scheduled. If we call pause here, once, after the second call new subsequent
322391 // calls should not be scheduled.
323392 func writeRequestBodyPart( _ part: IOData , request: HTTPExecutingRequest ) {
324- if self . requestBodyParts. isEmpty {
393+ if self . requestBodyParts. isEmpty, self . pauseRequestBodyPartStreamAfterASingleWrite {
325394 request. pauseRequestBodyStream ( )
326395 }
327396 self . requestBodyParts. append ( . body( part) )
0 commit comments