Changes
4 changed files (+168/-18)
-
-
@@ -61,6 +61,20 @@mod.addOptions("config", options); } // Test { const t = b.addTest(.{ .root_source_file = b.path("src/main.zig"), .target = target, .optimize = optimize, }); const run = b.addRunArtifact(t); const step = b.step("test", "Run unit tests"); step.dependOn(&run.step); } // Static library for GLib { const linkage = b.option(
-
-
-
@@ -25,6 +25,7 @@ const discovery = @import("./discovery.zig");const extension = @import("./extension.zig").extension; const image = @import("./image.zig"); const freelog = @import("./log.zig").freelog; const ObjectPool = @import("./pool.zig").ObjectPool; const BrowseService = @import("./services/BrowseService.zig"); const ImageService = @import("./services/ImageService.zig"); const PingService = @import("./services/PingService.zig");
-
@@ -412,6 +413,7 @@ browse_listeners: std.AutoHashMap(u64, BrowseListener),load_listeners: std.AutoHashMap(u64, LoadListener), image_downloads: std.AutoHashMap(u64, *ImageDownload), image_downloads_lock: std.Thread.Mutex = .{}, image_download_req_pool: ObjectPool(ImageService.Get.Request, 10) = .{}, control_events: std.AutoHashMap(u64, ControlListener), seek_events: std.AutoHashMap(u64, SeekListener),
-
@@ -1240,22 +1242,6 @@const opts = image.GetOptions.retain(opts_ptr); defer opts.release(); const req_hash = ImageDownload.getHash(image_key, opts); { self.internal.image_downloads_lock.lock(); defer self.internal.image_downloads_lock.unlock(); var download_iter = self.internal.image_downloads.valueIterator(); while (download_iter.next()) |download| { if (download.*.hash == req_hash) { std.log.debug("Using existing request for image_key={s}", .{image_key}); download.*.ready.wait(); return download.*.downloaded.retain(); } } } var ws = self.internal.ws orelse { std.log.err( "{s}_{s} called, but WebSocket connection is not ready",
-
@@ -1267,13 +1253,32 @@const req_id = self.getRequestId(); std.log.debug("Sending get_image request...", .{}); var req = ImageService.Get.Request{ const req = self.internal.image_download_req_pool.acquire(); defer self.internal.image_download_req_pool.release(req); req.* = ImageService.Get.Request{ .image_key = image_key, .format = if (opts.internal.data.content_type) |t| switch (t) { .jpeg => .jpeg, .png => .png, } else null, }; const req_hash = ImageDownload.getHash(image_key, opts); { self.internal.image_downloads_lock.lock(); defer self.internal.image_downloads_lock.unlock(); var download_iter = self.internal.image_downloads.valueIterator(); while (download_iter.next()) |download| { if (download.*.hash == req_hash) { std.log.debug("Using existing request for image_key={s}", .{image_key}); download.*.ready.wait(); return download.*.downloaded.retain(); } } } if (opts.internal.data.size) |size| { req.scale = switch (size.scaling_method) { .fit => .fit,
-
@@ -1316,7 +1321,7 @@ };std.log.debug("Sent get_image request (ID={d})", .{req_id}); download.ready.timedWait(5_000 * std.time.ns_per_ms) catch { download.ready.timedWait(5 * std.time.ns_per_s) catch { std.log.err("get_image timeout", .{}); // Reusing listeners wait without timeout. So we have to manually
-
@@ -1355,3 +1360,7 @@ ConnectedEvent.export_capi();Event.export_capi(); Connection.export_capi(); } test { _ = @import("./pool.zig"); }
-
-
-
@@ -27,3 +27,7 @@ discovery.export_capi();image.export_capi(); transport.export_capi(); } test { _ = @import("./connection.zig"); }
-
-
core/src/pool.zig (new)
-
@@ -0,0 +1,123 @@// Copyright 2025 Shota FUJI // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // // SPDX-License-Identifier: Apache-2.0 const std = @import("std"); pub fn ObjectPool(comptime T: type, comptime size: usize) type { const Availability = u32; if (size > @bitSizeOf(Availability)) { @compileError( std.fmt.comptimePrint( "ObjectPool size must be less than or equals to {d} (got {d})", .{ @bitSizeOf(Availability), size }, ), ); } return struct { // a bit is set (1) = available, otherwise occupied. // ex) Availability = u8, size = 5 // 0b00011111 = Fully available // 0b00011110 = First item is occupied, second item is available // 0b00000000.. = Fully occupied availability: Availability = std.math.maxInt(Availability), lock: std.Thread.Mutex = .{}, cond: std.Thread.Condition = .{}, size: usize = size, pool: [size]T = undefined, pub fn acquire(self: *@This()) *T { self.lock.lock(); defer self.lock.unlock(); while (true) { const index = @ctz(self.availability); if (index < size) { self.availability &= ~(@as(Availability, 0b1) << @min(std.math.maxInt(u5), index)); return &self.pool[index]; } self.cond.wait(&self.lock); } } pub fn release(self: *@This(), item: *const T) void { defer self.cond.signal(); self.lock.lock(); defer self.lock.unlock(); const pool: [*]const T = &self.pool; self.availability |= @as(Availability, 0b1) << @min(std.math.maxInt(u5), item - pool); } }; } test ObjectPool { const Pool = ObjectPool(usize, 5); const num_items = 6; const Runner = struct { pool: Pool = .{}, threads: [num_items]std.Thread = undefined, asserted: std.Thread.ResetEvent = .{}, wrote_lock: std.Thread.Mutex = .{}, wrote_times: usize = 0, pool_exhausted: std.Thread.ResetEvent = .{}, pub fn run(self: *@This(), i: usize) void { const item = self.pool.acquire(); defer self.pool.release(item); item.* = i; { self.wrote_lock.lock(); defer self.wrote_lock.unlock(); self.wrote_times += 1; if (self.wrote_times == 5) { self.pool_exhausted.set(); } } self.asserted.wait(); } }; var runner: Runner = .{}; for (&runner.threads, 1..) |*thread, i| { thread.* = try std.Thread.spawn(.{}, Runner.run, .{ &runner, i }); } errdefer for (runner.threads) |thread| { thread.detach(); }; try runner.pool_exhausted.timedWait(std.time.ns_per_s); for (runner.pool.pool) |item| { try std.testing.expect(item > 0); try std.testing.expect(item < 6); } runner.asserted.set(); for (runner.threads) |thread| { thread.join(); } try std.testing.expectEqual(num_items, runner.wrote_times); }
-