Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commite9f5c6f

Browse files
chore: refactor speaker & handshaker into actors (#15)
Instead of relying on class inheritance, the new Speaker can composed into whatever would like to speak the CoderVPN protocol, and messages can be handled by iterating over the speaker itself e.g:```swiftenum IncomingMessage { case message(RecvMsg) case RPC(RPCRequest<SendMsg, RecvMsg>)}``````swiftfor try await msg in speaker { switch msg { case let .message(msg): // Handle message that doesn't require a response case let .RPC(req): // Handle incoming RPC }}```
1 parentae65c20 commite9f5c6f

File tree

3 files changed

+81
-118
lines changed

3 files changed

+81
-118
lines changed

‎Coder Desktop/Proto/Receiver.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ actor Receiver<RecvMsg: Message> {
5757

5858
/// Starts reading protocol messages from the `DispatchIO` channel and returns them as an `AsyncStream` of messages.
5959
/// On read or decoding error, it logs and closes the stream.
60-
func messages()throws->AsyncStream<RecvMsg>{
60+
func messages()throws(ReceiveError)->AsyncStream<RecvMsg>{
6161
if running{
6262
throwReceiveError.alreadyRunning
6363
}

‎Coder Desktop/Proto/Speaker.swift

Lines changed: 47 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ let newLine = 0x0A
66
letheaderPreamble="codervpn"
77

88
/// A message that has the `rpc` property for recording participation in a unary RPC.
9-
protocolRPCMessage{
9+
protocolRPCMessage:Sendable{
1010
varrpc:Vpn_RPC{getset}
1111
/// Returns true if `rpc` has been explicitly set.
1212
varhasRpc:Bool{get}
@@ -49,8 +49,8 @@ struct ProtoVersion: CustomStringConvertible, Equatable, Codable {
4949
}
5050
}
5151

52-
/// Anabstract base class for implementationsthatneed to communicateusing the VPN protocol.
53-
classSpeaker<SendMsg:RPCMessage&Message, RecvMsg:RPCMessage&Message>{
52+
/// Anactorthatcommunicatesusing the VPN protocol
53+
actorSpeaker<SendMsg:RPCMessage&Message, RecvMsg:RPCMessage&Message>{
5454
privateletlogger=Logger(subsystem:"com.coder.Coder-Desktop", category:"proto")
5555
privateletwriteFD:FileHandle
5656
privateletreadFD:FileHandle
@@ -93,43 +93,6 @@ class Speaker<SendMsg: RPCMessage & Message, RecvMsg: RPCMessage & Message> {
9393
try _=await hndsh.handshake()
9494
}
9595

96-
/// Reads and handles protocol messages.
97-
func readLoop()asyncthrows{
98-
fortryawaitmsgintryawait receiver.messages(){
99-
guard msg.hasRpcelse{
100-
handleMessage(msg)
101-
continue
102-
}
103-
guard msg.rpc.msgID==0else{
104-
letreq=RPCRequest<SendMsg,RecvMsg>(req: msg, sender: sender)
105-
handleRPC(req)
106-
continue
107-
}
108-
guard msg.rpc.responseTo==0else{
109-
logger.debug("got RPC reply for msgID\(msg.rpc.responseTo)")
110-
dothrows(RPCError){
111-
tryawaitself.secretary.route(reply: msg)
112-
} catch{
113-
logger.error(
114-
"couldn't route RPC reply for\(msg.rpc.responseTo):\(error)")
115-
}
116-
continue
117-
}
118-
}
119-
}
120-
121-
/// Handles a single non-RPC message. It is expected that subclasses override this method with their own handlers.
122-
func handleMessage(_ msg: RecvMsg){
123-
// just log
124-
logger.debug("got non-RPC message\(msg.textFormatString())")
125-
}
126-
127-
/// Handle a single RPC request. It is expected that subclasses override this method with their own handlers.
128-
func handleRPC(_ req:RPCRequest<SendMsg,RecvMsg>){
129-
// just log
130-
logger.debug("got RPC message\(req.msg.textFormatString())")
131-
}
132-
13396
/// Send a unary RPC message and handle the response
13497
func unaryRPC(_ req:SendMsg)asyncthrows->RecvMsg{
13598
returntryawaitwithCheckedThrowingContinuation{ continuationin
@@ -166,10 +129,45 @@ class Speaker<SendMsg: RPCMessage & Message, RecvMsg: RPCMessage & Message> {
166129
logger.error("failed to close read file handle:\(error)")
167130
}
168131
}
132+
133+
enumIncomingMessage{
134+
case message(RecvMsg)
135+
case RPC(RPCRequest<SendMsg,RecvMsg>)
136+
}
137+
}
138+
139+
extensionSpeaker:AsyncSequence,AsyncIteratorProtocol{
140+
typealiasElement=IncomingMessage
141+
142+
publicnonisolatedfunc makeAsyncIterator()->Speaker<SendMsg,RecvMsg>{
143+
self
144+
}
145+
146+
func next()asyncthrows->IncomingMessage?{
147+
fortryawaitmsgintryawait receiver.messages(){
148+
guard msg.hasRpcelse{
149+
return.message(msg)
150+
}
151+
guard msg.rpc.msgID==0else{
152+
return.RPC(RPCRequest<SendMsg,RecvMsg>(req: msg, sender: sender))
153+
}
154+
guard msg.rpc.responseTo==0else{
155+
logger.debug("got RPC reply for msgID\(msg.rpc.responseTo)")
156+
dothrows(RPCError){
157+
tryawaitself.secretary.route(reply: msg)
158+
} catch{
159+
logger.error(
160+
"couldn't route RPC reply for\(msg.rpc.responseTo):\(error)")
161+
}
162+
continue
163+
}
164+
}
165+
return nil
166+
}
169167
}
170168

171-
///A class that performs the initial VPN protocol handshake and version negotiation.
172-
classHandshaker:@uncheckedSendable{
169+
///An actor performs the initial VPN protocol handshake and version negotiation.
170+
actor Handshaker{
173171
private let writeFD:FileHandle
174172
privateletdispatch:DispatchIO
175173
privatevartheirData:Data=.init()
@@ -193,17 +191,19 @@ class Handshaker: @unchecked Sendable {
193191
func handshake()asyncthrows->ProtoVersion{
194192
// kick off the read async before we try to write, synchronously, so we don't deadlock, both
195193
// waiting to write with nobody reading.
196-
asynclettheirs=trywithCheckedThrowingContinuation{ contin
197-
continuation= cont
198-
// send in a nil read to kick us off
199-
handleRead(false,nil,0)
194+
letreadTask=Task{
195+
tryawaitwithCheckedThrowingContinuation{ contin
196+
self.continuation= cont
197+
// send in a nil read to kick us off
198+
self.handleRead(false,nil,0)
199+
}
200200
}
201201

202202
letvStr= versions.map{ $0.description}.joined(separator:",")
203203
letours=String(format:"\(headerPreamble)\(role)\(vStr)\n")
204204
try writeFD.write(contentsOf: ours.data(using:.utf8)!)
205205

206-
lettheirData=tryawaittheirs
206+
lettheirData=tryawaitreadTask.value
207207
guardlet theirsString=String(bytes: theirData, encoding:.utf8)else{
208208
throwHandshakeError.invalidHeader("<unparsable:\(theirData)")
209209
}

‎Coder Desktop/ProtoTests/SpeakerTests.swift

Lines changed: 33 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -2,58 +2,19 @@
22
import Foundation
33
import Testing
44

5-
/// A concrete, test class for the abstract Speaker, which overrides the handlers to send things to
6-
/// continuations we set in the test.
7-
classTestTunnel:Speaker<Vpn_TunnelMessage,Vpn_ManagerMessage>,@uncheckedSendable{
8-
privatevarmsgHandler:CheckedContinuation<Vpn_ManagerMessage,Error>?
9-
overridefunc handleMessage(_ msg:Vpn_ManagerMessage){
10-
msgHandler?.resume(returning: msg)
11-
}
12-
13-
/// Runs the given closure asynchronously and returns the next non-RPC message received.
14-
func expectMessage(with closure:
15-
@escaping@Sendable()async->Void)asyncthrows->Vpn_ManagerMessage
16-
{
17-
returntryawaitwithCheckedThrowingContinuation{ continuationin
18-
msgHandler= continuation
19-
Task{
20-
awaitclosure()
21-
}
22-
}
23-
}
24-
25-
privatevarrpcHandler:CheckedContinuation<RPCRequest<Vpn_TunnelMessage,Vpn_ManagerMessage>,Error>?
26-
overridefunc handleRPC(_ req:RPCRequest<Vpn_TunnelMessage,Vpn_ManagerMessage>){
27-
rpcHandler?.resume(returning: req)
28-
}
29-
30-
/// Runs the given closure asynchronously and return the next non-RPC message received
31-
func expectRPC(with closure:
32-
@escaping@Sendable()async->Void)asyncthrows->
33-
RPCRequest<Vpn_TunnelMessage,Vpn_ManagerMessage>
34-
{
35-
returntryawaitwithCheckedThrowingContinuation{ continuationin
36-
rpcHandler= continuation
37-
Task{
38-
awaitclosure()
39-
}
40-
}
41-
}
42-
}
43-
445
@Suite(.timeLimit(.minutes(1)))
456
struct SpeakerTests:Sendable{
467
letpipeMT=Pipe()
478
letpipeTM=Pipe()
48-
letuut:TestTunnel
9+
letuut:Speaker<Vpn_TunnelMessage,Vpn_ManagerMessage>
4910
letsender:Sender<Vpn_ManagerMessage>
5011
letdispatch:DispatchIO
5112
letreceiver:Receiver<Vpn_TunnelMessage>
5213
lethandshaker:Handshaker
5314

5415
init(){
5516
letqueue=DispatchQueue.global(qos:.utility)
56-
uut=TestTunnel(
17+
uut=Speaker(
5718
writeFD: pipeTM.fileHandleForWriting,
5819
readFD: pipeMT.fileHandleForReading
5920
)
@@ -79,39 +40,40 @@ struct SpeakerTests: Sendable {
7940
}
8041

8142
@Testfunc handleSingleMessage()asyncthrows{
82-
asyncletreadDone:()=try uut.readLoop()
83-
84-
letgot=tryawait uut.expectMessage{
85-
vars=Vpn_ManagerMessage()
86-
s.start=Vpn_StartRequest()
87-
await #expect(throws:Never.self){
88-
tryawait sender.send(s)
89-
}
43+
vars=Vpn_ManagerMessage()
44+
s.start=Vpn_StartRequest()
45+
await #expect(throws:Never.self){
46+
tryawait sender.send(s)
47+
}
48+
letgot=try #require(await uut.next())
49+
guard caselet.message(msg)= gotelse{
50+
Issue.record("Received unexpected message from speaker")
51+
return
9052
}
91-
#expect(got.msg==.start(Vpn_StartRequest()))
53+
#expect(msg.msg==.start(Vpn_StartRequest()))
9254
tryawait sender.close()
93-
tryawait readDone
9455
}
9556

9657
@Testfunc handleRPC()asyncthrows{
97-
asyncletreadDone:()=try uut.readLoop()
98-
99-
letgot=tryawait uut.expectRPC{
100-
vars=Vpn_ManagerMessage()
101-
s.start=Vpn_StartRequest()
102-
s.rpc=Vpn_RPC()
103-
s.rpc.msgID=33
104-
await #expect(throws:Never.self){
105-
tryawait sender.send(s)
106-
}
58+
vars=Vpn_ManagerMessage()
59+
s.start=Vpn_StartRequest()
60+
s.rpc=Vpn_RPC()
61+
s.rpc.msgID=33
62+
await #expect(throws:Never.self){
63+
tryawait sender.send(s)
64+
}
65+
letgot=try #require(await uut.next())
66+
guard caselet.RPC(req)= gotelse{
67+
Issue.record("Received unexpected message from speaker")
68+
return
10769
}
108-
#expect(got.msg.msg==.start(Vpn_StartRequest()))
109-
#expect(got.msg.rpc.msgID==33)
70+
#expect(req.msg.msg==.start(Vpn_StartRequest()))
71+
#expect(req.msg.rpc.msgID==33)
11072
varreply=Vpn_TunnelMessage()
11173
reply.start=Vpn_StartResponse()
11274
reply.rpc.responseTo=33
113-
tryawaitgot.sendReply(reply)
114-
uut.closeWrite()
75+
tryawaitreq.sendReply(reply)
76+
awaituut.closeWrite()
11577

11678
varcount=0
11779
await #expect(throws:Never.self){
@@ -122,12 +84,13 @@ struct SpeakerTests: Sendable {
12284
#expect(count==1)
12385
}
12486
tryawait sender.close()
125-
tryawaitreadDone
12687
}
12788

12889
@Testfunc sendRPCs()async throws{
129-
asyncletreadDone:()=try uut.readLoop()
130-
90+
// Speaker must be reading from the receiver for `unaryRPC` to return
91+
letreadDone=Task{
92+
fortryawait_in uut{}
93+
}
13194
asyncletmanagerDone=Task{
13295
varcount=0
13396
fortryawaitreqintryawait receiver.messages(){
@@ -148,9 +111,9 @@ struct SpeakerTests: Sendable {
148111
letgot=tryawait uut.unaryRPC(req)
149112
#expect(got.networkSettings.errorMessage=="test\(i)")
150113
}
151-
uut.closeWrite()
114+
awaituut.closeWrite()
152115
_=await managerDone
153116
tryawait sender.close()
154-
tryawait readDone
117+
tryawait readDone.value
155118
}
156119
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp