Changes
2 changed files (+120/-42)
-
-
@@ -354,13 +354,46 @@ fn JsonResponseListener(comptime T: type) type {return Listener(moo.JsonBody(T)); } const ImageDownload = struct { hash: u64, ready: std.Thread.ResetEvent = .{}, downloaded: *image.GetResult = undefined, pub fn getHash( image_key: []const u8, opts: *const image.GetOptions, ) u64 { var hasher = std.hash.Wyhash.init(0); std.hash.autoHashStrat(&hasher, image_key, .Deep); std.hash.autoHashStrat(&hasher, opts.internal.data, .Deep); return hasher.final(); } pub fn init( image_key: []const u8, opts: *const image.GetOptions, ) @This() { return .{ .hash = ImageDownload.getHash(image_key, opts) }; } pub fn write(self: *@This(), data: *image.GetResult) void { self.downloaded = data.retain(); self.ready.set(); } pub fn deinit(self: *const @This()) void { if (self.ready.isSet()) { self.downloaded.release(); } } }; pub const Connection = extern struct { const cname = "plac_connection"; const allocator = std.heap.c_allocator; const BrowseListener = JsonResponseListener(BrowseService.Browse.Response); const LoadListener = JsonResponseListener(BrowseService.Load.Response); const ImageListener = Listener(*image.GetResult); internal: *Internal,
-
@@ -368,6 +401,7 @@ pub const Internal = struct {server: *discovery.Server, ws: ?websocket.Client = null, request_id: u64 = 0, request_id_lock: std.Thread.Mutex = .{}, subscription_id: u64 = 0, host: []const u8, zone_subscription_request_id: ?u64 = null,
-
@@ -375,7 +409,8 @@ arc: Arc = .{},saved_token: ?[]const u8 = null, browse_listeners: std.AutoHashMap(u64, BrowseListener), load_listeners: std.AutoHashMap(u64, LoadListener), image_listeners: std.AutoHashMap(u64, ImageListener), image_downloads: std.AutoHashMap(u64, ImageDownload), image_downloads_lock: std.Thread.Mutex = .{}, control_events: std.AutoHashMap(u64, std.Thread.ResetEvent), fn init(server: *discovery.Server, token: ?[]const u8) !Internal {
-
@@ -406,7 +441,7 @@ .host = host,.saved_token = saved_token, .browse_listeners = std.AutoHashMap(u64, BrowseListener).init(allocator), .load_listeners = std.AutoHashMap(u64, LoadListener).init(allocator), .image_listeners = std.AutoHashMap(u64, ImageListener).init(allocator), .image_downloads = std.AutoHashMap(u64, ImageDownload).init(allocator), .control_events = std.AutoHashMap(u64, std.Thread.ResetEvent).init(allocator), }; }
-
@@ -414,7 +449,12 @@fn deinit(self: *Internal) void { self.browse_listeners.deinit(); self.load_listeners.deinit(); self.image_listeners.deinit(); var image_downloads = self.image_downloads.valueIterator(); while (image_downloads.next()) |download| { download.deinit(); } self.image_downloads.deinit(); self.control_events.deinit(); if (self.ws) |*ws| {
-
@@ -429,6 +469,15 @@ allocator.free(self.host);self.server.release(); } }; fn getRequestId(self: *Connection) u64 { self.internal.request_id_lock.lock(); defer self.internal.request_id_lock.unlock(); const current = self.internal.request_id; self.internal.request_id += 1; return current; } pub fn make(server_ptr: ?*discovery.Server, token: ?[*:0]const u8) callconv(.C) ?*Connection { const server = server_ptr orelse @panic(
-
@@ -646,7 +695,7 @@ entry.value_ptr.write(res);continue; } if (self.internal.image_listeners.getEntry(header.request_id)) |entry| { if (self.internal.image_downloads.getEntry(header.request_id)) |entry| { std.log.debug("Received /get_image response (ID={d})", .{header.request_id}); const res = ImageService.Get.Response.decode(
-
@@ -655,21 +704,21 @@ header_ctx,msg.data, ) catch |err| { std.log.debug("Received unexpected get_image response: {s}", .{@errorName(err)}); const err_obj = image.GetResult.makeRetainedError(.unexpected_response) catch |obj_err| { std.log.err("Unable to compose image.GetResult: {s}", .{@errorName(obj_err)}); continue; }; entry.value_ptr.write(err_obj); entry.value_ptr.write( image.GetResult.makeError(.unexpected_response) catch |obj_err| { std.log.err("Unable to compose image.GetResult: {s}", .{@errorName(obj_err)}); continue; }, ); continue; }; const obj = image.GetResult.makeRetained(&res) catch |err| { entry.value_ptr.write(image.GetResult.make(&res) catch |err| { std.log.err("Unable to compose image.GetResult: {s}", .{@errorName(err)}); continue; }; }); entry.value_ptr.write(obj); continue; }
-
@@ -845,8 +894,7 @@ std.log.err("{s}_{s} called, but WebSocket connection is not ready", .{ cname, @src().fn_name });return; }; const req_id = self.internal.request_id; self.internal.request_id += 1; const req_id = self.getRequestId(); const sub_id = self.internal.subscription_id; self.internal.subscription_id += 1;
-
@@ -889,8 +937,7 @@ std.log.err("{s}_{s} called, but WebSocket connection is not ready", .{ cname, @src().fn_name });return; }; const req_id = self.internal.request_id; self.internal.request_id += 1; const req_id = self.getRequestId(); std.log.debug("Sending control request...", .{}); const kind = control: {
-
@@ -962,8 +1009,7 @@ std.log.err("{s}_{s} called, but WebSocket connection is not ready", .{ cname, @src().fn_name });return browse.Result.makeRetainedError(.closed) catch null; }; const req_id = self.internal.request_id; self.internal.request_id += 1; const req_id = self.getRequestId(); std.log.debug("Sending browse request...", .{}); const req = BrowseService.Browse.Request{
-
@@ -1081,15 +1127,33 @@ }pub fn getImage( ptr: ?*Connection, image_key: [*:0]const u8, image_key_ptr: [*:0]const u8, opts_ptr: ?*image.GetOptions, ) callconv(.C) ?*image.GetResult { var self = ptr orelse @panic( std.fmt.comptimePrint("Received null pointer on {s}_{s}", .{ cname, @src().fn_name }), ); const image_key = std.mem.span(image_key_ptr); const opts = image.GetOptions.retain(opts_ptr); defer opts.release(); const req_hash = ImageDownload.getHash(image_key, opts); { self.internal.image_downloads_lock.lock(); defer self.internal.image_downloads_lock.unlock(); var download_iter = self.internal.image_downloads.valueIterator(); while (download_iter.next()) |download| { if (download.hash == req_hash) { std.log.debug("Using existing request for image_key={s}", .{image_key}); download.ready.wait(); return download.downloaded.retain(); } } } var ws = self.internal.ws orelse { std.log.err(
-
@@ -1099,18 +1163,17 @@ );return image.GetResult.makeRetainedError(.socket_closed) catch null; }; const req_id = self.internal.request_id; self.internal.request_id += 1; const req_id = self.getRequestId(); std.log.debug("Sending get_image request...", .{}); var req = ImageService.Get.Request{ .image_key = std.mem.span(image_key), .format = if (opts.internal.content_type) |t| switch (t) { .image_key = image_key, .format = if (opts.internal.data.content_type) |t| switch (t) { .jpeg => .jpeg, .png => .png, } else null, }; if (opts.internal.size) |size| { if (opts.internal.data.size) |size| { req.scale = switch (size.scaling_method) { .fit => .fit, .fill => .fill,
-
@@ -1126,12 +1189,23 @@ return image.GetResult.makeRetainedError(.unknown_error) catch null;}; defer allocator.free(req_msg); var entry = self.internal.image_listeners.getOrPut(req_id) catch |err| { std.log.err("Unable to set listener for get_image response: {s}", .{@errorName(err)}); return image.GetResult.makeRetainedError(.unknown_error) catch null; var entry = entry: { self.internal.image_downloads_lock.lock(); defer self.internal.image_downloads_lock.unlock(); const ent = self.internal.image_downloads.getOrPut(req_id) catch |err| { std.log.err("Unable to set listener for get_image response: {s}", .{@errorName(err)}); return image.GetResult.makeRetainedError(.unknown_error) catch null; }; ent.value_ptr.* = ImageDownload.init(image_key, opts); break :entry ent; }; entry.value_ptr.* = .{}; defer _ = self.internal.image_listeners.remove(req_id); defer { self.internal.image_downloads_lock.lock(); defer self.internal.image_downloads_lock.unlock(); entry.value_ptr.deinit(); _ = self.internal.image_downloads.remove(req_id); } ws.writeBin(req_msg) catch |err| { std.log.err("Unable to write get_image request: {s}", .{@errorName(err)});
-
@@ -1140,8 +1214,8 @@ };std.log.debug("Sent get_image request (ID={d})", .{req_id}); // Pointer is already retained. return entry.value_ptr.listen(); entry.value_ptr.ready.wait(); return entry.value_ptr.downloaded.retain(); } pub fn export_capi() void {
-
-
-
@@ -48,13 +48,17 @@ internal: *Internal,const Internal = struct { arc: Arc = .{}, size: ?Size = null, content_type: ?ContentType = null, data: Data = .{}, const Size = struct { scaling_method: ScalingMethod, width: usize, height: usize, const Data = struct { size: ?Size = null, content_type: ?ContentType = null, const Size = struct { scaling_method: ScalingMethod, width: usize, height: usize, }; }; };
-
@@ -105,7 +109,7 @@ );_ = self.retain(); defer self.release(); self.internal.size = .{ self.internal.data.size = .{ .scaling_method = scaling, .width = width, .height = height,
-
@@ -119,7 +123,7 @@ );_ = self.retain(); defer self.release(); self.internal.content_type = content_type; self.internal.data.content_type = content_type; } pub fn export_capi() void {
-
@@ -208,7 +212,7 @@ const Internal = struct {arc: Arc = .{}, }; fn make(src: *const ImageService.Get.Response) std.mem.Allocator.Error!*@This() { pub fn make(src: *const ImageService.Get.Response) std.mem.Allocator.Error!*@This() { const internal = try allocator.create(Internal); errdefer allocator.destroy(internal);
-
@@ -233,7 +237,7 @@ const result = try make(src);return result.retain(); } fn makeError(code: GetResultCode) std.mem.Allocator.Error!*@This() { pub fn makeError(code: GetResultCode) std.mem.Allocator.Error!*@This() { const internal = try allocator.create(Internal); errdefer allocator.destroy(internal);
-