-
1
-
2
-
3
-
4
-
5
-
6
-
7
-
8
-
9
-
10
-
11
-
12
-
13
-
14
-
15
-
16
-
17
-
18
-
19
-
20
-
21
-
22
-
23
-
24
-
25
-
26
-
27
-
28
-
29
-
30
-
31
-
32
-
33
-
34
-
35
-
36
-
37
-
38
-
39
-
40
-
41
-
42
-
43
-
44
-
45
-
46
-
47
-
48
-
49
-
50
-
51
-
52
-
53
-
54
-
55
-
56
-
57
-
58
-
59
-
60
-
61
-
62
-
63
-
64
-
65
-
66
-
67
-
68
-
69
-
70
-
71
-
72
-
73
-
74
-
75
-
76
-
77
-
78
-
79
-
80
-
81
-
82
-
83
-
84
-
85
-
86
-
87
-
88
-
89
-
90
-
91
-
92
-
93
-
94
-
95
-
96
-
97
-
98
-
99
-
100
-
101
-
102
-
103
-
104
-
105
-
106
-
107
-
108
-
109
-
110
-
111
-
112
-
113
-
114
-
115
-
116
-
117
-
118
-
119
-
120
-
121
-
122
-
123
-
124
-
125
-
126
-
127
-
128
-
129
-
130
-
131
-
132
-
133
-
134
-
135
-
136
-
137
-
138
-
139
-
140
-
141
-
142
-
143
-
144
-
145
-
146
-
147
-
148
-
149
-
150
-
151
-
152
-
153
-
154
-
155
-
156
-
157
-
158
-
159
-
160
-
161
-
162
-
163
-
164
-
165
-
166
-
167
-
168
-
169
-
170
-
171
-
172
-
173
-
174
-
175
-
176
-
177
-
178
-
179
-
180
-
181
-
182
-
183
-
184
-
185
-
186
-
187
-
188
-
189
-
190
-
191
-
192
-
193
-
194
-
195
-
196
-
197
-
198
-
199
-
200
-
201
-
202
-
203
-
204
-
205
-
206
-
207
-
208
-
209
-
210
-
211
-
212
-
213
-
214
-
215
-
216
-
217
-
218
-
219
-
220
-
221
-
222
-
223
-
224
-
225
-
226
-
227
-
228
-
229
-
230
-
231
-
232
-
233
-
234
-
235
-
236
-
237
-
238
-
239
-
240
-
241
-
242
-
243
-
244
-
245
-
246
-
247
-
248
-
249
-
250
-
251
-
252
-
253
-
254
-
255
-
256
-
257
-
258
-
259
-
260
-
261
-
262
-
263
-
264
-
265
-
266
-
267
-
268
-
269
-
270
-
271
-
272
-
273
-
274
-
275
-
276
-
277
-
278
-
279
-
280
-
281
-
282
-
283
-
284
-
285
-
286
-
287
-
288
-
289
-
290
-
291
-
292
-
293
-
294
-
295
-
296
-
297
-
298
-
299
-
300
-
301
-
302
-
303
-
304
-
305
-
306
-
307
-
308
-
309
-
310
-
311
-
312
-
313
-
314
-
315
-
316
-
317
-
318
-
319
-
320
-
321
-
322
-
323
-
324
-
325
-
326
-
327
-
328
-
329
-
330
-
331
-
332
-
333
-
334
-
335
-
336
-
337
-
338
-
339
-
340
-
341
-
342
-
343
-
344
-
345
-
346
-
347
-
348
-
349
-
350
-
351
-
352
-
353
-
354
-
355
-
356
-
357
-
358
-
359
-
360
-
361
-
362
-
363
-
364
-
365
-
366
-
367
-
368
-
369
-
370
-
371
-
372
-
373
-
374
-
375
-
376
-
377
-
378
-
379
-
380
-
381
-
382
-
383
-
384
-
385
-
386
-
387
-
388
-
389
-
390
-
391
-
392
-
393
-
394
-
395
-
396
-
397
-
398
-
399
-
400
-
401
-
402
-
403
-
404
-
405
-
406
-
407
-
408
-
409
-
410
-
411
// Copyright 2025 Shota FUJI
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0
import Network
import OSLog
import Observation
public actor Connection: Connectable, Communicatable {
private let logger = Logger(subsystem: subsystem, category: "Connection")
public let serverID: String
public let host: String
public let port: UInt16
public let ext: RegistryService.Extension
public private(set) var token: String? = nil
private var conn: NWConnection
private var writeContext: NWConnection.ContentContext
private let queue = DispatchQueue(label: "RoonKit.Connection.network")
private var requestID: RequestId = 0
private var messageContinuationID: UInt = 0
private var messageContinuations: [UInt: AsyncStream<Moo>.Continuation] = [:]
private var readLoop: Task<Void, any Error>? = nil
public enum InitError: Error {
case invalidHost
}
public init(
id: String,
host: String,
port: UInt16,
ext: RegistryService.Extension
)
async throws
{
self.serverID = id
self.host = host
self.port = port
self.ext = ext
self.token = ext.token
let metadata = NWProtocolWebSocket.Metadata(opcode: .binary)
self.writeContext = NWConnection.ContentContext(
identifier: "writeBinaryContext",
metadata: [metadata]
)
var url = URLComponents()
url.scheme = "ws"
url.host = host
url.port = Int(port)
url.path = "/api"
guard let url = url.url else {
throw InitError.invalidHost
}
let options = NWProtocolWebSocket.Options.init()
options.autoReplyPing = true
let parameters = NWParameters.tcp
parameters.defaultProtocolStack.applicationProtocols.insert(options, at: 0)
self.conn = NWConnection.init(to: .url(url), using: parameters)
try await self.connect()
}
deinit {
logger.info("Disconnecting... (automatic)")
if let readLoop = readLoop {
readLoop.cancel()
self.readLoop = nil
}
self.messageContinuations = [:]
if conn.state != .cancelled {
conn.cancel()
}
}
public enum ConnectError: Error {
case alreadyStarted
case unexpectedInfoReceived
case serverIDMismatch
}
public func connect() async throws {
switch conn.state {
case .failed(let error):
throw error
case .cancelled, .setup:
break
default:
throw ConnectError.alreadyStarted
}
logger.debug("Starting WebSocket connection")
conn.start(queue: queue)
try await conn.ready()
logger.debug("WebSocket connection is ready")
self.readLoop = startReadLoop()
conn.stateUpdateHandler = { [weak self] (state: NWConnection.State) in
switch state {
case .cancelled, .failed(_), .waiting(_):
Task {
self?.logger.info("Network connection closed")
await self?.disconnect()
}
case .preparing:
self?.logger.debug("Network is in preparing state")
case .setup:
self?.logger.debug("Network has reset to setup state")
default:
break
}
}
do {
logger.debug("Querying server info")
let res = try await request(Moo(info: .init()), timeout: .seconds(3))
let info = try RegistryService.InfoResponse(res)
guard info.coreId == serverID else {
throw ConnectError.serverIDMismatch
}
}
do {
logger.debug("Registering extension")
// This stays pending until user accepts the extension on Roon settings page.
// Because of this, setting timeout does not make sense here.
let res = try await request(Moo(register: ext))
let result = try RegistryService.RegisterResponse(res)
token = result.token
}
}
private func clearMessageContinuations() {
for (_, continuation) in messageContinuations {
continuation.finish()
}
messageContinuations.removeAll()
}
private func startReadLoop() -> Task<Void, any Error> {
// This loop task is a field of the actor containing it--every reference to
// the actor (self) should be weak otherwise they form circular reference
// and the actor will not be released via ARC.
Task { [weak self] in
while true {
try Task.checkCancellation()
do {
guard let self = self else {
return
}
let (data, context, _) = try await conn.receiveMessage()
guard let data = data else {
continue
}
guard
let metadata = context?.protocolMetadata.first
as? NWProtocolWebSocket.Metadata
else {
logger.warning(
"Received non-WebSocket message on WebSocket connection"
)
continue
}
switch metadata.opcode {
case .binary, .text:
guard let msg = Moo.init(String(data: data, encoding: .utf8)!)
else {
logger.warning("Received invalid MOO message")
continue
}
if msg.service == PingService.Ping.service {
Task { [weak self] in
var res = Moo(verb: "COMPLETE", service: "Success")
res.requestId = msg.requestId
guard let self = self else {
return
}
try await send(res)
}
continue
}
for (_, continuation) in await messageContinuations {
continuation.yield(msg)
}
case .close:
logger.info("Connection closed, releasing resources")
return
default:
break
}
} catch {
if let logger = self?.logger {
logger.warning("Receive failed: \(error)")
}
return
}
}
}
}
public func disconnect() {
logger.info("Disconnecting... (manual)")
if let readLoop = readLoop {
readLoop.cancel()
self.readLoop = nil
}
self.messageContinuations = [:]
if conn.state != .cancelled {
conn.cancel()
}
}
public var messages: AsyncStream<Moo> {
let id = messageContinuationID
messageContinuationID += 1
return AsyncStream { cont in
messageContinuations[id] = cont
cont.onTermination = { @Sendable _ in
Task.detached {
await self.deleteMessageContinuation(id)
}
}
}
}
private func request(
msg: consuming Moo,
timeout: ContinuousClock.Instant.Duration? = nil
) async throws -> Moo {
let requestID: RequestId
if let givenID = msg.requestId {
requestID = givenID
} else {
requestID = self.requestID
msg.requestId = requestID
self.requestID += 1
}
let timeoutTask: Task<Void, any Error>?
if let timeout = timeout {
timeoutTask = Task {
try await Task.sleep(for: timeout)
throw MessageReadError.timeout
}
} else {
timeoutTask = nil
}
let read = Task {
for await msg in self.messages {
guard msg.requestId == .some(requestID) else {
continue
}
timeoutTask?.cancel()
return msg
}
throw MessageReadError.connectionClosed
}
try await self.send(msg)
return try await read.value
}
public func request(_ msg: consuming Moo) async throws -> Moo {
return try await request(msg: msg)
}
public func request(
_ msg: consuming Moo,
timeout: ContinuousClock.Instant.Duration
) async throws -> Moo {
return try await request(msg: msg, timeout: timeout)
}
public func send(_ msg: consuming Moo) async throws {
let req = String(msg).data(using: .utf8)!
try await withCheckedThrowingContinuation {
(cont: CheckedContinuation<Void, any Error>) in
if conn.state != .ready {
cont.resume(throwing: MessageWriteError.connectionNotReady)
return
}
conn.send(
content: req,
contentContext: writeContext,
completion: .contentProcessed({ error in
if let error = error {
cont.resume(throwing: error)
return
}
cont.resume()
})
)
}
}
private func deleteMessageContinuation(_ id: UInt) {
self.messageContinuations[id] = nil
}
}
extension NWConnection {
fileprivate enum ConnectionNotReadyError: Error {
case failed(NWError)
case cancelled
}
fileprivate func ready() async throws {
try await withCheckedThrowingContinuation {
(cont: CheckedContinuation<Void, any Error>) in
self.stateUpdateHandler = { state in
switch state {
case .failed(let error), .waiting(let error):
cont.resume(throwing: ConnectionNotReadyError.failed(error))
self.stateUpdateHandler = nil
case .cancelled:
cont.resume(throwing: ConnectionNotReadyError.cancelled)
self.stateUpdateHandler = nil
case .ready:
cont.resume()
self.stateUpdateHandler = nil
default:
break
}
}
}
}
fileprivate func receiveMessage() async throws -> (
Data?, ContentContext?, Bool
) {
return try await withCheckedThrowingContinuation { cont in
self.receiveMessage { (data, context, received, error) in
if let error = error {
cont.resume(throwing: error)
return
}
cont.resume(returning: (data, context, received))
}
}
}
fileprivate func send(
_ data: Data,
contentContext: ContentContext = ContentContext.defaultMessage
) async throws {
try await withCheckedThrowingContinuation {
(cont: CheckedContinuation<Void, any Error>) in
self.send(
content: data,
contentContext: contentContext,
completion: .contentProcessed({ error in
if let error = error {
cont.resume(throwing: error)
return
}
cont.resume()
})
)
}
}
}