Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

implement new streamwriter api#291

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Open
artemredkin wants to merge2 commits intoswift-server:main
base:main
Choose a base branch
Loading
fromartemredkin:revamp_stream_writer
Open
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 108 additions & 8 deletionsSources/AsyncHTTPClient/HTTPHandler.swift
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -32,32 +32,92 @@ extension HTTPClient {
///
/// - parameters:
/// - closure: function that will be called to write actual bytes to the channel.
@available(*, deprecated, message: "StreamWriter is deprecated, please use StreamWriter2")
public init(closure: @escaping (IOData) -> EventLoopFuture<Void>) {
self.closure = closure
}

// This is needed so we don't have build warnings in the client itself
init(internalClosure: @escaping (IOData) -> EventLoopFuture<Void>) {
self.closure = internalClosure
}

/// Write data to server.
///
/// - parameters:
/// - data: `IOData` to write.
@available(*, deprecated, message: "StreamWriter is deprecated, please use StreamWriter2")
public func write(_ data: IOData) -> EventLoopFuture<Void> {
return self.closure(data)
}
}

public struct StreamWriter2 {
public let allocator: ByteBufferAllocator
let onChunk: (IOData) -> EventLoopFuture<Void>
let onComplete: EventLoopPromise<Void>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Is there a reason we're writing this as a callback-taking type rather than just holding onto the task object?

Copy link
CollaboratorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

and "forward" all write requests to it's channel? that might be a good idea, will try that, thanks!


public init(allocator: ByteBufferAllocator, onChunk: @escaping (IOData) -> EventLoopFuture<Void>, onComplete: EventLoopPromise<Void>) {
self.allocator = allocator
self.onChunk = onChunk
self.onComplete = onComplete
}

public func write(_ buffer: ByteBuffer) -> EventLoopFuture<Void> {
return self.onChunk(.byteBuffer(buffer))
}

public func write(_ data: IOData) -> EventLoopFuture<Void> {
return self.onChunk(data)
}

public func write(_ buffer: ByteBuffer, promise: EventLoopPromise<Void>?) {
self.onChunk(.byteBuffer(buffer)).cascade(to: promise)
}

public func write(_ data: IOData, promise: EventLoopPromise<Void>?) {
self.onChunk(data).cascade(to: promise)
}

public func end() {
self.onComplete.succeed(())
}

public func fail(_ error: Error) {
self.onComplete.fail(error)
}
}

/// Body size. Request validation will be failed with `HTTPClientErrors.contentLengthMissing` if nil,
/// unless `Trasfer-Encoding: chunked` header is set.
public var length: Int?
/// Body chunk provider.
public var stream: (StreamWriter) -> EventLoopFuture<Void>
var stream2: ((StreamWriter2) -> Void)?

@available(*, deprecated, message: "StreamWriter is deprecated, please use StreamWriter2")
init(length: Int?, stream: @escaping (StreamWriter) -> EventLoopFuture<Void>) {
self.length = length
self.stream = stream
self.stream2 = nil
}

init(length: Int?, stream: @escaping (StreamWriter2) -> Void) {
self.length = length
self.stream = { _ in
preconditionFailure("stream writer 2 was called")
}
self.stream2 = stream
}

/// Create and stream body using `ByteBuffer`.
///
/// - parameters:
/// - buffer: Body `ByteBuffer` representation.
public static func byteBuffer(_ buffer: ByteBuffer) -> Body {
return Body(length: buffer.readableBytes) { writer in
writer.write(.byteBuffer(buffer))
return Body(length: buffer.readableBytes) { (writer: StreamWriter2) in
writer.write(.byteBuffer(buffer), promise: nil)
writer.end()
}
}

Expand All@@ -67,17 +127,30 @@ extension HTTPClient {
/// - length: Body size. Request validation will be failed with `HTTPClientErrors.contentLengthMissing` if nil,
/// unless `Transfer-Encoding: chunked` header is set.
/// - stream: Body chunk provider.
@available(*, deprecated, message: "StreamWriter is deprecated, please use StreamWriter2 instead")
public static func stream(length: Int? = nil, _ stream: @escaping (StreamWriter) -> EventLoopFuture<Void>) -> Body {
return Body(length: length, stream: stream)
}

/// Create and stream body using `StreamWriter`.
///
/// - parameters:
/// - length: Body size. Request validation will be failed with `HTTPClientErrors.contentLengthMissing` if nil,
/// unless `Transfer-Encoding: chunked` header is set.
/// - stream: Body chunk provider.
public static func stream2(length: Int? = nil, _ stream: @escaping (StreamWriter2) -> Void) -> Body {
Copy link
CollaboratorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I we add method with the same name but different type we will break compilation, because some calls could become ambiguous 😢

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Given that there is discussion about possibly breaking API in the nearish futureanyway, we may find we want to delay this until then to resolve this problem.

return Body(length: length, stream: stream)
}

/// Create and stream body using `Data`.
///
/// - parameters:
/// - data: Body `Data` representation.
public static func data(_ data: Data) -> Body {
return Body(length: data.count) { writer in
writer.write(.byteBuffer(ByteBuffer(bytes: data)))
return Body(length: data.count) { (writer: StreamWriter2) in
let buffer = writer.allocator.buffer(data: data)
writer.write(.byteBuffer(buffer), promise: nil)
writer.end()
}
}

Expand All@@ -86,8 +159,10 @@ extension HTTPClient {
/// - parameters:
/// - string: Body `String` representation.
public static func string(_ string: String) -> Body {
return Body(length: string.utf8.count) { writer in
writer.write(.byteBuffer(ByteBuffer(string: string)))
return Body(length: string.utf8.count) { (writer: StreamWriter2) in
let buffer = writer.allocator.buffer(string: string)
writer.write(.byteBuffer(buffer), promise: nil)
writer.end()
}
}
}
Expand DownExpand Up@@ -874,7 +949,32 @@ extension TaskHandler: ChannelDuplexHandler {
let channel = context.channel

func doIt() -> EventLoopFuture<Void> {
return body.stream(HTTPClient.Body.StreamWriter { part in
if let stream2 = body.stream2 {
let completion = channel.eventLoop.makePromise(of: Void.self)
stream2(HTTPClient.Body.StreamWriter2(allocator: channel.allocator, onChunk: { part in
let promise = self.task.eventLoop.makePromise(of: Void.self)
// All writes have to be switched to the channel EL if channel and task ELs differ
if channel.eventLoop.inEventLoop {
self.writeBodyPart(context: context, part: part, promise: promise)
} else {
channel.eventLoop.execute {
self.writeBodyPart(context: context, part: part, promise: promise)
}
}

promise.futureResult.whenFailure { error in
completion.fail(error)
}

return promise.futureResult.map {
self.callOutToDelegateFireAndForget(value: part, self.delegate.didSendRequestPart)
}
}, onComplete: completion))

return completion.futureResult
}

return body.stream(HTTPClient.Body.StreamWriter(internalClosure: { part in
let promise = self.task.eventLoop.makePromise(of: Void.self)
// All writes have to be switched to the channel EL if channel and task ELs differ
if channel.eventLoop.inEventLoop {
Expand All@@ -888,7 +988,7 @@ extension TaskHandler: ChannelDuplexHandler {
return promise.futureResult.map {
self.callOutToDelegateFireAndForget(value: part, self.delegate.didSendRequestPart)
}
})
}))
}

// Callout to the user to start body streaming should be on task EL
Expand Down
57 changes: 31 additions & 26 deletionsTests/AsyncHTTPClientTests/HTTPClientInternalTests.swift
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -169,17 +169,19 @@ class HTTPClientInternalTests: XCTestCase {
XCTAssertNoThrow(try httpBin.shutdown())
}

letbody:HTTPClient.Body=.stream(length:50){ writerin
letbody:HTTPClient.Body=.stream2(length:50){ writerin
do{
varrequest=tryRequest(url:"http://localhost:\(httpBin.port)/events/10/1")
request.headers.add(name:"Accept", value:"text/event-stream")

letdelegate=HTTPClientCopyingDelegate{ partin
writer.write(.byteBuffer(part))
writer.write(part)
}
httpClient.execute(request: request, delegate: delegate).futureResult.whenComplete{ _in
writer.end()
}
return httpClient.execute(request: request, delegate: delegate).futureResult
}catch{
return httpClient.eventLoopGroup.next().makeFailedFuture(error)
writer.fail(error)
}
}

Expand All@@ -198,23 +200,25 @@ class HTTPClientInternalTests: XCTestCase {
XCTAssertNoThrow(try httpBin.shutdown())
}

varbody:HTTPClient.Body=.stream(length:50){_in
httpClient.eventLoopGroup.next().makeFailedFuture(HTTPClientError.invalidProxyResponse)
varbody:HTTPClient.Body=.stream2(length:50){writerin
writer.fail(HTTPClientError.invalidProxyResponse)
}

XCTAssertThrowsError(try httpClient.post(url:"http://localhost:\(httpBin.port)/post", body: body).wait())

body=.stream(length:50){_in
body=.stream2(length:50){writerin
do{
varrequest=tryRequest(url:"http://localhost:\(httpBin.port)/events/10/1")
request.headers.add(name:"Accept", value:"text/event-stream")

letdelegate=HTTPClientCopyingDelegate{ _in
httpClient.eventLoopGroup.next().makeFailedFuture(HTTPClientError.invalidProxyResponse)
}
return httpClient.execute(request: request, delegate: delegate).futureResult
httpClient.execute(request: request, delegate: delegate).futureResult.whenComplete{ _in
writer.end()
}
}catch{
return httpClient.eventLoopGroup.next().makeFailedFuture(error)
writer.fail(error)
}
}

Expand DownExpand Up@@ -432,11 +436,11 @@ class HTTPClientInternalTests: XCTestCase {
XCTAssertNoThrow(try httpClient.syncShutdown(requiresCleanClose:true))
}

letbody:HTTPClient.Body=.stream(length:8){ writerin
letbuffer=ByteBuffer(string:"1234")
returnwriter.write(.byteBuffer(buffer)).flatMap{
letbuffer=ByteBuffer(string:"4321")
return writer.write(.byteBuffer(buffer))
letbody:HTTPClient.Body=.stream2(length:8){ writerin
writer.write(writer.allocator.buffer(string:"1234")).whenComplete{ _in
writer.write(writer.allocator.buffer(string:"4321")).whenComplete{ _in
writer.end()
}
}
}

Expand DownExpand Up@@ -885,13 +889,13 @@ class HTTPClientInternalTests: XCTestCase {
letel2= group.next()
XCTAssert(el1!== el2)

letbody:HTTPClient.Body=.stream(length:8){ writerin
letbody:HTTPClient.Body=.stream2(length:8){ writerin
XCTAssert(el1.inEventLoop)
letbuffer=ByteBuffer(string:"1234")
return writer.write(.byteBuffer(buffer)).flatMap{
return writer.write(writer.allocator.buffer(string:"1234")).whenComplete{ _in
XCTAssert(el1.inEventLoop)
letbuffer=ByteBuffer(string:"4321")
return writer.write(.byteBuffer(buffer))
writer.write(writer.allocator.buffer(string:"4321")).whenComplete{ _in
writer.end()
}
}
}
letrequest=tryHTTPClient.Request(url:"http://localhost:\(httpBin.port)/post", method:.POST, body: body)
Expand DownExpand Up@@ -921,17 +925,17 @@ class HTTPClientInternalTests: XCTestCase {
XCTAssert(el1!== el2)

lettaskPromise= group.next().makePromise(of:HTTPClient.Task<HTTPClient.Response>.self)
letbody:HTTPClient.Body=.stream(length:8){ writerin
letbody:HTTPClient.Body=.stream2(length:8){ writerin
XCTAssert(el1.inEventLoop)
letbuffer=ByteBuffer(string:"1234")
return writer.write(.byteBuffer(buffer)).flatMap{
writer.write(writer.allocator.buffer(string:"1234")).whenComplete{ _in
XCTAssert(el1.inEventLoop)
letbuffer=ByteBuffer(string:"4321")
return taskPromise.futureResult.map{(task:HTTPClient.Task<HTTPClient.Response>)->Voidin
XCTAssertNotNil(task.connection)
XCTAssert(task.connection?.channel.eventLoop=== el2)
}.flatMap{
writer.write(.byteBuffer(buffer))
writer.write(writer.allocator.buffer(string:"4321"))
}.whenComplete{ _in
writer.end()
}
}
}
Expand DownExpand Up@@ -1070,8 +1074,9 @@ class HTTPClientInternalTests: XCTestCase {
try channel.pipeline.addHandler(handler).wait()

varrequest=tryRequest(url:"http://localhost:8080/post")
request.body=.stream(length:1){ writerin
writer.write(.byteBuffer(ByteBuffer(string:"1234")))
request.body=.stream2(length:1){ writerin
writer.write(writer.allocator.buffer(string:"1234"), promise:nil)
writer.end()
}

XCTAssertThrowsError(try channel.writeOutbound(request))
Expand Down
Loading

[8]ページ先頭

©2009-2025 Movatter.jp