Skip to content

Commit 6af6121

Browse files
committed
remove buffer
1 parent 8388b40 commit 6af6121

File tree

2 files changed

+23
-52
lines changed

2 files changed

+23
-52
lines changed

Coder Desktop/Proto/Speaker.swift

+19-43
Original file line numberDiff line numberDiff line change
@@ -95,48 +95,6 @@ actor Speaker<SendMsg: RPCMessage & Message, RecvMsg: RPCMessage & Message> {
9595
try _ = await hndsh.handshake()
9696
}
9797

98-
public func start() {
99-
guard readLoopTask == nil else {
100-
logger.error("speaker is already running")
101-
return
102-
}
103-
readLoopTask = Task {
104-
do throws(ReceiveError) {
105-
for try await msg in try await self.receiver.messages() {
106-
guard msg.hasRpc else {
107-
await messageBuffer.push(.message(msg))
108-
continue
109-
}
110-
guard msg.rpc.msgID == 0 else {
111-
let req = RPCRequest<SendMsg, RecvMsg>(req: msg, sender: self.sender)
112-
await messageBuffer.push(.RPC(req))
113-
continue
114-
}
115-
guard msg.rpc.responseTo == 0 else {
116-
self.logger.debug("got RPC reply for msgID \(msg.rpc.responseTo)")
117-
do throws(RPCError) {
118-
try await self.secretary.route(reply: msg)
119-
} catch {
120-
self.logger.error(
121-
"couldn't route RPC reply for \(msg.rpc.responseTo): \(error)")
122-
}
123-
continue
124-
}
125-
}
126-
} catch {
127-
self.logger.error("failed to receive messages: \(error)")
128-
throw error
129-
}
130-
}
131-
}
132-
133-
func wait() async throws {
134-
guard let task = readLoopTask else {
135-
return
136-
}
137-
try await task.value
138-
}
139-
14098
/// Send a unary RPC message and handle the response
14199
func unaryRPC(_ req: SendMsg) async throws -> RecvMsg {
142100
return try await withCheckedThrowingContinuation { continuation in
@@ -212,7 +170,25 @@ extension Speaker: AsyncSequence, AsyncIteratorProtocol {
212170
}
213171

214172
func next() async throws -> IncomingMessage? {
215-
return await messageBuffer.next()
173+
for try await msg in try await receiver.messages() {
174+
guard msg.hasRpc else {
175+
return .message(msg)
176+
}
177+
guard msg.rpc.msgID == 0 else {
178+
return .RPC(RPCRequest<SendMsg, RecvMsg>(req: msg, sender: sender))
179+
}
180+
guard msg.rpc.responseTo == 0 else {
181+
logger.debug("got RPC reply for msgID \(msg.rpc.responseTo)")
182+
do throws(RPCError) {
183+
try await self.secretary.route(reply: msg)
184+
} catch {
185+
logger.error(
186+
"couldn't route RPC reply for \(msg.rpc.responseTo): \(error)")
187+
}
188+
continue
189+
}
190+
}
191+
return nil
216192
}
217193
}
218194

Coder Desktop/ProtoTests/SpeakerTests.swift

+4-9
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@ struct SpeakerTests: Sendable {
4040
}
4141

4242
@Test func handleSingleMessage() async throws {
43-
await uut.start()
44-
4543
var s = Vpn_ManagerMessage()
4644
s.start = Vpn_StartRequest()
4745
await #expect(throws: Never.self) {
@@ -54,12 +52,9 @@ struct SpeakerTests: Sendable {
5452
}
5553
#expect(msg.msg == .start(Vpn_StartRequest()))
5654
try await sender.close()
57-
try await uut.wait()
5855
}
5956

6057
@Test func handleRPC() async throws {
61-
await uut.start()
62-
6358
var s = Vpn_ManagerMessage()
6459
s.start = Vpn_StartRequest()
6560
s.rpc = Vpn_RPC()
@@ -89,12 +84,13 @@ struct SpeakerTests: Sendable {
8984
#expect(count == 1)
9085
}
9186
try await sender.close()
92-
try await uut.wait()
9387
}
9488

9589
@Test func sendRPCs() async throws {
96-
await uut.start()
97-
90+
// Speaker must be reading from the receiver for `unaryRPC` to return
91+
Task {
92+
for try await _ in uut {}
93+
}
9894
async let managerDone = Task {
9995
var count = 0
10096
for try await req in try await receiver.messages() {
@@ -118,6 +114,5 @@ struct SpeakerTests: Sendable {
118114
await uut.closeWrite()
119115
_ = await managerDone
120116
try await sender.close()
121-
try await uut.wait()
122117
}
123118
}

0 commit comments

Comments
 (0)