Changes
3 changed files (+214/-3)
-
-
@@ -362,7 +362,7 @@ const LoadListener = JsonResponseListener(BrowseService.Load.Response, 5_000);const ControlListener = Listener(transport.ControlResultCode, 3_000); const SeekListener = Listener(transport.SeekResultCode, 2_000); const ImageDownloader = image.Downloader(10); const ImageDownloader = image.Downloader(.{}); internal: *Internal,
-
-
-
@@ -261,6 +261,28 @@ const result = try makeError(code);return result.retain(); } pub fn makeCached(image: *Image) std.mem.Allocator.Error!*@This() { const internal = try allocator.create(Internal); errdefer allocator.destroy(internal); internal.* = .{}; const self = try allocator.create(@This()); errdefer allocator.destroy(self); self.* = .{ .internal = internal, .image = image.retain(), }; return self; } pub inline fn makeCachedRetained(image: *Image) std.mem.Allocator.Error!*@This() { const result = try makeCached(image); return result.retain(); } pub fn retain(ptr: ?*@This()) callconv(.C) *@This() { var self = ptr orelse @panic( std.fmt.comptimePrint("Received null pointer on {s}_{s}", .{ cname, @src().fn_name }),
-
@@ -296,7 +318,178 @@ GetOptions.export_capi();GetResult.export_capi(); } pub fn Downloader(concurrent_download_limit: u5) type { fn DoubleFifoCache(comptime T: type, short_size: usize, main_size: usize) type { return struct { pub const Data = T; pub const Entry = struct { hash: u64, data: *Data, accessed: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), }; allocator: std.mem.Allocator, short: std.DoublyLinkedList(Entry) = .{}, main: std.DoublyLinkedList(Entry) = .{}, pub fn init(allocator: std.mem.Allocator) @This() { return .{ .allocator = allocator, }; } pub fn deinit(self: *@This()) void { while (self.short.pop()) |popped| { self.release(popped); } while (self.main.pop()) |popped| { self.release(popped); } } pub fn get(self: *@This(), hash: u64) ?*Data { var entry = self.short.first; while (entry) |e| : (entry = e.next) { if (e.data.hash == hash) { e.data.accessed.store(true, .release); return e.data.data; } } entry = self.main.first; while (entry) |e| : (entry = e.next) { if (e.data.hash == hash) { return e.data.data; } } return null; } fn release(self: *const @This(), node: *std.DoublyLinkedList(Entry).Node) void { if (@hasDecl(Data, "release")) { node.data.data.release(); } self.allocator.destroy(node); } fn evictMain(self: *@This()) void { if (self.main.len <= main_size) { return; } const popped = self.main.pop() orelse return; self.release(popped); } fn evictShort(self: *@This()) void { if (self.short.len <= short_size) { return; } const popped = self.short.pop() orelse return; if (!popped.data.accessed.load(.unordered)) { self.release(popped); return; } self.main.prepend(popped); } pub fn put(self: *@This(), hash: u64, data: *Data) std.mem.Allocator.Error!void { if (@hasDecl(Data, "retain")) { _ = data.retain(); } const node = try self.allocator.create(std.DoublyLinkedList(Entry).Node); node.* = .{ .data = .{ .data = data, .hash = hash, }, }; self.short.prepend(node); self.evictShort(); } }; } test DoubleFifoCache { const Cache = DoubleFifoCache(struct { n: u64, rc: usize = 0, pub fn retain(self: *@This()) void { self.rc += 1; } pub fn release(self: *@This()) void { self.rc -= 1; } }, 3, 10); var cache = Cache.init(std.testing.allocator); var n1 = Cache.Data{ .n = 1 }; try cache.put(1, &n1); var n2 = Cache.Data{ .n = 2 }; try cache.put(2, &n2); var n3 = Cache.Data{ .n = 3 }; try cache.put(3, &n3); try std.testing.expectEqual(2, cache.get(2).?.n); try std.testing.expectEqual(3, cache.get(3).?.n); try std.testing.expect(cache.get(4) == null); try std.testing.expectEqual(3, cache.short.len); try std.testing.expectEqual(0, cache.main.len); var n4 = Cache.Data{ .n = 4 }; try cache.put(4, &n4); try std.testing.expectEqual(4, cache.get(4).?.n); try std.testing.expectEqual(3, cache.short.len); try std.testing.expectEqual(0, cache.main.len); var n5 = Cache.Data{ .n = 5 }; try cache.put(5, &n5); try std.testing.expect(cache.get(1) == null); try std.testing.expectEqual(2, cache.get(2).?.n); try std.testing.expectEqual(3, cache.get(3).?.n); try std.testing.expectEqual(4, cache.get(4).?.n); try std.testing.expectEqual(5, cache.get(5).?.n); try std.testing.expectEqual(3, cache.short.len); try std.testing.expectEqual(1, cache.main.len); try std.testing.expectEqual(0, n1.rc); try std.testing.expectEqual(1, n2.rc); try std.testing.expectEqual(1, n3.rc); try std.testing.expectEqual(1, n4.rc); try std.testing.expectEqual(1, n5.rc); cache.deinit(); try std.testing.expectEqual(0, n1.rc); try std.testing.expectEqual(0, n2.rc); try std.testing.expectEqual(0, n3.rc); try std.testing.expectEqual(0, n4.rc); try std.testing.expectEqual(0, n5.rc); } pub const DownloaderOptions = struct { concurrent_download_limit: u5 = 10, short_cache_size: usize = 10, main_cache_size: usize = 50, }; pub fn Downloader(opts: DownloaderOptions) type { return struct { const Job = struct { ready: std.Thread.ResetEvent = .{},
-
@@ -304,23 +497,27 @@ result: *GetResult = undefined,req_hash: u64, }; const Cache = DoubleFifoCache(Image, opts.short_cache_size, opts.main_cache_size); // a bit is set (1) = available, otherwise occupied. // ex) truncating size to u8, concurrent download limit = 5 // 0b00011111 = Fully available // 0b00011110 = First item is occupied, second item is available // 0b00000000 = Fully occupied downloads: u32 = ~(@as(u32, std.math.maxInt(u32)) << concurrent_download_limit), downloads: u32 = ~(@as(u32, std.math.maxInt(u32)) << opts.concurrent_download_limit), mutex: std.Thread.Mutex = .{}, cond: std.Thread.Condition = .{}, downloads_queue: std.DoublyLinkedList(Job) = .{}, http_client: std.http.Client, is_closed: bool = false, cache: Cache, pub fn init(allocator: std.mem.Allocator) @This() { return .{ .http_client = std.http.Client{ .allocator = allocator, }, .cache = Cache.init(allocator), }; }
-
@@ -332,6 +529,7 @@ self.cond.signal();self.mutex.unlock(); self.http_client.deinit(); self.cache.deinit(); } fn hash(url: []const u8) u64 {
-
@@ -361,6 +559,11 @@ };defer allocator.free(url); const req_hash = hash(url); if (self.cache.get(req_hash)) |cached| { std.log.debug("Using cached image for {s}", .{url}); return try GetResult.makeCachedRetained(cached); } { var node = self.downloads_queue.first;
-
@@ -449,6 +652,13 @@ .data = response.items,}; const get_result = try GetResult.make(&moo_resp); if (get_result.image) |image| { self.cache.put(req_hash, image) catch { std.log.warn("Unable to cache downloaded image due to out of memory error", .{}); }; } node.data.result = get_result; node.data.ready.set();
-
-
-
@@ -30,5 +30,6 @@ }test { _ = @import("./connection.zig"); _ = @import("./image.zig"); _ = @import("./services/ImageService.zig"); }
-