Changes
4 changed files (+86/-62)
-
core/src/Arc.zig (new)
-
@@ -0,0 +1,39 @@// Copyright 2025 Shota FUJI // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // // SPDX-License-Identifier: Apache-2.0 //! Atomic Reference Counting based on example code on `std.atomic.Value`. //! https://ziglang.org/documentation/0.14.0/std/#std.atomic.Value const std = @import("std"); count: std.atomic.Value(usize) = std.atomic.Value(usize).init(0), const Arc = @This(); /// Increment reference count by one. pub fn ref(arc: *Arc) void { _ = arc.count.fetchAdd(1, .monotonic); } /// Decrement reference count by one, returns `true` if no reference is alive. pub fn unref(arc: *Arc) bool { if (arc.count.fetchSub(1, .release) == 1) { _ = arc.count.load(.acquire); return true; } else { return false; } }
-
-
-
@@ -19,6 +19,7 @@const moo = @import("moo"); const websocket = @import("websocket"); const Arc = @import("./Arc.zig"); const discovery = @import("./discovery.zig"); const extension = @import("./extension.zig").extension; const ping = @import("./services/ping.zig");
-
@@ -43,7 +44,7 @@ internal: *Internal,code: ConnectionError, const Internal = struct { ref_count: i64 = 1, arc: Arc = .{}, }; pub fn make(code: ConnectionError) std.mem.Allocator.Error!*ConnectionErrorEvent {
-
@@ -66,7 +67,7 @@ pub fn retain(ptr: ?*ConnectionErrorEvent) callconv(.C) *ConnectionErrorEvent {var self = ptr orelse @panic( std.fmt.comptimePrint("Received null pointer on {s}_{s}", .{ cname, @src().fn_name }), ); self.internal.ref_count += 1; self.internal.arc.ref(); return self; }
-
@@ -74,14 +75,11 @@ pub fn release(ptr: ?*ConnectionErrorEvent) callconv(.C) void {var self = ptr orelse @panic( std.fmt.comptimePrint("Received null pointer on {s}_{s}", .{ cname, @src().fn_name }), ); self.internal.ref_count -= 1; if (self.internal.ref_count == 0) { if (self.internal.arc.unref()) { std.log.debug("Releasing {*}...", .{self}); allocator.destroy(self.internal); allocator.destroy(self); } else if (self.internal.ref_count < 0) { std.log.warn("Over deref detected {*}, count={d}", .{ self, self.internal.ref_count }); } }
-
@@ -100,7 +98,7 @@ token: [*:0]const u8,pub const Internal = struct { token: [:0]const u8, ref_count: i64 = 1, arc: Arc = .{}, }; /// This function takes ownership of `token`.
-
@@ -127,7 +125,7 @@ pub fn retain(ptr: ?*ConnectedEvent) callconv(.C) *ConnectedEvent {var self = ptr orelse @panic( std.fmt.comptimePrint("Received null pointer on {s}_{s}", .{ cname, @src().fn_name }), ); self.internal.ref_count += 1; self.internal.arc.ref(); return self; }
-
@@ -135,15 +133,12 @@ pub fn release(ptr: ?*ConnectedEvent) callconv(.C) void {var self = ptr orelse @panic( std.fmt.comptimePrint("Received null pointer on {s}_{s}", .{ cname, @src().fn_name }), ); self.internal.ref_count -= 1; if (self.internal.ref_count == 0) { if (self.internal.arc.unref()) { std.log.debug("Releasing {*}...", .{self}); allocator.free(self.internal.token); allocator.destroy(self.internal); allocator.destroy(self); } else if (self.internal.ref_count < 0) { std.log.warn("Over deref detected {*}, count={d}", .{ self, self.internal.ref_count }); } }
-
@@ -175,7 +170,7 @@ };payload: Payload, ref_count: i64 = 1, arc: Arc = .{}, }; pub fn makeConnectionError(err: anyerror) std.mem.Allocator.Error!*Event {
-
@@ -282,7 +277,7 @@ pub fn retain(ptr: ?*Event) callconv(.C) *Event {var self = ptr orelse @panic( std.fmt.comptimePrint("Received null pointer on {s}_{s}", .{ cname, @src().fn_name }), ); self.internal.ref_count += 1; self.internal.arc.ref(); return self; }
-
@@ -290,8 +285,7 @@ pub fn release(ptr: ?*Event) callconv(.C) void {var self = ptr orelse @panic( std.fmt.comptimePrint("Received null pointer on {s}_{s}", .{ cname, @src().fn_name }), ); self.internal.ref_count -= 1; if (self.internal.ref_count == 0) { if (self.internal.arc.unref()) { std.log.debug("Releasing {*}...", .{self}); switch (self.internal.payload) {
-
@@ -302,8 +296,6 @@ }allocator.destroy(self.internal); allocator.destroy(self); } else if (self.internal.ref_count < 0) { std.log.warn("Over deref detected {*}, count={d}", .{ self, self.internal.ref_count }); } }
-
@@ -317,7 +309,7 @@ std.fmt.comptimePrint("Received null pointer on {s}", .{fn_name}),); if (self.internal.payload == kind) { return @field(self.internal.payload, @tagName(kind)); return @field(self.internal.payload, @tagName(kind)).retain(); } std.log.err("{s} called on {s}", .{ fn_name, @tagName(self.internal.payload) });
-
@@ -351,7 +343,7 @@ request_id: i64 = 0,subscription_id: u64 = 0, host: []const u8, zone_subscription_request_id: ?i64 = null, ref_count: i64 = 1, arc: Arc = .{}, fn init(server: *discovery.Server) !Internal { var addr = std.ArrayList(u8).init(allocator);
-
@@ -414,7 +406,7 @@ pub fn retain(ptr: ?*Connection) callconv(.C) *Connection {var self = ptr orelse @panic( std.fmt.comptimePrint("Received null pointer on {s}_{s}", .{ cname, @src().fn_name }), ); self.internal.ref_count += 1; self.internal.arc.ref(); return self; }
-
@@ -422,15 +414,12 @@ pub fn release(ptr: ?*Connection) callconv(.C) void {var self = ptr orelse @panic( std.fmt.comptimePrint("Received null pointer on {s}_{s}", .{ cname, @src().fn_name }), ); self.internal.ref_count -= 1; if (self.internal.ref_count == 0) { if (self.internal.arc.unref()) { std.log.debug("Releasing {*}...", .{self}); self.internal.deinit(); allocator.destroy(self.internal); allocator.destroy(self); } else if (self.internal.ref_count < 0) { std.log.warn("Over deref detected {*}, count={d}", .{ self, self.internal.ref_count }); } }
-
@@ -443,34 +432,37 @@ var ws = self.internal.ws orelse {// TODO: Pass saved token const new_token = self.connect(null) catch |err| { std.log.err("Unable to connect: {s}", .{@errorName(err)}); return Event.makeConnectionError(err) catch |make_err| { const event = Event.makeConnectionError(err) catch |make_err| { std.log.err("Unable to compose {s} event: {s}", .{ @tagName(Event.Kind.connection_error), @errorName(make_err), }); return null; }; return event.retain(); }; return Event.makeConnected(new_token) catch |err| { const event = Event.makeConnected(new_token) catch |err| { std.log.err("Unable to compose {s} event: {s}", .{ @tagName(Event.Kind.connected), @errorName(err), }); return null; }; return event.retain(); }; while (true) { const meta, const header_ctx, const msg = readMessage(&ws) catch |err| { std.log.err("Failed to read a message: {s}", .{@errorName(err)}); return Event.makeConnectionError(err) catch |make_err| { const event = Event.makeConnectionError(err) catch |make_err| { std.log.err("Unable to compose {s} event: {s}", .{ @tagName(Event.Kind.connection_error), @errorName(make_err), }); return null; }; return event.retain(); }; defer ws.done(msg);
-
@@ -495,13 +487,14 @@ continue;}; defer res.deinit(); return Event.makeZoneListFromInitial(res.value) catch |make_err| { const event = Event.makeZoneListFromInitial(res.value) catch |make_err| { std.log.err("Unable to compose {s} event: {s}", .{ @tagName(Event.Kind.zone_list), @errorName(make_err), }); return null; }; return event.retain(); } if (std.mem.eql(u8, meta.service, "Changed")) {
-
@@ -518,13 +511,14 @@ continue;}; defer res.deinit(); return Event.makeZoneListFromChanges(res.value) catch |err| { const event = Event.makeZoneListFromChanges(res.value) catch |err| { std.log.err("Unable to compose {s} event: {s}", .{ @tagName(Event.Kind.zone_list), @errorName(err), }); return null; }; return event.retain(); } if (std.mem.eql(u8, meta.service, "Unsubscribed")) {
-
-
-
@@ -18,6 +18,8 @@ const std = @import("std");const sood = @import("sood"); const Arc = @import("./Arc.zig"); const udp_dst = std.net.Address.initIp4( sood.discovery.multicast_ipv4_address, sood.discovery.udp_port,
-
@@ -41,8 +43,7 @@ id: [:0]const u8,name: [:0]const u8, version: [:0]const u8, address: std.net.Address, ref_count: i64 = 1, arc: Arc = .{}, }; pub fn make(resp: *const sood.discovery.Response, addr: *const std.net.Address) !*Server {
-
@@ -87,7 +88,7 @@ var self = ptr orelse @panic(std.fmt.comptimePrint("Received null pointer on {s}_{s}", .{ cname, @src().fn_name }, )); self.internal.ref_count += 1; self.internal.arc.ref(); return self; }
-
@@ -97,9 +98,7 @@ "Received null pointer on {s}_{s}",.{ cname, @src().fn_name }, )); self.internal.ref_count -= 1; if (self.internal.ref_count == 0) { if (self.internal.arc.unref()) { std.log.debug("Releasing {*}...", .{self}); allocator.free(self.internal.id);
-
@@ -107,8 +106,6 @@ allocator.free(self.internal.name);allocator.free(self.internal.version); allocator.destroy(self.internal); allocator.destroy(self); } else if (self.internal.ref_count < 0) { std.log.warn("Over deref detected {*}, count={d}", .{ self, self.internal.ref_count }); } }
-
@@ -140,7 +137,7 @@pub const Internal = struct { servers: []*Server = &.{}, ref_count: i64 = 1, arc: Arc = .{}, }; pub fn make() !*ScanResult {
-
@@ -170,7 +167,7 @@ var i: usize = 0;var iter = input.valueIterator(); while (iter.next()) |server| { std.log.debug("Found server ({s})", .{server.*.name}); servers[i] = server.*; servers[i] = server.*.retain(); i += 1; }
-
@@ -184,7 +181,7 @@ var self = ptr orelse @panic(std.fmt.comptimePrint("Received null pointer on {s}_{s}", .{ cname, @src().fn_name }, )); self.internal.ref_count += 1; self.internal.arc.ref(); return self; }
-
@@ -194,9 +191,7 @@ "Received null pointer on {s}_{s}",.{ cname, @src().fn_name }, )); self.internal.ref_count -= 1; if (self.internal.ref_count == 0) { if (self.internal.arc.unref()) { std.log.debug("Releasing {*}...", .{self}); for (self.internal.servers) |server| { server.release();
-
@@ -204,8 +199,6 @@ }allocator.free(self.internal.servers); allocator.destroy(self.internal); allocator.destroy(self); } else if (self.internal.ref_count < 0) { std.log.warn("Over deref detected {*}, count={d}", .{ self, self.internal.ref_count }); } }
-
@@ -232,7 +225,7 @@ ScanError.SocketError => .socket_error,}; }; return result; return result.retain(); } const ScanError = error{
-
-
-
@@ -16,6 +16,7 @@ // SPDX-License-Identifier: Apache-2.0const std = @import("std"); const Arc = @import("./Arc.zig"); const TransportService = @import("./services/transport.zig").TransportService; pub const PlaybackState = enum(c_int) {
-
@@ -38,7 +39,7 @@ pub const Internal = struct {id: [:0]const u8, name: [:0]const u8, ref_count: i64 = 1, arc: Arc = .{}, }; pub fn make(src: *const TransportService.Zone) std.mem.Allocator.Error!*Zone {
-
@@ -78,7 +79,7 @@ pub fn retain(ptr: ?*Zone) callconv(.C) *Zone {var self = ptr orelse @panic( std.fmt.comptimePrint("Received null pointer on {s}_{s}", .{ cname, @src().fn_name }), ); self.internal.ref_count += 1; self.internal.arc.ref(); return self; }
-
@@ -86,16 +87,13 @@ pub fn release(ptr: ?*Zone) callconv(.C) void {var self = ptr orelse @panic( std.fmt.comptimePrint("Received null pointer on {s}_{s}", .{ cname, @src().fn_name }), ); self.internal.ref_count -= 1; if (self.internal.ref_count == 0) { if (self.internal.arc.unref()) { std.log.debug("Releasing {*}...", .{self}); allocator.free(self.internal.id); allocator.free(self.internal.name); allocator.destroy(self.internal); allocator.destroy(self); } else if (self.internal.ref_count < 0) { std.log.warn("Over deref detected {*}, count={d}", .{ self, self.internal.ref_count }); } }
-
@@ -122,7 +120,7 @@ added_zones: []const *Zone,changed_zones: []const *Zone, removed_zone_ids: []const [*:0]const u8, ref_count: i64 = 1, arc: Arc = .{}, }; pub fn makeFromChanges(event: *const TransportService.SubscribeZoneChanges.Response) std.mem.Allocator.Error!*ZoneListEvent {
-
@@ -136,7 +134,8 @@ added_zones[i].release();} } for (event.zones_added) |src| { added_zones[added_i] = try Zone.make(&src); const zone = try Zone.make(&src); added_zones[added_i] = zone.retain(); added_i += 1; }
-
@@ -150,7 +149,8 @@ changed_zones[i].release();} } for (event.zones_changed) |src| { changed_zones[changed_i] = try Zone.make(&src); const zone = try Zone.make(&src); changed_zones[changed_i] = zone.retain(); changed_i += 1; }
-
@@ -206,7 +206,8 @@ added_zones[i].release();} } for (event.zones) |src| { added_zones[added_i] = try Zone.make(&src); const zone = try Zone.make(&src); added_zones[added_i] = zone.retain(); added_i += 1; }
-
@@ -239,7 +240,7 @@ pub fn retain(ptr: ?*ZoneListEvent) callconv(.C) *ZoneListEvent {var self = ptr orelse @panic( std.fmt.comptimePrint("Received null pointer on {s}_{s}", .{ cname, @src().fn_name }), ); self.internal.ref_count += 1; self.internal.arc.ref(); return self; }
-
@@ -247,8 +248,7 @@ pub fn release(ptr: ?*ZoneListEvent) callconv(.C) void {var self = ptr orelse @panic( std.fmt.comptimePrint("Received null pointer on {s}_{s}", .{ cname, @src().fn_name }), ); self.internal.ref_count -= 1; if (self.internal.ref_count == 0) { if (self.internal.arc.unref()) { std.log.debug("Releasing {*}...", .{self}); for (self.internal.added_zones) |zone| {
-
@@ -268,8 +268,6 @@ allocator.free(self.internal.removed_zone_ids);allocator.destroy(self.internal); allocator.destroy(self); } else if (self.internal.ref_count < 0) { std.log.warn("Over deref detected {*}, count={d}", .{ self, self.internal.ref_count }); } }
-