Changes
1 changed files (+101/-48)
-
-
@@ -346,6 +346,56 @@ pub fn write(self: *@This(), data: T) void {self.data = data; self.wrote.set(); } pub const Store = struct { const Map = std.AutoHashMap(u64, Listener(T, timeout_ms)); mutex: std.Thread.Mutex = .{}, map: Map, pub fn init(allocator: std.mem.Allocator) Store { return .{ .map = Map.init(allocator) }; } pub fn deinit(self: *Store) void { self.map.deinit(); } pub fn add(self: *Store, req_id: u64) !*Listener(T, timeout_ms) { self.mutex.lock(); defer self.mutex.unlock(); const entry = try self.map.getOrPut(req_id); entry.value_ptr.* = .{}; return entry.value_ptr; } /// Only call from the thread that calls "add". pub fn remove(self: *Store, req_id: u64) void { self.mutex.lock(); defer self.mutex.unlock(); _ = self.map.remove(req_id); } /// Caller needs to call "unlock" after reading or writing to the /// returned value. pub fn get(self: *Store, req_id: u64) ?*Listener(T, timeout_ms) { self.mutex.lock(); const entry = self.map.getPtr(req_id) orelse { self.mutex.unlock(); return null; }; return entry; } pub fn unlock(self: *Store) void { self.mutex.unlock(); } }; }; }
-
@@ -378,12 +428,12 @@ host: []const u8,zone_subscription_request_id: ?u64 = null, arc: Arc = .{}, saved_token: ?[]const u8 = null, browse_listeners: std.AutoHashMap(u64, BrowseListener), load_listeners: std.AutoHashMap(u64, LoadListener), browse_listeners: BrowseListener.Store, load_listeners: LoadListener.Store, image_downloads: ImageDownloader, control_events: std.AutoHashMap(u64, ControlListener), seek_events: std.AutoHashMap(u64, SeekListener), change_volume_events: std.AutoHashMap(u64, ChangeVolumeListener), control_events: ControlListener.Store, seek_events: SeekListener.Store, change_volume_events: ChangeVolumeListener.Store, fn init(server: *discovery.Server, token: ?[]const u8) !Internal { var addr = std.ArrayList(u8).init(allocator);
-
@@ -421,12 +471,12 @@ };} fn initListeners(self: *Internal) void { self.browse_listeners = std.AutoHashMap(u64, BrowseListener).init(allocator); self.load_listeners = std.AutoHashMap(u64, LoadListener).init(allocator); self.browse_listeners = BrowseListener.Store.init(allocator); self.load_listeners = LoadListener.Store.init(allocator); self.image_downloads = ImageDownloader.init(self.tsa.allocator()); self.control_events = std.AutoHashMap(u64, ControlListener).init(allocator); self.seek_events = std.AutoHashMap(u64, SeekListener).init(allocator); self.change_volume_events = std.AutoHashMap(u64, ChangeVolumeListener).init(allocator); self.control_events = ControlListener.Store.init(allocator); self.seek_events = SeekListener.Store.init(allocator); self.change_volume_events = ChangeVolumeListener.Store.init(allocator); } fn deinitListeners(self: *Internal) void {
-
@@ -648,7 +698,9 @@ continue;} } if (self.internal.browse_listeners.getEntry(header.request_id)) |entry| { if (self.internal.browse_listeners.get(header.request_id)) |listener| { defer self.internal.browse_listeners.unlock(); std.log.debug("Received /browse response (ID={d})", .{header.request_id}); const res = BrowseService.Browse.Response.parse(
-
@@ -661,11 +713,13 @@ std.log.err("Received unexpected browse response: {s}", .{@errorName(err)});continue; }; entry.value_ptr.write(res); listener.write(res); continue; } if (self.internal.load_listeners.getEntry(header.request_id)) |entry| { if (self.internal.load_listeners.get(header.request_id)) |listener| { defer self.internal.load_listeners.unlock(); std.log.debug("Received /load response (ID={d})", .{header.request_id}); const res = BrowseService.Load.Response.parse(
-
@@ -678,23 +732,27 @@ std.log.err("Received unexpected load response: {s}", .{@errorName(err)});continue; }; entry.value_ptr.write(res); listener.write(res); continue; } if (self.internal.control_events.getEntry(header.request_id)) |entry| { if (self.internal.control_events.get(header.request_id)) |listener| { defer self.internal.control_events.unlock(); std.log.debug("Received /control response (ID={d})", .{header.request_id}); entry.value_ptr.write( listener.write( if (std.mem.eql(u8, "Success", meta.service)) .ok else .server_error, ); continue; } if (self.internal.seek_events.getEntry(header.request_id)) |entry| { if (self.internal.seek_events.get(header.request_id)) |listener| { defer self.internal.seek_events.unlock(); std.log.debug("Received /seek response (ID={d})", .{header.request_id}); entry.value_ptr.write(code: { listener.write(code: { _ = TransportService.Seek.Response.decode(&meta) catch { break :code .server_error; };
-
@@ -704,10 +762,12 @@ });continue; } if (self.internal.change_volume_events.getEntry(header.request_id)) |entry| { if (self.internal.change_volume_events.get(header.request_id)) |listener| { defer self.internal.change_volume_events.unlock(); std.log.debug("Received /change_volume response (ID={d})", .{header.request_id}); entry.value_ptr.write(code: { listener.write(code: { _ = TransportService.ChangeVolume.Response.decode(&meta) catch { break :code .server_error; };
-
@@ -967,19 +1027,18 @@ return .unknown_error;}; defer allocator.free(req); var entry = self.internal.control_events.getOrPut(req_id) catch |err| { var listener = self.internal.control_events.add(req_id) catch |err| { std.log.err("Unable to set listener for control response: {s}", .{@errorName(err)}); return .unknown_error; }; entry.value_ptr.* = .{}; defer _ = self.internal.control_events.remove(req_id); defer self.internal.control_events.remove(req_id); ws.writeBin(req) catch |err| { std.log.err("Unable to write control request: {s}", .{@errorName(err)}); return .failed_to_send; }; return entry.value_ptr.listen() catch { return listener.listen() catch { return .timeout; }; }
-
@@ -1020,19 +1079,18 @@ return .unknown_error;}; defer allocator.free(req_msg); var entry = self.internal.seek_events.getOrPut(req_id) catch |err| { var listener = self.internal.seek_events.add(req_id) catch |err| { std.log.err("Unable to set listener for seek response: {s}", .{@errorName(err)}); return .unknown_error; }; entry.value_ptr.* = .{}; defer _ = self.internal.seek_events.remove(req_id); defer self.internal.seek_events.remove(req_id); ws.writeBin(req_msg) catch |err| { std.log.err("Unable to write seek request: {s}", .{@errorName(err)}); return .failed_to_send; }; return entry.value_ptr.listen() catch { return listener.listen() catch { return .timeout; }; }
-
@@ -1072,19 +1130,18 @@ return .unknown_error;}; defer allocator.free(req_msg); var entry = self.internal.change_volume_events.getOrPut(req_id) catch |err| { var listener = self.internal.change_volume_events.add(req_id) catch |err| { std.log.err("Unable to set listener for volume increase response: {s}", .{@errorName(err)}); return .out_of_memory; }; entry.value_ptr.* = .{}; defer _ = self.internal.change_volume_events.remove(req_id); defer self.internal.change_volume_events.remove(req_id); ws.writeBin(req_msg) catch |err| { std.log.err("Unable to write volume increase request: {s}", .{@errorName(err)}); return .failed_to_send; }; return entry.value_ptr.listen() catch { return listener.listen() catch { return .timeout; }; }
-
@@ -1124,19 +1181,18 @@ return .unknown_error;}; defer allocator.free(req_msg); var entry = self.internal.change_volume_events.getOrPut(req_id) catch |err| { var listener = self.internal.change_volume_events.add(req_id) catch |err| { std.log.err("Unable to set listener for volume decrease response: {s}", .{@errorName(err)}); return .out_of_memory; }; entry.value_ptr.* = .{}; defer _ = self.internal.change_volume_events.remove(req_id); defer self.internal.change_volume_events.remove(req_id); ws.writeBin(req_msg) catch |err| { std.log.err("Unable to write volume decrease request: {s}", .{@errorName(err)}); return .failed_to_send; }; return entry.value_ptr.listen() catch { return listener.listen() catch { return .timeout; }; }
-
@@ -1177,19 +1233,18 @@ return .unknown_error;}; defer allocator.free(req_msg); var entry = self.internal.change_volume_events.getOrPut(req_id) catch |err| { var listener = self.internal.change_volume_events.add(req_id) catch |err| { std.log.err("Unable to set listener for volume change response: {s}", .{@errorName(err)}); return .out_of_memory; }; entry.value_ptr.* = .{}; defer _ = self.internal.change_volume_events.remove(req_id); defer self.internal.change_volume_events.remove(req_id); ws.writeBin(req_msg) catch |err| { std.log.err("Unable to write volume change request: {s}", .{@errorName(err)}); return .failed_to_send; }; return entry.value_ptr.listen() catch { return listener.listen() catch { return .timeout; }; }
-
@@ -1233,12 +1288,11 @@ return browse.Result.makeRetainedError(.unknown_error) catch null;}; defer allocator.free(req_msg); var entry = self.internal.browse_listeners.getOrPut(req_id) catch |err| { var browse_listener = self.internal.browse_listeners.add(req_id) catch |err| { std.log.err("Unable to set listener for browse response: {s}", .{@errorName(err)}); return browse.Result.makeRetainedError(.unknown_error) catch null; }; entry.value_ptr.* = .{}; defer _ = self.internal.browse_listeners.remove(req_id); defer self.internal.browse_listeners.remove(req_id); ws.writeBin(req_msg) catch |err| { std.log.err("Unable to write browse request: {s}", .{@errorName(err)});
-
@@ -1247,7 +1301,7 @@ };std.log.debug("Sent browse request (ID={d})", .{req_id}); const resp = entry.value_ptr.listen() catch { const resp = browse_listener.listen() catch { std.log.err("Browse request timeout", .{}); return browse.Result.makeRetainedError(.timeout) catch null; };
-
@@ -1307,12 +1361,11 @@ return browse.Result.makeRetainedError(.unknown_error) catch null;}; defer allocator.free(load_req_msg); var load_entry = self.internal.load_listeners.getOrPut(load_req_id) catch |err| { var load_listener = self.internal.load_listeners.add(load_req_id) catch |err| { std.log.err("Unable to set listener for load response: {s}", .{@errorName(err)}); return browse.Result.makeRetainedError(.unknown_error) catch null; }; load_entry.value_ptr.* = .{}; defer _ = self.internal.load_listeners.remove(load_req_id); defer self.internal.load_listeners.remove(load_req_id); ws.writeBin(load_req_msg) catch |err| { std.log.err("Unable to write load request: {s}", .{@errorName(err)});
-
@@ -1321,7 +1374,7 @@ };std.log.debug("Sent load request (ID={d})", .{load_req_id}); const load_resp = load_entry.value_ptr.listen() catch { const load_resp = load_listener.listen() catch { std.log.err("Load request timeout", .{}); return browse.Result.makeRetainedError(.timeout) catch null; };
-