Changes
4 changed files (+45/-32)
-
-
@@ -94,12 +94,12 @@ }}; allocator: std.mem.Allocator, pool: std.Thread.Pool = undefined, on_server_change: OnServerChange.Store, on_connection_change: OnConnectionChange.Store, on_restore_complete: OnRestoreComplete.Store, capi_lock: std.Thread.Mutex, state_file_path: ?[]const u8 = null, restore_thread: ?std.Thread = null, pub fn init(allocator: std.mem.Allocator) App { return .{
-
@@ -111,16 +111,14 @@ .capi_lock = std.Thread.Mutex{},}; } pub fn deinit(self: *const App) void { if (self.restore_thread) |t| { t.join(); } pub fn deinit(self: *App) void { if (self.state_file_path) |path| { self.allocator.free(path); } self.on_restore_complete.deinit(); self.on_connection_change.deinit(); self.on_server_change.deinit(); self.pool.deinit(); } pub const CApi = extern struct {
-
@@ -144,15 +142,23 @@ err_websocket = 10,err_not_found = 11, }; pub fn init(allocator: std.mem.Allocator) std.mem.Allocator.Error!CApi { pub const InitError = error{ PoolInitError, } || std.mem.Allocator.Error; pub fn init(allocator: std.mem.Allocator) InitError!CApi { const internal = try allocator.create(App); errdefer allocator.destroy(internal); internal.* = App.init(allocator); internal.pool.init(.{ .allocator = allocator, .n_jobs = 4 }) catch |err| { std.log.err("Unable to initialize thread pool: {s}", .{@errorName(err)}); return InitError.PoolInitError; }; errdefer internal.deinit(); const server_selector = try allocator.create(ServerSelector.CApi); errdefer allocator.destroy(server_selector); server_selector.* = try ServerSelector.CApi.init(allocator); server_selector.* = try ServerSelector.CApi.init(allocator, &internal.pool); errdefer server_selector.deinit(allocator); return .{
-
@@ -221,11 +227,7 @@ self.internal.on_restore_complete.runAll(.{});return; } self.internal.restore_thread = std.Thread.spawn( .{}, restoreWorker, .{ self, file_path }, ) catch |err| { self.internal.pool.spawn(restoreWorker, .{ self, file_path }) catch |err| { std.log.err("Unable to spawn thread for restore job: {s}", .{@errorName(err)}); self.internal.on_restore_complete.runAll(.{}); return;
-
@@ -342,7 +344,7 @@ self.internal.on_connection_change.runAll(.{});return; } else null; const thread = std.Thread.spawn(.{}, connectWorker, .{ self, server_id_t, saved_token_t }) catch { self.internal.pool.spawn(connectWorker, .{ self, server_id_t, saved_token_t }) catch { self.internal.allocator.free(server_id_t); if (saved_token_t) |t| { self.internal.allocator.free(t);
-
@@ -355,7 +357,6 @@ self.connection = .err_thread_spawn;self.internal.on_connection_change.runAll(.{}); return; }; thread.detach(); } pub fn lock(capi_ptr: ?*CApi) callconv(.C) void {
-
@@ -423,6 +424,7 @@ if (std.mem.eql(u8, entry.id[0..entry.id_len], server_id)) {const conn, const token = register( self.internal.allocator, entry.getAddr(), &self.internal.pool, server_id, saved_token, ) catch |err| {
-
@@ -456,6 +458,7 @@ entry.getId(),entry.getName(), entry.getVersion(), token, &self.internal.pool, ) catch { self.internal.capi_lock.lock(); defer self.internal.capi_lock.unlock();
-
@@ -505,6 +508,7 @@const conn, const token = register( self.internal.allocator, info.addr, &self.internal.pool, server_id, saved_token, ) catch |err| {
-
@@ -552,6 +556,7 @@ info.id,info.name, info.version, token, &self.internal.pool, ) catch { self.internal.capi_lock.lock(); defer self.internal.capi_lock.unlock();
-
@@ -581,12 +586,13 @@fn register( allocator: std.mem.Allocator, address: std.net.Address, pool: *std.Thread.Pool, server_id: []const u8, saved_token: ?[]const u8, ) !struct { *Connection, []const u8 } { const conn = try allocator.create(Connection); errdefer allocator.destroy(conn); conn.* = try Connection.init(allocator, address); conn.* = try Connection.init(allocator, address, pool); errdefer conn.deinit(); conn.listen(PingService.handleRequest) catch {
-
-
-
@@ -28,6 +28,7 @@ const OnZoneListLoadingChange = callback.Callback(struct {});allocator: std.mem.Allocator, address: std.net.Address, pool: *std.Thread.Pool, on_zone_add: OnZoneAdd.Store, on_zone_list_loading_change: OnZoneListLoadingChange.Store,
-
@@ -38,11 +39,13 @@ pub fn init(allocator: std.mem.Allocator, address: std.net.Address, conn: *connection.Connection, pool: *std.Thread.Pool, ) std.mem.Allocator.Error!Self { return .{ .allocator = allocator, .address = address, .conn = conn, .pool = pool, .on_zone_add = OnZoneAdd.Store.init(allocator), .on_zone_list_loading_change = OnZoneListLoadingChange.Store.init(allocator), };
-
@@ -86,10 +89,11 @@ id: []const u8,name: []const u8, version: []const u8, token: []const u8, pool: *std.Thread.Pool, ) std.mem.Allocator.Error!CApi { const internal = try allocator.create(Self); errdefer allocator.destroy(internal); internal.* = try Self.init(allocator, address, conn); internal.* = try Self.init(allocator, address, conn, pool); errdefer internal.deinit(); const id_z = try allocator.dupeZ(u8, id);
-
@@ -146,14 +150,13 @@std.log.debug("Loading list of zones...", .{}); capi.zones_loading = if (capi.zones_loading == .not_loaded) .loading else .refreshing; const thread = std.Thread.spawn(.{}, loadZonesWorker, .{capi}) catch |err| { capi.internal.pool.spawn(loadZonesWorker, .{capi}) catch |err| { std.log.err("Failed to spawn thread for loading zones: {s}", .{@errorName(err)}); capi.zones_loading = .err_thread_spawn; capi.internal.on_zone_list_loading_change.runAll(.{}); return; }; thread.detach(); } fn loadZonesWorker(self: *CApi) void {
-
-
-
@@ -29,17 +29,19 @@allocator: std.mem.Allocator, on_change_callbacks: OnChange.Store, capi_lock: std.Thread.Mutex, pool: *std.Thread.Pool, has_loaded_once: bool = false, /// Acts as backing buffer for `CApi.entries`. For this reason, `CApi` manages this /// struct's memory. scan_result: ?std.StringHashMap(Entry.CApi) = null, pub fn init(allocator: std.mem.Allocator) Self { pub fn init(allocator: std.mem.Allocator, pool: *std.Thread.Pool) Self { return .{ .allocator = allocator, .on_change_callbacks = OnChange.Store.init(allocator), .capi_lock = std.Thread.Mutex{}, .pool = pool, }; }
-
@@ -66,10 +68,10 @@ err_socket = 8,err_thread_spawn = 9, }; pub fn init(allocator: std.mem.Allocator) std.mem.Allocator.Error!CApi { pub fn init(allocator: std.mem.Allocator, pool: *std.Thread.Pool) std.mem.Allocator.Error!CApi { const internal = try allocator.create(Self); errdefer allocator.destroy(internal); internal.* = Self.init(allocator); internal.* = Self.init(allocator, pool); errdefer internal.deinit(); return .{
-
@@ -111,14 +113,13 @@ }pub fn load(capi_ptr: ?*CApi) callconv(.C) void { const capi = capi_ptr orelse return; const thread = std.Thread.spawn(.{}, loadInternal, .{capi}) catch { capi.internal.pool.spawn(loadInternal, .{capi}) catch { capi.internal.capi_lock.lock(); defer capi.internal.capi_lock.unlock(); defer capi.internal.on_change_callbacks.runAll(.{}); capi.state = .err_thread_spawn; return; }; thread.detach(); } fn loadInternal(self: *CApi) void {
-
-
-
@@ -42,7 +42,7 @@ // Making this non-pointer causes invalid read/write when thread terminates.ws: *websocket.Client, addr: []const u8, rng: std.Random.Xoshiro256, thread: ?std.Thread = null, pool: *std.Thread.Pool, responses: ResponsesStore, responses_mutex: *std.Thread.Mutex,
-
@@ -52,7 +52,7 @@ WebSocketHandshakeError,PRNGSeedGenerationFailure, } || std.mem.Allocator.Error; pub fn init(child_allocator: std.mem.Allocator, address: std.net.Address) InitError!@This() { pub fn init(child_allocator: std.mem.Allocator, address: std.net.Address, pool: *std.Thread.Pool) InitError!@This() { var tsa = try child_allocator.create(std.heap.ThreadSafeAllocator); tsa.* = std.heap.ThreadSafeAllocator{ .child_allocator = child_allocator }; const allocator = tsa.allocator();
-
@@ -98,6 +98,7 @@ return InitError.PRNGSeedGenerationFailure;}; break :seed seed; }), .pool = pool, .responses = responses, .responses_mutex = responses_mutex, .thread_safe_allocator = tsa,
-
@@ -106,7 +107,15 @@ }pub fn listen(self: *Connection, on_request: RequestHandler) std.Thread.SpawnError!void { std.log.debug("Spawning WebSocket request handler thread...", .{}); self.thread = try std.Thread.spawn(.{}, readLoop, .{ self, on_request }); self.pool.spawn(readLoop, .{ self, on_request }) catch |err| { // From user's perspective, normal spawn error and pool's error is no // different. std.log.warn( "Spawning a thread from pool failed, simplifying error ({s})", .{@errorName(err)}, ); return std.Thread.SpawnError.Unexpected; }; } pub fn deinit(self: *Connection) void {
-
@@ -121,12 +130,6 @@ };self.ws.close(.{ .code = 4000, .reason = "bye" }) catch |err| { std.log.warn("Unable to close WebSocket client, proceeding: {s}", .{@errorName(err)}); }; if (self.thread) |thread| { std.log.debug("Waiting WebSocket thread to terminate...", .{}); // Wait for read thread to terminate. thread.join(); } std.log.debug("Releasing WebSocket resources...", .{}); self.ws.deinit();
-