Changes
5 changed files (+283/-290)
-
-
@@ -25,7 +25,6 @@ const discovery = @import("./discovery.zig");const extension = @import("./extension.zig").extension; const image = @import("./image.zig"); const freelog = @import("./log.zig").freelog; const ObjectPool = @import("./pool.zig").ObjectPool; const BrowseService = @import("./services/BrowseService.zig"); const ImageService = @import("./services/ImageService.zig"); const PingService = @import("./services/PingService.zig");
-
@@ -354,40 +353,6 @@ fn JsonResponseListener(comptime T: type, timeout_ms: usize) type {return Listener(moo.JsonBody(T), timeout_ms); } const ImageDownload = struct { hash: u64, ready: std.Thread.ResetEvent = .{}, downloaded: *image.GetResult = undefined, pub fn getHash( image_key: []const u8, opts: *const image.GetOptions, ) u64 { var hasher = std.hash.Wyhash.init(0); std.hash.autoHashStrat(&hasher, image_key, .Deep); std.hash.autoHashStrat(&hasher, opts.internal.data, .Deep); return hasher.final(); } pub fn init( image_key: []const u8, opts: *const image.GetOptions, ) @This() { return .{ .hash = ImageDownload.getHash(image_key, opts) }; } pub fn write(self: *@This(), data: *image.GetResult) void { self.downloaded = data.retain(); self.ready.set(); } pub fn deinit(self: *const @This()) void { if (self.ready.isSet()) { self.downloaded.release(); } } }; pub const Connection = extern struct { const cname = "plac_connection"; const allocator = std.heap.c_allocator;
-
@@ -396,10 +361,13 @@ const BrowseListener = JsonResponseListener(BrowseService.Browse.Response, 5_000);const LoadListener = JsonResponseListener(BrowseService.Load.Response, 5_000); const ControlListener = Listener(transport.ControlResultCode, 3_000); const SeekListener = Listener(transport.SeekResultCode, 2_000); const ImageDownloader = image.Downloader(10); internal: *Internal, pub const Internal = struct { tsa: std.heap.ThreadSafeAllocator = .{ .child_allocator = allocator }, server: *discovery.Server, ws: ?websocket.Client = null, request_id: u64 = 0,
-
@@ -411,9 +379,7 @@ arc: Arc = .{},saved_token: ?[]const u8 = null, browse_listeners: std.AutoHashMap(u64, BrowseListener), load_listeners: std.AutoHashMap(u64, LoadListener), image_downloads: std.AutoHashMap(u64, *ImageDownload), image_downloads_lock: std.Thread.Mutex = .{}, image_download_req_pool: ObjectPool(ImageService.Get.Request, 10) = .{}, image_downloads: ImageDownloader, control_events: std.AutoHashMap(u64, ControlListener), seek_events: std.AutoHashMap(u64, SeekListener),
-
@@ -454,7 +420,7 @@fn initListeners(self: *Internal) void { self.browse_listeners = std.AutoHashMap(u64, BrowseListener).init(allocator); self.load_listeners = std.AutoHashMap(u64, LoadListener).init(allocator); self.image_downloads = std.AutoHashMap(u64, *ImageDownload).init(allocator); self.image_downloads = ImageDownloader.init(self.tsa.allocator()); self.control_events = std.AutoHashMap(u64, ControlListener).init(allocator); self.seek_events = std.AutoHashMap(u64, SeekListener).init(allocator); }
-
@@ -463,10 +429,6 @@ fn deinitListeners(self: *Internal) void {self.browse_listeners.deinit(); self.load_listeners.deinit(); var image_downloads = self.image_downloads.valueIterator(); while (image_downloads.next()) |download| { download.*.deinit(); } self.image_downloads.deinit(); self.control_events.deinit(); self.seek_events.deinit();
-
@@ -715,33 +677,6 @@ entry.value_ptr.write(res);continue; } if (self.internal.image_downloads.getEntry(header.request_id)) |entry| { std.log.debug("Received /get_image response (ID={d})", .{header.request_id}); const res = ImageService.Get.Response.decode( &meta, header_ctx, msg.data, ) catch |err| { std.log.debug("Received unexpected get_image response: {s}", .{@errorName(err)}); entry.value_ptr.*.write( image.GetResult.makeError(.unexpected_response) catch |obj_err| { std.log.err("Unable to compose image.GetResult: {s}", .{@errorName(obj_err)}); continue; }, ); continue; }; entry.value_ptr.*.write(image.GetResult.make(&res) catch |err| { std.log.err("Unable to compose image.GetResult: {s}", .{@errorName(err)}); continue; }); continue; } if (self.internal.control_events.getEntry(header.request_id)) |entry| { std.log.debug("Received /control response (ID={d})", .{header.request_id});
-
@@ -1239,106 +1174,35 @@ );const image_key = std.mem.span(image_key_ptr); const opts = image.GetOptions.retain(opts_ptr); defer opts.release(); var ws = self.internal.ws orelse { std.log.err( "{s}_{s} called, but WebSocket connection is not ready", .{ cname, @src().fn_name }, ); return image.GetResult.makeRetainedError(.socket_closed) catch null; }; const req_id = self.getRequestId(); std.log.debug("Sending get_image request...", .{}); const req = self.internal.image_download_req_pool.acquire(); defer self.internal.image_download_req_pool.release(req); req.* = ImageService.Get.Request{ .image_key = image_key, .format = if (opts.internal.data.content_type) |t| switch (t) { .jpeg => .jpeg, .png => .png, } else null, }; const req = req: { const opts = image.GetOptions.retain(opts_ptr); defer opts.release(); const req_hash = ImageDownload.getHash(image_key, opts); var r = ImageService.Get.Request{ .image_key = image_key, .format = if (opts.internal.data.content_type) |t| switch (t) { .jpeg => .jpeg, .png => .png, } else null, }; { self.internal.image_downloads_lock.lock(); defer self.internal.image_downloads_lock.unlock(); var download_iter = self.internal.image_downloads.valueIterator(); while (download_iter.next()) |download| { if (download.*.hash == req_hash) { std.log.debug("Using existing request for image_key={s}", .{image_key}); download.*.ready.wait(); return download.*.downloaded.retain(); } if (opts.internal.data.size) |size| { r.scale = switch (size.scaling_method) { .fit => .fit, .fill => .fill, .stretch => .stretch, }; r.width = size.width; r.height = size.height; } } if (opts.internal.data.size) |size| { req.scale = switch (size.scaling_method) { .fit => .fit, .fill => .fill, .stretch => .stretch, }; req.width = size.width; req.height = size.height; } const req_msg = req.encode(allocator, req_id) catch |err| { std.log.err("Unable to compose get_image request: {s}", .{@errorName(err)}); return image.GetResult.makeRetainedError(.unknown_error) catch null; break :req r; }; defer allocator.free(req_msg); var download = ImageDownload.init(image_key, opts); { self.internal.image_downloads_lock.lock(); defer self.internal.image_downloads_lock.unlock(); self.internal.image_downloads.put(req_id, &download) catch |err| { std.log.err("Unable to set listener for get_image response: {s}", .{@errorName(err)}); return image.GetResult.makeRetainedError(.unknown_error) catch null; }; } defer { self.internal.image_downloads_lock.lock(); defer self.internal.image_downloads_lock.unlock(); download.deinit(); _ = self.internal.image_downloads.remove(req_id); } ws.writeBin(req_msg) catch |err| { std.log.err("Unable to write get_image request: {s}", .{@errorName(err)}); return image.GetResult.makeRetainedError(.failed_to_send) catch null; }; std.log.debug("Sent get_image request (ID={d})", .{req_id}); download.ready.timedWait(5 * std.time.ns_per_s) catch { std.log.err("get_image timeout", .{}); // Reusing listeners wait without timeout. So we have to manually // set an error response for them. download.write(image.GetResult.makeError(.timeout) catch { std.log.err( "Failed to set timeout error to reusing listeners," ++ " the listeners goes stale and leaks memory: out of memory", .{}, ); return null; }); return image.GetResult.makeRetainedError(.timeout) catch null; return self.internal.image_downloads.download(allocator, &req, self.internal.server) catch { std.log.err("Out of memory during image download", .{}); return image.GetResult.makeRetainedError(.out_of_memory) catch null; }; return download.downloaded.retain(); } pub fn export_capi() void {
-
@@ -1360,7 +1224,3 @@ ConnectedEvent.export_capi();Event.export_capi(); Connection.export_capi(); } test { _ = @import("./pool.zig"); }
-
-
-
@@ -17,6 +17,7 @@const std = @import("std"); const Arc = @import("./Arc.zig"); const Server = @import("./discovery.zig").Server; const freelog = @import("./log.zig").freelog; const ImageService = @import("./services/ImageService.zig");
-
@@ -294,3 +295,164 @@ Image.export_capi();GetOptions.export_capi(); GetResult.export_capi(); } pub fn Downloader(concurrent_download_limit: u5) type { return struct { const Job = struct { ready: std.Thread.ResetEvent = .{}, result: *GetResult = undefined, req_hash: u64, }; // a bit is set (1) = available, otherwise occupied. // ex) truncating size to u8, concurrent download limit = 5 // 0b00011111 = Fully available // 0b00011110 = First item is occupied, second item is available // 0b00000000 = Fully occupied downloads: u32 = ~(@as(u32, std.math.maxInt(u32)) << concurrent_download_limit), mutex: std.Thread.Mutex = .{}, cond: std.Thread.Condition = .{}, downloads_queue: std.DoublyLinkedList(Job) = .{}, http_client: std.http.Client, is_closed: bool = false, pub fn init(allocator: std.mem.Allocator) @This() { return .{ .http_client = std.http.Client{ .allocator = allocator, }, }; } pub fn deinit(self: *@This()) void { self.is_closed = true; self.mutex.lock(); self.downloads = std.math.maxInt(u32); self.cond.signal(); self.mutex.unlock(); self.http_client.deinit(); } fn hash(url: []const u8) u64 { var hasher = std.hash.Wyhash.init(0); std.hash.autoHashStrat(&hasher, url, .Deep); return hasher.final(); } pub fn download( self: *@This(), allocator: std.mem.Allocator, req: *const ImageService.Get.Request, server: *Server, ) std.mem.Allocator.Error!*GetResult { _ = server.retain(); defer server.release(); const format = req.format orelse { std.log.err("Downloading image without specifying format is not currently supported", .{}); return try GetResult.makeRetainedError(.failed_to_send); }; const url = req.url(allocator, server.internal.address) catch |err| { std.log.err("Unable to construct image download URL: {s}", .{@errorName(err)}); return try GetResult.makeRetainedError(.unknown_error); }; defer allocator.free(url); const req_hash = hash(url); { var node = self.downloads_queue.first; while (node) |d| : (node = d.next) { if (d.data.req_hash == req_hash) { d.data.ready.wait(); return d.data.result.retain(); } } } var node: std.DoublyLinkedList(Job).Node = .{ .data = .{ .req_hash = req_hash, }, }; self.downloads_queue.prepend(&node); defer self.downloads_queue.remove(&node); const job_index: u5 = job_index: { self.mutex.lock(); defer self.mutex.unlock(); while (self.downloads == 0) { self.cond.wait(&self.mutex); } if (self.is_closed) { return try GetResult.makeRetainedError(.socket_closed); } const i = @min(@ctz(self.downloads), std.math.maxInt(u5)); self.downloads &= ~(@as(u32, 1) << i); break :job_index i; }; std.log.debug("Downloading image({d}) {s}", .{ job_index, url }); defer { self.mutex.lock(); defer self.mutex.unlock(); if (!self.is_closed) { self.downloads |= @as(u32, 1) << job_index; self.cond.signal(); } } var response = std.ArrayList(u8).init(allocator); defer response.deinit(); const result = self.http_client.fetch(.{ .location = .{ .url = url }, .method = .GET, .redirect_behavior = .not_allowed, .headers = .{ .content_type = .{ .override = if (format == .jpeg) "image/jpeg" else "image/png", }, }, .response_storage = .{ .dynamic = &response }, }) catch |err| { if (err == error.ConnectionTimedOut) { std.log.err("Image download timed out", .{}); return try GetResult.makeRetainedError(.timeout); } std.log.err("Failed to download image: {s}", .{@errorName(err)}); return try GetResult.makeRetainedError(.unknown_error); }; if (result.status != .ok) { std.log.err("Unexpected image download response: HTTP({d}) {s}", .{ @intFromEnum(result.status), @tagName(result.status), }); return try GetResult.makeRetainedError(.unexpected_response); } const moo_resp = ImageService.Get.Response{ // TODO: Set from parsed HTTP response header .content_type = format, .data = response.items, }; const get_result = try GetResult.make(&moo_resp); node.data.result = get_result; node.data.ready.set(); return get_result.retain(); } }; }
-
-
-
@@ -30,4 +30,5 @@ }test { _ = @import("./connection.zig"); _ = @import("./services/ImageService.zig"); }
-
-
core/src/pool.zig (deleted)
-
@@ -1,123 +0,0 @@// Copyright 2025 Shota FUJI // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // // SPDX-License-Identifier: Apache-2.0 const std = @import("std"); pub fn ObjectPool(comptime T: type, comptime size: usize) type { const Availability = u32; if (size > @bitSizeOf(Availability)) { @compileError( std.fmt.comptimePrint( "ObjectPool size must be less than or equals to {d} (got {d})", .{ @bitSizeOf(Availability), size }, ), ); } return struct { // a bit is set (1) = available, otherwise occupied. // ex) Availability = u8, size = 5 // 0b00011111 = Fully available // 0b00011110 = First item is occupied, second item is available // 0b00000000.. = Fully occupied availability: Availability = std.math.maxInt(Availability), lock: std.Thread.Mutex = .{}, cond: std.Thread.Condition = .{}, size: usize = size, pool: [size]T = undefined, pub fn acquire(self: *@This()) *T { self.lock.lock(); defer self.lock.unlock(); while (true) { const index = @ctz(self.availability); if (index < size) { self.availability &= ~(@as(Availability, 0b1) << @min(std.math.maxInt(u5), index)); return &self.pool[index]; } self.cond.wait(&self.lock); } } pub fn release(self: *@This(), item: *const T) void { defer self.cond.signal(); self.lock.lock(); defer self.lock.unlock(); const pool: [*]const T = &self.pool; self.availability |= @as(Availability, 0b1) << @min(std.math.maxInt(u5), item - pool); } }; } test ObjectPool { const Pool = ObjectPool(usize, 5); const num_items = 6; const Runner = struct { pool: Pool = .{}, threads: [num_items]std.Thread = undefined, asserted: std.Thread.ResetEvent = .{}, wrote_lock: std.Thread.Mutex = .{}, wrote_times: usize = 0, pool_exhausted: std.Thread.ResetEvent = .{}, pub fn run(self: *@This(), i: usize) void { const item = self.pool.acquire(); defer self.pool.release(item); item.* = i; { self.wrote_lock.lock(); defer self.wrote_lock.unlock(); self.wrote_times += 1; if (self.wrote_times == 5) { self.pool_exhausted.set(); } } self.asserted.wait(); } }; var runner: Runner = .{}; for (&runner.threads, 1..) |*thread, i| { thread.* = try std.Thread.spawn(.{}, Runner.run, .{ &runner, i }); } errdefer for (runner.threads) |thread| { thread.detach(); }; try runner.pool_exhausted.timedWait(std.time.ns_per_s); for (runner.pool.pool) |item| { try std.testing.expect(item > 0); try std.testing.expect(item < 6); } runner.asserted.set(); for (runner.threads) |thread| { thread.join(); } try std.testing.expectEqual(num_items, runner.wrote_times); }
-
-
-
@@ -93,6 +93,95 @@ try moo.encode(fbs.writer(), meta, header, body);return buf; } pub fn url(self: *const @This(), allocator: std.mem.Allocator, addr: std.net.Address) ![]const u8 { const path = try std.fmt.allocPrint(allocator, "/api/image/{s}", .{self.image_key}); defer allocator.free(path); var query = std.ArrayList(u8).init(allocator); defer query.deinit(); const query_writer = query.writer(); if (self.scale) |scale| { try std.fmt.format(query_writer, "&scale={s}", .{@tagName(scale)}); } if (self.width) |width| { try std.fmt.format(query_writer, "&width={d}", .{width}); } if (self.height) |height| { try std.fmt.format(query_writer, "&height={d}", .{height}); } if (self.format) |format| { try std.fmt.format(query_writer, "&format=image/{s}", .{@tagName(format)}); } const query_component: ?std.Uri.Component = if (query.items.len > 0) .{ .raw = query.items[1..], } else null; var origin = std.ArrayList(u8).init(allocator); defer origin.deinit(); try addr.format("", .{}, origin.writer()); var uri = std.Uri{ .scheme = "http", .host = .{ // std.net.Address is POSIX's address, which contains both IP address and port. // The type has no method to print only IP address part. .percent_encoded = origin.items, }, .path = .{ .raw = path, }, .query = query_component, }; var result = std.ArrayList(u8).init(allocator); errdefer result.deinit(); try uri.writeToStream(.{ .scheme = true, .authority = true, .path = true, .query = true, }, result.writer()); return result.toOwnedSlice(); } test url { const req = Request{ .image_key = "foo", }; const result = try req.url(std.testing.allocator, try std.net.Address.parseIp("127.0.0.1", 8080)); defer std.testing.allocator.free(result); try std.testing.expectEqualStrings("http://127.0.0.1:8080/api/image/foo", result); } test "url constructs search params" { const req = Request{ .image_key = "foo", .scale = .fit, .width = 100, .height = 200, .format = .png, }; const result = try req.url(std.testing.allocator, try std.net.Address.parseIp("127.0.0.1", 8080)); defer std.testing.allocator.free(result); try std.testing.expectEqualStrings( "http://127.0.0.1:8080/api/image/foo?scale=fit&width=100&height=200&format=image/png", result, ); } }; pub const Response = struct {
-
@@ -126,3 +215,7 @@ return .{ .content_type = content_type, .data = body.bytes };} }; }; test { _ = Get.Request.url; }
-