Changes
1 changed files (+299/-177)
-
-
@@ -415,6 +415,8 @@ const ChangeVolumeListener = Listener(transport.VolumeControlResultCode, 2_000);const ImageDownloader = image.Downloader(.{}); const EventsQueue = std.DoublyLinkedList(*Event); internal: *Internal, pub const Internal = struct {
-
@@ -434,6 +436,13 @@ image_downloads: ImageDownloader,control_events: ControlListener.Store, seek_events: SeekListener.Store, change_volume_events: ChangeVolumeListener.Store, read_loop: ?std.Thread = null, /// An event inside this queue is retained. events_queue: EventsQueue = .{}, events_queue_lock: std.Thread.Mutex = .{}, events_queue_cond: std.Thread.Condition = .{}, fn init(server: *discovery.Server, token: ?[]const u8) !Internal { var addr = std.ArrayList(u8).init(allocator);
-
@@ -492,10 +501,27 @@fn deinit(self: *Internal) void { self.deinitListeners(); if (self.read_loop) |thread| { thread.detach(); self.read_loop = null; } if (self.ws) |*ws| { ws.deinit(); } { defer self.events_queue_cond.broadcast(); self.events_queue_lock.lock(); defer self.events_queue_lock.unlock(); while (self.events_queue.popFirst()) |node| { node.data.release(); allocator.destroy(node); } } if (self.saved_token) |saved_token| { allocator.free(saved_token); }
-
@@ -566,6 +592,11 @@ var self = ptr orelse @panic(std.fmt.comptimePrint("Received null pointer on {s}_{s}", .{ cname, @src().fn_name }), ); if (self.internal.read_loop) |thread| { thread.detach(); self.internal.read_loop = null; } if (self.internal.ws) |*ws| { std.log.debug("Closing WebSocket connection...", .{});
-
@@ -588,197 +619,46 @@ var self = ptr orelse @panic(std.fmt.comptimePrint("Received null pointer on {s}_{s}", .{ cname, @src().fn_name }), ); var ws = self.internal.ws orelse { const new_token = self.connect(self.internal.saved_token) catch |err| { std.log.err("Unable to connect: {s}", .{@errorName(err)}); const event = Event.makeConnectionError(err) catch |make_err| { std.log.err("Unable to compose {s} event: {s}", .{ @tagName(Event.Kind.connection_error), @errorName(make_err), }); return null; }; return event.retain(); }; const event = Event.makeConnected(new_token) catch |err| { std.log.err("Unable to compose {s} event: {s}", .{ @tagName(Event.Kind.connected), @errorName(err), }); return null; }; return event.retain(); }; self.internal.events_queue_lock.lock(); defer self.internal.events_queue_lock.unlock(); while (true) { const meta, const header_ctx, const msg = readMessage(&ws) catch |err| { if (err == error.ReadClosedConnection) { std.log.debug("WebSocket connection is closed", .{}); self.disconnect(); } else { std.log.err("Failed to read a message: {s}", .{@errorName(err)}); } const event = Event.makeConnectionError(err) catch |make_err| { std.log.err("Unable to compose {s} event: {s}", .{ @tagName(Event.Kind.connection_error), @errorName(make_err), }); return null; }; return event.retain(); }; defer ws.done(msg); const header: moo.NoBodyHeaders, _ = moo.NoBodyHeaders.parse(msg.data, header_ctx) catch |err| { std.log.err("Invalid MOO message header: {s}", .{@errorName(err)}); continue; }; if (self.internal.zone_subscription_request_id) |req_id| { if (header.request_id == req_id) { if (std.mem.eql(u8, meta.service, "Subscribed")) { const res = TransportService.SubscribeZoneChanges.initialResponse( allocator, header_ctx, msg.data, ) catch |err| { std.log.err( "Received unexpected zone subscription response: {s}", .{@errorName(err)}, ); continue; }; defer res.deinit(); const event = Event.makeZoneListFromInitial(res.value) catch |make_err| { const node = self.internal.events_queue.popFirst() orelse { if (self.internal.ws == null) { const new_token = self.connect(self.internal.saved_token) catch |err| { std.log.err("Unable to connect: {s}", .{@errorName(err)}); const event = Event.makeConnectionError(err) catch |make_err| { std.log.err("Unable to compose {s} event: {s}", .{ @tagName(Event.Kind.zone_list), @tagName(Event.Kind.connection_error), @errorName(make_err), }); return null; }; return event.retain(); } if (std.mem.eql(u8, meta.service, "Changed")) { const res = TransportService.SubscribeZoneChanges.response( allocator, header_ctx, msg.data, ) catch |err| { std.log.err( "Received unexpected zone change response: {s}", .{@errorName(err)}, ); continue; }; defer res.deinit(); const event = Event.makeZoneListFromChanges(res.value) catch |err| { std.log.err("Unable to compose {s} event: {s}", .{ @tagName(Event.Kind.zone_list), @errorName(err), }); return null; }; return event.retain(); } if (std.mem.eql(u8, meta.service, "Unsubscribed")) { self.internal.zone_subscription_request_id = null; // TODO: Return unsubscribed event continue; } }; std.log.warn("Unknown response received for zone subscription: {s}", .{ meta.service, }); continue; const event = Event.makeConnected(new_token) catch |err| { std.log.err("Unable to compose {s} event: {s}", .{ @tagName(Event.Kind.connected), @errorName(err), }); return null; }; return event.retain(); } } if (self.internal.browse_listeners.get(header.request_id)) |listener| { defer self.internal.browse_listeners.unlock(); std.log.debug("Received /browse response (ID={d})", .{header.request_id}); const res = BrowseService.Browse.Response.parse( allocator, &meta, header_ctx, msg.data, ) catch |err| { std.log.err("Received unexpected browse response: {s}", .{@errorName(err)}); continue; }; if (self.internal.read_loop == null) { // After "deinit()". Hitting this branch means concurrent call to "getEvent", // which is illegal. return null; } listener.write(res); self.internal.events_queue_cond.wait(&self.internal.events_queue_lock); continue; } }; if (self.internal.load_listeners.get(header.request_id)) |listener| { defer self.internal.load_listeners.unlock(); std.log.debug("Received /load response (ID={d})", .{header.request_id}); const res = BrowseService.Load.Response.parse( allocator, &meta, header_ctx, msg.data, ) catch |err| { std.log.err("Received unexpected load response: {s}", .{@errorName(err)}); continue; }; listener.write(res); continue; } if (self.internal.control_events.get(header.request_id)) |listener| { defer self.internal.control_events.unlock(); std.log.debug("Received /control response (ID={d})", .{header.request_id}); listener.write( if (std.mem.eql(u8, "Success", meta.service)) .ok else .server_error, ); continue; } if (self.internal.seek_events.get(header.request_id)) |listener| { defer self.internal.seek_events.unlock(); std.log.debug("Received /seek response (ID={d})", .{header.request_id}); listener.write(code: { _ = TransportService.Seek.Response.decode(&meta) catch { break :code .server_error; }; break :code .ok; }); continue; } if (self.internal.change_volume_events.get(header.request_id)) |listener| { defer self.internal.change_volume_events.unlock(); std.log.debug("Received /change_volume response (ID={d})", .{header.request_id}); listener.write(code: { _ = TransportService.ChangeVolume.Response.decode(&meta) catch { break :code .server_error; }; break :code .ok; }); continue; } std.log.warn("Unhandle message on {s}", .{meta.service}); std.log.debug("{s}", .{msg.data}); defer allocator.destroy(node); return node.data; } }
-
@@ -873,7 +753,249 @@self.internal.request_id = request_id; self.internal.ws = ws; self.internal.read_loop = try std.Thread.spawn(.{}, readLoop, .{ self, &self.internal.ws.? }); return new_token; } fn readLoop(self: *Connection, ws: *websocket.Client) void { var should_use_pool = true; var pool: std.Thread.Pool = undefined; pool.init(.{ .allocator = allocator, }) catch |err| { std.log.warn( "Failed to create thread pool for events, handling event in read message thread: {s}", .{@errorName(err)}, ); should_use_pool = false; }; defer if (should_use_pool) pool.deinit(); if (should_use_pool) { std.log.debug("Using {d} threads for event handling", .{pool.threads.len}); } while (true) { _, _, const msg = readMessage(ws) catch |err| { if (err == error.ReadClosedConnection) { std.log.debug("WebSocket connection is closed", .{}); self.disconnect(); } else { std.log.err("Failed to read a message: {s}", .{@errorName(err)}); } const event = Event.makeConnectionError(err) catch |make_err| { std.log.err("Unable to compose {s} event: {s}", .{ @tagName(Event.Kind.connection_error), @errorName(make_err), }); return; }; self.queueEvent(event) catch { std.log.err("Out of memory at pushing event to queue", .{}); return; }; return; }; defer ws.done(msg); const data = allocator.dupe(u8, msg.data) catch { std.log.err("Out of memory at copying WebSocket message", .{}); continue; }; if (should_use_pool) { pool.spawn(handleEvent, .{ self, data }) catch |err| { allocator.free(data); std.log.err("Failed to spawn message handler thread: {s}", .{@errorName(err)}); continue; }; } else { self.handleEvent(data); } } } fn queueEvent(self: *Connection, event: *Event) std.mem.Allocator.Error!void { defer self.internal.events_queue_cond.signal(); self.internal.events_queue_lock.lock(); defer self.internal.events_queue_lock.unlock(); const node = try allocator.create(EventsQueue.Node); node.* = .{ .data = event.retain(), }; self.internal.events_queue.append(node); } /// This function takes ownership of `msg`. fn handleEvent(self: *Connection, msg: []const u8) void { defer allocator.free(msg); const meta, const header_ctx = moo.Metadata.parse(msg) catch |err| { std.log.err("Failed to parse message (possible memory corruption): {s}", .{@errorName(err)}); return; }; const header: moo.NoBodyHeaders, _ = moo.NoBodyHeaders.parse(msg, header_ctx) catch |err| { std.log.err("Invalid MOO message header: {s}", .{@errorName(err)}); return; }; if (self.internal.zone_subscription_request_id) |req_id| { if (header.request_id == req_id) { if (std.mem.eql(u8, meta.service, "Subscribed")) { const res = TransportService.SubscribeZoneChanges.initialResponse( allocator, header_ctx, msg, ) catch |err| { std.log.err( "Received unexpected zone subscription response: {s}", .{@errorName(err)}, ); return; }; defer res.deinit(); const event = Event.makeZoneListFromInitial(res.value) catch |make_err| { std.log.err("Unable to compose {s} event: {s}", .{ @tagName(Event.Kind.zone_list), @errorName(make_err), }); return; }; self.queueEvent(event) catch |err| { std.log.err("Failed to send {s} event: {s}", .{ @tagName(Event.Kind.zone_list), @errorName(err) }); }; return; } if (std.mem.eql(u8, meta.service, "Changed")) { const res = TransportService.SubscribeZoneChanges.response( allocator, header_ctx, msg, ) catch |err| { std.log.err( "Received unexpected zone change response: {s}", .{@errorName(err)}, ); return; }; defer res.deinit(); const event = Event.makeZoneListFromChanges(res.value) catch |err| { std.log.err("Unable to compose {s} event: {s}", .{ @tagName(Event.Kind.zone_list), @errorName(err), }); return; }; self.queueEvent(event) catch |err| { std.log.err("Failed to send {s} event: {s}", .{ @tagName(Event.Kind.zone_list), @errorName(err) }); }; return; } if (std.mem.eql(u8, meta.service, "Unsubscribed")) { self.internal.zone_subscription_request_id = null; // TODO: Queue unsubscribed event return; } std.log.warn("Unknown response received for zone subscription: {s}", .{ meta.service, }); return; } } if (self.internal.browse_listeners.get(header.request_id)) |listener| { defer self.internal.browse_listeners.unlock(); std.log.debug("Received /browse response (ID={d})", .{header.request_id}); const res = BrowseService.Browse.Response.parse( allocator, &meta, header_ctx, msg, ) catch |err| { std.log.err("Received unexpected browse response: {s}", .{@errorName(err)}); return; }; listener.write(res); return; } if (self.internal.load_listeners.get(header.request_id)) |listener| { defer self.internal.load_listeners.unlock(); std.log.debug("Received /load response (ID={d})", .{header.request_id}); const res = BrowseService.Load.Response.parse( allocator, &meta, header_ctx, msg, ) catch |err| { std.log.err("Received unexpected load response: {s}", .{@errorName(err)}); return; }; listener.write(res); return; } if (self.internal.control_events.get(header.request_id)) |listener| { defer self.internal.control_events.unlock(); std.log.debug("Received /control response (ID={d})", .{header.request_id}); listener.write( if (std.mem.eql(u8, "Success", meta.service)) .ok else .server_error, ); return; } if (self.internal.seek_events.get(header.request_id)) |listener| { defer self.internal.seek_events.unlock(); std.log.debug("Received /seek response (ID={d})", .{header.request_id}); listener.write(code: { _ = TransportService.Seek.Response.decode(&meta) catch { break :code .server_error; }; break :code .ok; }); return; } if (self.internal.change_volume_events.get(header.request_id)) |listener| { defer self.internal.change_volume_events.unlock(); std.log.debug("Received /change_volume response (ID={d})", .{header.request_id}); listener.write(code: { _ = TransportService.ChangeVolume.Response.decode(&meta) catch { break :code .server_error; }; break :code .ok; }); return; } std.log.warn("Unhandle message on {s}", .{meta.service}); std.log.debug("{s}", .{msg}); } /// Caller is responsible for closing message by calling `ws.done()`.
-