Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ extension RequestBodyLength {
case .none:
self = .known(0)
case .byteBuffer(let buffer):
self = .known(buffer.readableBytes)
self = .known(Int64(buffer.readableBytes))
case .sequence(let length, _, _), .asyncSequence(let length, _):
self = length
}
Expand Down
16 changes: 11 additions & 5 deletions Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ extension HTTPClientRequest.Body {
public static func bytes<Bytes: RandomAccessCollection & Sendable>(
_ bytes: Bytes
) -> Self where Bytes.Element == UInt8 {
self.bytes(bytes, length: .known(bytes.count))
self.bytes(bytes, length: .known(Int64(bytes.count)))
}

/// Create an ``HTTPClientRequest/Body-swift.struct`` from a `Sequence` of bytes.
Expand All @@ -140,7 +140,7 @@ extension HTTPClientRequest.Body {
///
/// Caution should be taken with this method to ensure that the `length` is correct. Incorrect lengths
/// will cause unnecessary runtime failures. Setting `length` to ``Length/unknown`` will trigger the upload
/// to use `chunked` `Transfer-Encoding`, while using ``Length/known(_:)`` will use `Content-Length`.
/// to use `chunked` `Transfer-Encoding`, while using ``Length/known(_:)-9q0ge`` will use `Content-Length`.
///
/// - parameters:
/// - bytes: The bytes of the request body.
Expand Down Expand Up @@ -225,7 +225,7 @@ extension HTTPClientRequest.Body {
///
/// Caution should be taken with this method to ensure that the `length` is correct. Incorrect lengths
/// will cause unnecessary runtime failures. Setting `length` to ``Length/unknown`` will trigger the upload
/// to use `chunked` `Transfer-Encoding`, while using ``Length/known(_:)`` will use `Content-Length`.
/// to use `chunked` `Transfer-Encoding`, while using ``Length/known(_:)-9q0ge`` will use `Content-Length`.
///
/// - parameters:
/// - bytes: The bytes of the request body.
Expand Down Expand Up @@ -265,7 +265,7 @@ extension HTTPClientRequest.Body {
///
/// Caution should be taken with this method to ensure that the `length` is correct. Incorrect lengths
/// will cause unnecessary runtime failures. Setting `length` to ``Length/unknown`` will trigger the upload
/// to use `chunked` `Transfer-Encoding`, while using ``Length/known(_:)`` will use `Content-Length`.
/// to use `chunked` `Transfer-Encoding`, while using ``Length/known(_:)-9q0ge`` will use `Content-Length`.
///
/// - parameters:
/// - sequenceOfBytes: The bytes of the request body.
Expand Down Expand Up @@ -293,7 +293,7 @@ extension HTTPClientRequest.Body {
///
/// Caution should be taken with this method to ensure that the `length` is correct. Incorrect lengths
/// will cause unnecessary runtime failures. Setting `length` to ``Length/unknown`` will trigger the upload
/// to use `chunked` `Transfer-Encoding`, while using ``Length/known(_:)`` will use `Content-Length`.
/// to use `chunked` `Transfer-Encoding`, while using ``Length/known(_:)-9q0ge`` will use `Content-Length`.
///
/// - parameters:
/// - bytes: The bytes of the request body.
Expand Down Expand Up @@ -341,7 +341,13 @@ extension HTTPClientRequest.Body {
public static let unknown: Self = .init(storage: .unknown)

/// The size of the request body is known and exactly `count` bytes
@available(*, deprecated, message: "Use `known(_ count: Int64)` with an explicit Int64 argument instead")
public static func known(_ count: Int) -> Self {
.init(storage: .known(Int64(count)))
}

/// The size of the request body is known and exactly `count` bytes
public static func known(_ count: Int64) -> Self {
.init(storage: .known(count))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ struct HTTPRequestStateMachine {
/// The request is streaming its request body. `expectedBodyLength` has a value, if the request header contained
/// a `"content-length"` header field. If the request header contained a `"transfer-encoding" = "chunked"`
/// header field, the `expectedBodyLength` is `nil`.
case streaming(expectedBodyLength: Int?, sentBodyBytes: Int, producer: ProducerControlState)
case streaming(expectedBodyLength: Int64?, sentBodyBytes: Int64, producer: ProducerControlState)
/// The request has sent its request body and end.
case endSent
}
Expand Down Expand Up @@ -308,13 +308,13 @@ struct HTTPRequestStateMachine {
// pause. The reason for this is as follows: There might be thread synchronization
// situations in which the producer might not have received the plea to pause yet.

if let expected = expectedBodyLength, sentBodyBytes + part.readableBytes > expected {
if let expected = expectedBodyLength, sentBodyBytes + Int64(part.readableBytes) > expected {
let error = HTTPClientError.bodyLengthMismatch
self.state = .failed(error)
return .failRequest(error, .close(promise))
}

sentBodyBytes += part.readableBytes
sentBodyBytes += Int64(part.readableBytes)

let requestState: RequestState = .streaming(
expectedBodyLength: expectedBodyLength,
Expand Down Expand Up @@ -768,7 +768,7 @@ struct HTTPRequestStateMachine {
}

extension RequestFramingMetadata.Body {
var expectedLength: Int? {
var expectedLength: Int64? {
switch self {
case .fixedSize(let length): return length
case .stream: return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ internal enum RequestBodyLength: Hashable, Sendable {
/// size of the request body is not known before starting the request
case unknown
/// size of the request body is fixed and exactly `count` bytes
case known(_ count: Int)
case known(_ count: Int64)
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
struct RequestFramingMetadata: Hashable {
enum Body: Hashable {
case stream
case fixedSize(Int)
case fixedSize(Int64)
}

var connectionClose: Bool
Expand Down
39 changes: 31 additions & 8 deletions Sources/AsyncHTTPClient/HTTPHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,28 @@ extension HTTPClient {

/// Body size. If nil,`Transfer-Encoding` will automatically be set to `chunked`. Otherwise a `Content-Length`
/// header is set with the given `length`.
public var length: Int?
@available(*, deprecated, renamed: "contentLength")
public var length: Int? {
get {
self.contentLength.flatMap { Int($0) }
}
set {
self.contentLength = newValue.flatMap { Int64($0) }
}
}

/// Body size. If nil,`Transfer-Encoding` will automatically be set to `chunked`. Otherwise a `Content-Length`
/// header is set with the given `contentLength`.
public var contentLength: Int64?

/// Body chunk provider.
public var stream: @Sendable (StreamWriter) -> EventLoopFuture<Void>

@usableFromInline typealias StreamCallback = @Sendable (StreamWriter) -> EventLoopFuture<Void>

@inlinable
init(length: Int?, stream: @escaping StreamCallback) {
self.length = length
init(contentLength: Int64?, stream: @escaping StreamCallback) {
self.contentLength = contentLength.flatMap { $0 }
self.stream = stream
}

Expand All @@ -88,7 +100,7 @@ extension HTTPClient {
/// - parameters:
/// - buffer: Body `ByteBuffer` representation.
public static func byteBuffer(_ buffer: ByteBuffer) -> Body {
return Body(length: buffer.readableBytes) { writer in
return Body(contentLength: Int64(buffer.readableBytes)) { writer in
writer.write(.byteBuffer(buffer))
}
}
Expand All @@ -100,8 +112,19 @@ extension HTTPClient {
/// header is set with the given `length`.
/// - stream: Body chunk provider.
@preconcurrency
@available(*, deprecated, renamed: "stream(contentLength:bodyStream:)")
public static func stream(length: Int? = nil, _ stream: @Sendable @escaping (StreamWriter) -> EventLoopFuture<Void>) -> Body {
return Body(length: length, stream: stream)
return Body(contentLength: length.flatMap { Int64($0) }, stream: stream)
}

/// Create and stream body using ``StreamWriter``.
///
/// - parameters:
/// - contentLength: Body size. If nil, `Transfer-Encoding` will automatically be set to `chunked`. Otherwise a `Content-Length`
/// header is set with the given `contentLength`.
/// - bodyStream: Body chunk provider.
public static func stream(contentLength: Int64? = nil, bodyStream: @Sendable @escaping (StreamWriter) -> EventLoopFuture<Void>) -> Body {
return Body(contentLength: contentLength, stream: bodyStream)
}

/// Create and stream body using a collection of bytes.
Expand All @@ -111,7 +134,7 @@ extension HTTPClient {
@preconcurrency
@inlinable
public static func bytes<Bytes>(_ bytes: Bytes) -> Body where Bytes: RandomAccessCollection, Bytes: Sendable, Bytes.Element == UInt8 {
return Body(length: bytes.count) { writer in
return Body(contentLength: Int64(bytes.count)) { writer in
if bytes.count <= bagOfBytesToByteBufferConversionChunkSize {
return writer.write(.byteBuffer(ByteBuffer(bytes: bytes)))
} else {
Expand All @@ -125,7 +148,7 @@ extension HTTPClient {
/// - parameters:
/// - string: Body `String` representation.
public static func string(_ string: String) -> Body {
return Body(length: string.utf8.count) { writer in
return Body(contentLength: Int64(string.utf8.count)) { writer in
if string.utf8.count <= bagOfBytesToByteBufferConversionChunkSize {
return writer.write(.byteBuffer(ByteBuffer(string: string)))
} else {
Expand Down Expand Up @@ -858,7 +881,7 @@ extension RequestBodyLength {
self = .known(0)
return
}
guard let length = body.length else {
guard let length = body.contentLength else {
self = .unknown
return
}
Expand Down
56 changes: 56 additions & 0 deletions Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,62 @@ final class AsyncAwaitEndToEndTests: XCTestCase {
}
}

struct AsyncSequenceByteBufferGenerator: AsyncSequence, Sendable, AsyncIteratorProtocol {
typealias Element = ByteBuffer

let chunkSize: Int
let totalChunks: Int
let buffer: ByteBuffer
var chunksGenerated: Int = 0

init(chunkSize: Int, totalChunks: Int) {
self.chunkSize = chunkSize
self.totalChunks = totalChunks
self.buffer = ByteBuffer(repeating: 1, count: self.chunkSize)
}

mutating func next() async throws -> ByteBuffer? {
guard self.chunksGenerated < self.totalChunks else { return nil }

self.chunksGenerated += 1
return self.buffer
}

func makeAsyncIterator() -> AsyncSequenceByteBufferGenerator {
return self
}
}

func testEchoStreamThatHas3GBInTotal() async throws {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let bin = HTTPBin(.http1_1()) { _ in HTTPEchoHandler() }
defer { XCTAssertNoThrow(try bin.shutdown()) }

let client: HTTPClient = makeDefaultHTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup))
defer { XCTAssertNoThrow(try client.syncShutdown()) }

let logger = Logger(label: "HTTPClient", factory: StreamLogHandler.standardOutput(label:))

var request = HTTPClientRequest(url: "http://localhost:\(bin.port)/")
request.method = .POST

let sequence = AsyncSequenceByteBufferGenerator(
chunkSize: 4_194_304, // 4MB chunk
totalChunks: 768 // Total = 3GB
)
request.body = .stream(sequence, length: .unknown)

let response: HTTPClientResponse = try await client.execute(request, deadline: .now() + .seconds(30), logger: logger)
XCTAssertEqual(response.headers["content-length"], [])

var receivedBytes: Int64 = 0
for try await part in response.body {
receivedBytes += Int64(part.readableBytes)
}
XCTAssertEqual(receivedBytes, 3_221_225_472) // 3GB
}

func testPostWithAsyncSequenceOfByteBuffers() {
XCTAsyncTest {
let bin = HTTPBin(.http2(compress: false)) { _ in HTTPEchoHandler() }
Expand Down
10 changes: 5 additions & 5 deletions Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
guard let testUtils = maybeTestUtils else { return XCTFail("Expected connection setup works") }

var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 100) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 100) { writer in
testWriter.start(writer: writer)
}))
guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") }
Expand Down Expand Up @@ -345,7 +345,7 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
guard let testUtils = maybeTestUtils else { return XCTFail("Expected connection setup works") }

var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 10) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 10) { writer in
// Advance time by more than the idle write timeout (that's 1 millisecond) to trigger the timeout.
embedded.embeddedEventLoop.advanceTime(by: .milliseconds(2))
return testWriter.start(writer: writer)
Expand Down Expand Up @@ -384,7 +384,7 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
guard let testUtils = maybeTestUtils else { return XCTFail("Expected connection setup works") }

var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 10) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 10) { writer in
embedded.isWritable = false
embedded.pipeline.fireChannelWritabilityChanged()
// This should not trigger any errors or timeouts, because the timer isn't running
Expand Down Expand Up @@ -432,7 +432,7 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
guard let testUtils = maybeTestUtils else { return XCTFail("Expected connection setup works") }

var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 2) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 2) { writer in
return testWriter.start(writer: writer, expectedErrors: [HTTPClientError.cancelled])
}))
guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") }
Expand Down Expand Up @@ -595,7 +595,7 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
guard let testUtils = maybeTestUtils else { return XCTFail("Expected connection setup works") }

var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 10) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 10) { writer in
testWriter.start(writer: writer)
}))
guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") }
Expand Down
4 changes: 2 additions & 2 deletions Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ class HTTP1ConnectionTests: XCTestCase {
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(
url: "http://localhost/hello/swift",
method: .POST,
body: .stream(length: 4) { writer -> EventLoopFuture<Void> in
func recursive(count: UInt8, promise: EventLoopPromise<Void>) {
body: .stream(contentLength: 4) { writer -> EventLoopFuture<Void> in
@Sendable func recursive(count: UInt8, promise: EventLoopPromise<Void>) {
guard count < 4 else {
return promise.succeed(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class HTTP2ClientRequestHandlerTests: XCTestCase {
let testWriter = TestBackpressureWriter(eventLoop: embedded.eventLoop, parts: 50)

var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 100) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 100) { writer in
testWriter.start(writer: writer)
}))
guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") }
Expand Down Expand Up @@ -295,7 +295,7 @@ class HTTP2ClientRequestHandlerTests: XCTestCase {

let testWriter = TestBackpressureWriter(eventLoop: embedded.eventLoop, parts: 5)
var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 10) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 10) { writer in
// Advance time by more than the idle write timeout (that's 1 millisecond) to trigger the timeout.
embedded.embeddedEventLoop.advanceTime(by: .milliseconds(2))
return testWriter.start(writer: writer)
Expand Down Expand Up @@ -335,7 +335,7 @@ class HTTP2ClientRequestHandlerTests: XCTestCase {

let testWriter = TestBackpressureWriter(eventLoop: embedded.eventLoop, parts: 5)
var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 10) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 10) { writer in
embedded.isWritable = false
embedded.pipeline.fireChannelWritabilityChanged()
// This should not trigger any errors or timeouts, because the timer isn't running
Expand Down Expand Up @@ -385,7 +385,7 @@ class HTTP2ClientRequestHandlerTests: XCTestCase {

let testWriter = TestBackpressureWriter(eventLoop: embedded.eventLoop, parts: 5)
var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 2) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 2) { writer in
return testWriter.start(writer: writer, expectedErrors: [HTTPClientError.cancelled])
}))
guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") }
Expand Down
Loading