Changes
4 changed files (+474/-69)
@@ -112,13 +112,13 @@ };
}

pub fn deinit(self: *App) void {
    self.pool.deinit();
    if (self.state_file_path) |path| {
        self.allocator.free(path);
    }
    self.on_restore_complete.deinit();
    self.on_connection_change.deinit();
    self.on_server_change.deinit();
    self.pool.deinit();
}

pub const CApi = extern struct {
@@ -459,6 +459,7 @@ entry.getName(),
    entry.getVersion(),
    token,
    &self.internal.pool,
    &self.internal.capi_lock,
) catch {
    self.internal.capi_lock.lock();
    defer self.internal.capi_lock.unlock();
@@ -557,6 +558,7 @@ info.name,
    info.version,
    token,
    &self.internal.pool,
    &self.internal.capi_lock,
) catch {
    self.internal.capi_lock.lock();
    defer self.internal.capi_lock.unlock();
@@ -23,35 +23,31 @@ pub const Zone = @import("./Server/Zone.zig");
const Self = @This();

const Zones = std.StringHashMap(Zone.CApi);
const OnZoneAdd = callback.Callback(*const Zone.CApi);
const OnZoneListLoadingChange = callback.Callback(struct {});

allocator: std.mem.Allocator,
address: std.net.Address,
pool: *std.Thread.Pool,
zones: Zones,
on_zone_add: OnZoneAdd.Store,
on_zone_list_loading_change: OnZoneListLoadingChange.Store,

// Unowned. Caller manages the resource.
conn: *connection.Connection,

pub fn init(
    allocator: std.mem.Allocator,
    address: std.net.Address,
    conn: *connection.Connection,
    pool: *std.Thread.Pool,
) std.mem.Allocator.Error!Self {
    return .{
        .allocator = allocator,
        .address = address,
        .conn = conn,
        .pool = pool,
        .on_zone_add = OnZoneAdd.Store.init(allocator),
        .on_zone_list_loading_change = OnZoneListLoadingChange.Store.init(allocator),
    };
}

// Unowned. Caller manages the resource.
capi_lock: *std.Thread.Mutex,

fn deinit(self: *Self) void {
    var iter = self.zones.iterator();
    while (iter.next()) |entry| {
        entry.value_ptr.deinit(self.allocator);
        self.allocator.free(entry.key_ptr.*);
    }
    self.zones.deinit();

pub fn deinit(self: *const Self) void {
    self.on_zone_add.deinit();
    self.on_zone_list_loading_change.deinit();
}
@@ -64,6 +60,7 @@ name: [*:0]const u8,
name_len: usize,
version: [*:0]const u8,
version_len: usize,

// Each element is a reference to an item in `internal.zones`.
zones: [*]*Zone.CApi,
zones_len: usize,
zones_loading: ZoneListLoading = .not_loaded,
@@ -90,10 +87,20 @@ name: []const u8,
version: []const u8,
token: []const u8,
pool: *std.Thread.Pool,
capi_lock: *std.Thread.Mutex,
) std.mem.Allocator.Error!CApi {
    const internal = try allocator.create(Self);
    errdefer allocator.destroy(internal);

    internal.* = try Self.init(allocator, address, conn, pool);
    internal.* = .{
        .allocator = allocator,
        .address = address,
        .conn = conn,
        .pool = pool,
        .capi_lock = capi_lock,
        .zones = Zones.init(allocator),
        .on_zone_add = OnZoneAdd.Store.init(allocator),
        .on_zone_list_loading_change = OnZoneListLoadingChange.Store.init(allocator),
    };
    errdefer internal.deinit();

    const id_z = try allocator.dupeZ(u8, id);
@@ -124,13 +131,7 @@ };
}

pub fn deinit(self: *CApi) void {
    const zones = self.zones[0..self.zones_len];
    for (zones) |zone| {
        zone.deinit(self.internal.allocator);
        self.internal.allocator.destroy(zone);
    }
    self.internal.allocator.free(zones);
    self.internal.allocator.free(self.zones[0..self.zones_len]);
    self.internal.allocator.free(self.token[0..self.token_len]);
    self.internal.allocator.free(self.version[0..self.version_len]);
    self.internal.allocator.free(self.name[0..self.name_len]);
@@ -160,11 +161,10 @@ };
}

fn loadZonesWorker(self: *CApi) void {
    self.loadZonesWorkerInner() catch |err| {
        std.log.err("Failed to fetch list of zones: {s}", .{@errorName(err)});
    self.loadZonesWorkerInnerCont() catch |err| {
        std.log.err("Failed to subscribe to zone status: {s}", .{@errorName(err)});
        self.zones_loading = switch (err) {
            error.OutOfMemory => ZoneListLoading.err_out_of_memory,
            error.NonSuccessResponse => ZoneListLoading.err_non_success,
            else => ZoneListLoading.err_unexpected,
        };
@@ -172,33 +172,90 @@ self.internal.on_zone_list_loading_change.runAll(.{});
    };
}

fn loadZonesWorkerInner(self: *CApi) !void {
    const res = try Transport.getZones(self.internal.allocator, self.internal.conn);
    defer res.deinit();

fn loadZonesWorkerInnerCont(self: *CApi) !void {
    var subscription = Transport.ZoneChangeSubscription.init(self.internal.conn);
    var iter = try subscription.iter(self.internal.allocator);
    while (try iter.next(self.internal.allocator)) |event| {
        defer event.deinit();

        self.internal.capi_lock.lock();
        defer self.internal.capi_lock.unlock();

        for (event.added_zones) |zone| {
            std.log.debug(
                "Adding zone \"{s}\" ({s})...",
                .{ zone.display_name, zone.zone_id },
            );

            const key = self.internal.zones.getKey(zone.zone_id) orelse
                try self.internal.allocator.dupe(u8, zone.zone_id);

            var prev = try self.internal.zones.fetchPut(
                key,
                try Zone.CApi.init(self.internal.allocator, &zone),
            );
            if (prev) |*entry| {
                entry.value.deinit(self.internal.allocator);
            }
        }

    const zones = try self.internal.allocator.alloc(*Zone.CApi, res.value.zones.len);
    errdefer self.internal.allocator.free(zones);

        for (event.changed_zones) |zone| {
            std.log.debug(
                "Replacing zone \"{s}\" ({s})...",
                .{ zone.display_name, zone.zone_id },
            );

    for (res.value.zones, 0..) |zone, i| {
        const dst = try self.internal.allocator.create(Zone.CApi);
        errdefer self.internal.allocator.destroy(dst);

            const key = self.internal.zones.getKey(zone.zone_id) orelse
                try self.internal.allocator.dupe(u8, zone.zone_id);

        dst.* = try Zone.CApi.init(self.internal.allocator, &zone);
        zones[i] = dst;
    }

            var prev = try self.internal.zones.fetchPut(
                key,
                try Zone.CApi.init(self.internal.allocator, &zone),
            );

    const prev = self.zones[0..self.zones_len];
    for (prev) |z| {
        z.deinit(self.internal.allocator);
        self.internal.allocator.destroy(z);
    }
    self.internal.allocator.free(prev);

            if (prev) |*entry| {
                entry.value.deinit(self.internal.allocator);
            }
        }

    self.zones = zones.ptr;
    self.zones_len = zones.len;
    self.zones_loading = .loaded;

        for (event.removed_ids) |id| {
            if (self.internal.zones.getEntry(id)) |entry| {
                std.log.debug("Removing zone \"{s}\" ({s})...", .{
                    entry.value_ptr.name[0..entry.value_ptr.name_len],
                    id,
                });

    self.internal.on_zone_list_loading_change.runAll(.{});

                const key = entry.key_ptr.*;
                entry.value_ptr.deinit(self.internal.allocator);

                // The `if` conditional already checks whether the key exists.
                _ = self.internal.zones.remove(id);

                // `entry.key_ptr` changes at the call of `remove`.
                // We have to store a pointer for the underlying buffer before that.
                self.internal.allocator.free(key);
            }
        }

        const zones_len = self.internal.zones.count();
        const zones = try self.internal.allocator.alloc(*Zone.CApi, zones_len);
        var zone_iter = self.internal.zones.iterator();
        var i: usize = 0;
        while (zone_iter.next()) |entry| {
            zones[i] = entry.value_ptr;
            i += 1;
        }

        self.internal.allocator.free(self.zones[0..self.zones_len]);
        self.zones = zones.ptr;
        self.zones_len = zones.len;
        self.zones_loading = .loaded;

        self.internal.on_zone_list_loading_change.runAll(.{});
    }

    std.log.debug("Zone loading subscription finished", .{});
}

pub fn onZoneAdd(capi_ptr: ?*CApi, cb: OnZoneAdd.Fn, userdata: callback.UserData) callconv(.C) void {
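Note on the add/replace paths above: the worker looks up the map's existing key with `getKey` before calling `fetchPut`, so updating an already-known zone reuses the key buffer the map owns instead of leaking a second copy. A minimal, self-contained sketch of that pattern (plain `std.StringHashMap`; the `upsert` helper and value type are illustrative, not part of this change):

    const std = @import("std");

    /// Inserts or replaces `value` under `id`, duplicating the key only on first insert.
    fn upsert(map: *std.StringHashMap(u32), allocator: std.mem.Allocator, id: []const u8, value: u32) !void {
        // Reuse the key the map already owns, otherwise duplicate the caller's slice.
        const key = map.getKey(id) orelse try allocator.dupe(u8, id);
        const prev = try map.fetchPut(key, value);
        _ = prev; // A previous value would be released here if it owned resources.
    }

    test "upsert reuses existing keys" {
        const allocator = std.testing.allocator;
        var map = std.StringHashMap(u32).init(allocator);
        defer {
            var it = map.iterator();
            while (it.next()) |entry| allocator.free(entry.key_ptr.*);
            map.deinit();
        }

        try upsert(&map, allocator, "zone-1", 1);
        try upsert(&map, allocator, "zone-1", 2); // Same key buffer, no new key allocation.
        try std.testing.expectEqual(@as(u32, 2), map.get("zone-1").?);
    }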
@@ -24,6 +24,9 @@ wrote: *std.Thread.ResetEvent,
/// `null` on OOM.
data: ?[]const u8,

/// If this field is `true`, `data` property does not have meaningful data.
canceled: bool = false,
};

const ResponsesStore = std.AutoHashMap(i64, *Response);
@@ -45,6 +48,15 @@ rng: std.Random.Xoshiro256,
pool: *std.Thread.Pool,
responses: ResponsesStore,
responses_mutex: *std.Thread.Mutex,
subscription_id_cnt: u64 = 0,

state: State = .connecting,
state_mutex: std.Thread.Mutex = .{},

const State = enum {
    connecting,
    connected,
    closed,
};

pub const InitError = error{
    WebSocketClientCreationError,
@@ -119,17 +131,15 @@ };
}

pub fn deinit(self: *Connection) void {
    std.log.debug("Closing WebSocket connection...", .{});

    // websocket.zig cannot terminate ongoing read via ".close()".
    // Manually shutting-down socket seems necessary.
    // Related: https://github.com/karlseguin/websocket.zig/issues/46
    std.posix.shutdown(self.ws.stream.stream.handle, .recv) catch |err| {
        std.log.warn("Failed to shutdown socket handle of WebSocket: {s}", .{@errorName(err)});
    };
    self.ws.close(.{ .code = 4000, .reason = "bye" }) catch |err| {
        std.log.warn("Unable to close WebSocket client, proceeding: {s}", .{@errorName(err)});
    const state = state: {
        self.state_mutex.lock();
        defer self.state_mutex.unlock();
        break :state self.state;
    };
    if (state != .closed) {
        self.close();
    }

    std.log.debug("Releasing WebSocket resources...", .{});
    self.ws.deinit();
@@ -144,11 +154,6 @@ if (entry.value_ptr.*.data) |data| {
        self.allocator.free(data);
    }

    // Emptying the data then set ResetEvent, to notify listeners that this handler
    // should be discarded.
    entry.value_ptr.*.data = null;
    entry.value_ptr.*.wrote.set();
}

self.responses.deinit();
self.responses_mutex.unlock();
@@ -160,12 +165,43 @@ self.allocator.free(self.addr);
    self.thread_safe_allocator.child_allocator.destroy(self.thread_safe_allocator);
}

/// Closes connection and cancels active message listeners.
fn close(self: *Connection) void {
    // Intentionally blocks `deinit` during closing procedure, so it'll run after closed.
    self.state_mutex.lock();
    defer self.state_mutex.unlock();

    std.log.debug("Closing WebSocket connection...", .{});

    // websocket.zig cannot terminate ongoing read via ".close()".
    // Manually shutting-down socket seems necessary.
    // Related: https://github.com/karlseguin/websocket.zig/issues/46
    std.posix.shutdown(self.ws.stream.stream.handle, .recv) catch |err| {
        std.log.warn("Failed to shutdown socket handle of WebSocket: {s}", .{@errorName(err)});
    };
    self.ws.close(.{ .code = 4000, .reason = "bye" }) catch |err| {
        std.log.warn("Unable to close WebSocket client, proceeding: {s}", .{@errorName(err)});
    };

    {
        self.responses_mutex.lock();
        defer self.responses_mutex.unlock();

        var iter = self.responses.iterator();
        while (iter.next()) |entry| {
            entry.value_ptr.*.canceled = true;
            entry.value_ptr.*.wrote.set();
        }
    }

    self.state = .closed;
}

pub fn newRequestId(self: *Connection) i64 {
    return self.rng.random().int(i64);
}

pub const RequestError = error{
    CanceledOrOutOfMemory,
    Canceled,
};

pub fn request(self: *Connection, request_id: i64, message: []u8) ![]const u8 {
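The new `state`/`state_mutex` pair makes teardown idempotent: `deinit` only runs the socket shutdown when the connection has not already been closed, and `close` flips the state to `.closed` while cancelling every pending response. A reduced, self-contained sketch of that state-guarded close pattern (the `Conn` type and `closed_count` counter are illustrative only, not this project's API):

    const std = @import("std");

    const Conn = struct {
        state: enum { connecting, connected, closed } = .connecting,
        state_mutex: std.Thread.Mutex = .{},
        closed_count: u32 = 0,

        /// Tears the connection down at most once, no matter how many callers race here.
        fn close(self: *Conn) void {
            self.state_mutex.lock();
            defer self.state_mutex.unlock();
            if (self.state == .closed) return;
            // Socket shutdown and cancellation of pending responses would happen here.
            self.closed_count += 1;
            self.state = .closed;
        }

        fn deinit(self: *Conn) void {
            // Mirrors the diff's deinit: only close if nobody has done it yet.
            self.close();
            // Remaining resources are released afterwards.
        }
    };

    test "close runs at most once" {
        var conn = Conn{};
        conn.close(); // e.g. triggered when the server closes the connection
        conn.deinit(); // later teardown does not close a second time
        try std.testing.expectEqual(@as(u32, 1), conn.closed_count);
    }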
@@ -191,11 +227,197 @@ try self.ws.writeBin(message);
wrote.wait();

return response.data orelse return RequestError.CanceledOrOutOfMemory;
if (response.canceled) {
    return RequestError.Canceled;
}
return response.data orelse return std.mem.Allocator.Error.OutOfMemory;
}

fn newSubscriptionId(self: *@This()) u64 {
    const id = self.subscription_id_cnt;
    self.subscription_id_cnt += 1;
    return id;
}

/// Returns an interface object for subscription management.
///
/// `service_id` is the ID of the service.
/// `subject` is the name of the entity to subscribe to.
///
/// The resulting service names are `<service_id>/subscribe_<subject>` and
/// `<service_id>/unsubscribe_<subject>`.
///
/// Both `service_id` and `subject` must remain pointer-stable for the lifetime of the
/// returned struct.
///
/// The caller has to call `.activate()` on the returned struct to receive updates.
/// To capture received updates, use the `.next()` method on the returned struct.
pub fn subscribe(self: *@This(), service_id: []const u8, subject: []const u8) Subscription {
    return Subscription{
        .request_id = self.newRequestId(),
        .subscription_id = self.newSubscriptionId(),
        .conn = self,
        .response = Response{
            .wrote = undefined,
            .data = null,
        },
        .response_wrote = std.Thread.ResetEvent{},
        .service_id = service_id,
        .subject = subject,
    };
}

pub const Subscription = struct {
    request_id: i64,
    subscription_id: u64,
    conn: *Connection,
    response: Response,
    response_wrote: std.Thread.ResetEvent,
    service_id: []const u8,
    subject: []const u8,
    request_message: []u8 = undefined,

    /// Sets up a response listener.
    pub fn activate(self: *@This(), allocator: std.mem.Allocator) !void {
        self.response.wrote = &self.response_wrote;
        {
            self.conn.responses_mutex.lock();
            defer self.conn.responses_mutex.unlock();
            try self.conn.responses.put(self.request_id, &self.response);
        }

        const service_tmpl = "{s}/subscribe_{s}";
        const service_len = std.fmt.count(service_tmpl, .{ self.service_id, self.subject });
        const service = try allocator.alloc(u8, service_len);
        defer allocator.free(service);
        var service_fbs = std.io.fixedBufferStream(service);
        try std.fmt.format(service_fbs.writer(), service_tmpl, .{ self.service_id, self.subject });

        const meta = moo.Metadata{
            .service = service_fbs.getWritten(),
            .verb = "REQUEST",
        };

        const body_tmpl = "{{\"subscription_key\":{}}}";
        const body_len = comptime std.fmt.count(body_tmpl, .{std.math.maxInt(u64)});
        var body_bytes: [body_len]u8 = undefined;
        var body_bytes_fbs = std.io.fixedBufferStream(&body_bytes);
        try std.fmt.format(body_bytes_fbs.writer(), body_tmpl, .{self.subscription_id});
        const body = moo.RawBody{
            .bytes = body_bytes_fbs.getWritten(),
        };

        var header = body.getHeader(self.request_id);
        header.content_type = "application/json";

        const req_buffer = try allocator.alloc(
            u8,
            meta.getEncodeSize() + header.getEncodeSize() + body.getEncodeSize(),
        );
        defer allocator.free(req_buffer);
        var req_fbs = std.io.fixedBufferStream(req_buffer);
        try moo.encode(req_fbs.writer(), meta, header, body);

        try self.conn.ws.writeBin(req_fbs.getWritten());
    }

    /// Releases resources and unregisters the response handler.
    /// This does not send an unsubscribe command to the server.
    /// Calling this function after `Connection.deinit()` is useless and invalid:
    /// `Connection.deinit()` releases this struct's resources as well.
    pub fn deinit(self: *@This()) void {
        self.conn.responses_mutex.lock();
        defer self.conn.responses_mutex.unlock();
        _ = self.conn.responses.remove(self.request_id);
    }

    /// Sends an unsubscribe request to the server.
    pub fn unsubscribe(self: *@This(), allocator: std.mem.Allocator) !void {
        const request_id = self.conn.newRequestId();

        const service_tmpl = "{s}/unsubscribe_{s}";
        const service_len = std.fmt.count(service_tmpl, .{ self.service_id, self.subject });
        const service = try allocator.alloc(u8, service_len);
        defer allocator.free(service);
        var service_fbs = std.io.fixedBufferStream(service);
        try std.fmt.format(service_fbs.writer(), service_tmpl, .{ self.service_id, self.subject });

        const meta = moo.Metadata{
            .service = service_fbs.getWritten(),
            .verb = "REQUEST",
        };

        const body_tmpl = "{{\"subscription_key\":{}}}";
        const body_len = comptime std.fmt.count(body_tmpl, .{std.math.maxInt(u64)});
        var body_bytes: [body_len]u8 = undefined;
        var body_bytes_fbs = std.io.fixedBufferStream(&body_bytes);
        try std.fmt.format(body_bytes_fbs.writer(), body_tmpl, .{self.subscription_id});
        const body = moo.RawBody{
            .bytes = body_bytes_fbs.getWritten(),
        };

        var header = body.getHeader(request_id);
        header.content_type = "application/json";

        const req_buffer = try allocator.alloc(
            u8,
            meta.getEncodeSize() + header.getEncodeSize() + body.getEncodeSize(),
        );
        defer allocator.free(req_buffer);
        var req_fbs = std.io.fixedBufferStream(req_buffer);
        try moo.encode(req_fbs.writer(), meta, header, body);

        try self.conn.ws.writeBin(req_fbs.getWritten());
    }

    /// Returns incoming updates for the subject. If the subscription has been unsubscribed,
    /// this function returns `null`.
    ///
    /// Calling this function before `.activate()` blocks the thread indefinitely.
    ///
    /// The caller has to release the returned memory using the allocator passed to this function.
    pub fn next(self: *@This(), allocator: std.mem.Allocator) std.mem.Allocator.Error!?[]const u8 {
        var entry = self.conn.responses.get(self.request_id) orelse {
            return null;
        };

        entry.wrote.wait();
        if (entry.canceled) {
            return null;
        }

        const data = entry.data orelse {
            return std.mem.Allocator.Error.OutOfMemory;
        };
        defer self.conn.allocator.free(data);

        {
            self.conn.responses_mutex.lock();
            defer self.conn.responses_mutex.unlock();
            entry.canceled = false;
            entry.data = null;
            entry.wrote.reset();
        }

        return try allocator.dupe(u8, data);
    }
};
};

fn readLoop(conn: *Connection, on_request: RequestHandler) void {
    {
        conn.state_mutex.lock();
        defer conn.state_mutex.unlock();
        conn.state = .connected;
    }

    while (true) {
        const msg = (conn.ws.read() catch return) orelse unreachable;
        defer conn.ws.done(msg);
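Taken together, `subscribe`, `activate`, `next`, `unsubscribe`, and `deinit` form the lifecycle of one subscription. A hedged usage sketch, assuming an already-connected `conn: *Connection`, an `allocator`, and illustrative service/subject names (error handling trimmed):

    var sub = conn.subscribe("com.example.service", "zones"); // names illustrative
    defer sub.deinit(); // Unregisters the response handler; does not contact the server.

    try sub.activate(allocator); // Sends "<service_id>/subscribe_<subject>" and registers a listener.

    // `next` blocks until an update arrives and returns null once the subscription is canceled.
    while (try sub.next(allocator)) |raw| {
        defer allocator.free(raw); // Duplicated with the caller's allocator.
        // Decode the MOO message in `raw` here.
    }

To stop receiving updates while the connection stays open, `unsubscribe(allocator)` can be called first to send the corresponding `unsubscribe_` request.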
@@ -215,10 +437,13 @@ continue;
};

{
    conn.responses_mutex.lock();
    defer conn.responses_mutex.unlock();
const response = response: {
    conn.responses_mutex.lock();
    defer conn.responses_mutex.unlock();
    break :response conn.responses.get(header.request_id);
};

if (conn.responses.get(header.request_id)) |store| {
if (response) |store| {
    if (store.wrote.isSet()) {
        std.log.warn(
            "Received more than one message having same Request-Id({d})",
@@ -266,6 +491,7 @@ },
.ping => conn.ws.writePong(msg.data) catch {},
.pong => {},
.close => {
    // TODO: Release Connection or notify upstream
    conn.ws.close(.{}) catch return;
    return;
},
@@ -98,4 +98,124 @@ .ignore_unknown_fields = true,
    .allocate = .alloc_always,
});
}

const SubscribeZoneChangesSubscribedResponse = struct {
    zones: []const Zone = &.{},
};

const SubscribeZoneChangesChangedResponse = struct {
    zones_removed: []const []const u8 = &.{},
    zones_added: []const Zone = &.{},
    zones_changed: []const Zone = &.{},
};

pub const ZoneChangeSubscription = struct {
    subscription: Connection.Subscription,

    const Data = struct {
        allocator: std.mem.Allocator,
        added_zones: []const Zone,
        changed_zones: []const Zone,
        removed_ids: []const []const u8,
        original_message: union(enum) {
            subscribed: moo.JsonBody(SubscribeZoneChangesSubscribedResponse),
            changed: moo.JsonBody(SubscribeZoneChangesChangedResponse),
        },
        message_bytes: []const u8,

        pub fn deinit(self: *const Data) void {
            switch (self.original_message) {
                .subscribed => |msg| msg.deinit(),
                .changed => |msg| msg.deinit(),
            }
            self.allocator.free(self.message_bytes);
        }
    };

    const Iterator = struct {
        parent: *ZoneChangeSubscription,

        pub const Error = error{
            UnknownService,
            UnsubscribedByServer,
        };

        /// Returns the next zone change event. This function blocks until the next event.
        /// The caller must call `.deinit()` on the returned struct after use.
        pub fn next(self: *Iterator, allocator: std.mem.Allocator) !?Data {
            const message = try self.parent.subscription.next(allocator) orelse return null;
            errdefer allocator.free(message);

            const meta, const header_ctx = try moo.Metadata.parse(message);
            if (std.mem.eql(u8, meta.service, "Unsubscribed")) {
                return Error.UnsubscribedByServer;
            }
            if (std.mem.eql(u8, meta.service, "Subscribed")) {
                _, const body_ctx = try moo.WellKnownHeaders.parse(message, header_ctx);
                const body = try moo.JsonBody(SubscribeZoneChangesSubscribedResponse).parse(
                    allocator,
                    message,
                    body_ctx,
                    .{
                        .ignore_unknown_fields = true,
                    },
                );

                return Data{
                    .allocator = allocator,
                    .added_zones = body.value.zones,
                    .changed_zones = &.{},
                    .removed_ids = &.{},
                    .original_message = .{
                        .subscribed = body,
                    },
                    .message_bytes = message,
                };
            }
            if (std.mem.eql(u8, meta.service, "Changed")) {
                _, const body_ctx = try moo.WellKnownHeaders.parse(message, header_ctx);
                const body = try moo.JsonBody(SubscribeZoneChangesChangedResponse).parse(
                    allocator,
                    message,
                    body_ctx,
                    .{
                        .ignore_unknown_fields = true,
                    },
                );

                return Data{
                    .allocator = allocator,
                    .added_zones = body.value.zones_added,
                    .changed_zones = body.value.zones_changed,
                    .removed_ids = body.value.zones_removed,
                    .original_message = .{
                        .changed = body,
                    },
                    .message_bytes = message,
                };
            }

            return Error.UnknownService;
        }
    };

    pub fn init(conn: *Connection) ZoneChangeSubscription {
        return .{
            .subscription = conn.subscribe(id, "zones"),
        };
    }

    /// Calling this function after `Connection.deinit()` is useless and invalid.
    pub fn deinit(self: *ZoneChangeSubscription) void {
        self.subscription.deinit();
    }

    pub fn iter(self: *ZoneChangeSubscription, allocator: std.mem.Allocator) !Iterator {
        try self.subscription.activate(allocator);
        return .{ .parent = self };
    }
};
};
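The zone-loading worker shown earlier consumes this type roughly as follows; a trimmed sketch assuming a live `conn: *Connection` and an `allocator` (error handling and locking omitted):

    var subscription = Transport.ZoneChangeSubscription.init(conn);
    defer subscription.deinit();

    // `iter` activates the underlying subscription before returning the iterator.
    var iter = try subscription.iter(allocator);

    // Blocks until the next "Subscribed"/"Changed" event; returns null when the connection goes away.
    while (try iter.next(allocator)) |event| {
        defer event.deinit();
        for (event.added_zones) |zone| std.log.debug("added {s}", .{zone.display_name});
        for (event.changed_zones) |zone| std.log.debug("changed {s}", .{zone.display_name});
        for (event.removed_ids) |zone_id| std.log.debug("removed {s}", .{zone_id});
    }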