Ordered execution with system sets

This commit is contained in:
duck
2025-05-08 19:17:58 +05:00
parent 32ca78e5c7
commit d239e775f3
3 changed files with 260 additions and 83 deletions

View File

@@ -6,6 +6,8 @@ const System = @import("graph/system.zig");
// TODO: // TODO:
// - Use arena allocator? // - Use arena allocator?
// - Resolve missing resource problem // - Resolve missing resource problem
// - Parse system sets into a properly defined data structure instead of relying on `@typeInfo`
// - Find a better way to represent system sets
pub const Controller = @import("graph/controller.zig"); pub const Controller = @import("graph/controller.zig");
@@ -50,7 +52,7 @@ pub fn init(alloc: std.mem.Allocator) !Self {
for (0..DEFAULT_CONTROLLERS) |i| { for (0..DEFAULT_CONTROLLERS) |i| {
var controller = try Controller.create(alloc); var controller = try Controller.create(alloc);
controller.setDuds(@intCast(DEFAULT_DUDS_PER_CONTROLLER * i), duds.items[DEFAULT_DUDS_PER_CONTROLLER * i .. DEFAULT_DUDS_PER_CONTROLLER * (i + 1)]); controller.setDuds(@intCast(DEFAULT_DUDS_PER_CONTROLLER * i), @intCast(DEFAULT_DUDS_PER_CONTROLLER * (i + 1)));
controllers.appendAssumeCapacity(controller); controllers.appendAssumeCapacity(controller);
} }
@@ -95,12 +97,13 @@ fn runAllSystems(self: *Self) GraphError!void {
while (true) { while (true) {
const system = &self.system_queue.items[self.system_queue.items.len - 1]; const system = &self.system_queue.items[self.system_queue.items.len - 1];
if (system.requires_dud) |dud_id| { if (system.requires_dud) |dud_id| {
if (self.duds.items[dud_id].required_count == 0) { if (self.duds.items[dud_id].required_count == 0) {
break; break;
} }
} else break; } else break;
if (swap_with > 1) { if (swap_with > 0) {
swap_with -= 1; swap_with -= 1;
std.mem.swap( std.mem.swap(
System, System,
@@ -137,6 +140,7 @@ fn runSystem(self: *Self, system: System) GraphError!void {
}, },
.controller => { .controller => {
controller = try self.getController(); controller = try self.getController();
controller.?.submit_dud = system.submit_dud;
buffer[buffer_len] = @ptrCast(&controller.?); buffer[buffer_len] = @ptrCast(&controller.?);
}, },
} }
@@ -157,7 +161,10 @@ fn applyCommands(self: *Self, commands: []const Controller.Command) !void {
for (commands) |command| { for (commands) |command| {
switch (command) { switch (command) {
.add_resource => |r| try self.addResource(r), .add_resource => |r| try self.addResource(r),
.queue_system => |s| try self.enqueueSystem(s), .queue_system => |s| {
if (s.submit_dud) |submit_id| self.duds.items[submit_id].required_count += 1;
try self.enqueueSystem(s);
},
} }
} }
} }
@@ -173,7 +180,7 @@ fn getController(self: *Self) !Controller {
errdefer self.duds.shrinkRetainingCapacity(self.duds.items.len - DEFAULT_DUDS_PER_CONTROLLER); errdefer self.duds.shrinkRetainingCapacity(self.duds.items.len - DEFAULT_DUDS_PER_CONTROLLER);
var controller = try Controller.create(self.alloc); var controller = try Controller.create(self.alloc);
controller.setDuds(@intCast(next_dud_id), self.duds.items[next_dud_id .. next_dud_id + DEFAULT_DUDS_PER_CONTROLLER]); controller.setDuds(@intCast(next_dud_id), @intCast(next_dud_id + DEFAULT_DUDS_PER_CONTROLLER));
return controller; return controller;
} }
@@ -215,7 +222,7 @@ const GraphError = error{
SystemDeadlock, SystemDeadlock,
}; };
test { test "simple graph smoke test" {
const Graph = @This(); const Graph = @This();
const TestResource = struct { const TestResource = struct {
number: u32, number: u32,
@@ -233,17 +240,21 @@ test {
rsc.number -= 1000; rsc.number -= 1000;
} }
fn addEleven(cmd: *Controller) void { fn addEleven(cmd: *Controller) void {
cmd.queueSystem(addTen); cmd.queue(addTen);
cmd.queueSystem(addOne); cmd.queue(addOne);
cmd.queueOrdered(.{ cmd.queue(.{
.first = .{
addThousand, addThousand,
addThousand, addThousand,
addThousand, addThousand,
}, .{ },
.second = .{
subThousand, subThousand,
subThousand, subThousand,
subThousand, subThousand,
},
.ordered = true,
}); });
} }
}; };
@@ -254,12 +265,12 @@ test {
var controller = try graph.getController(); var controller = try graph.getController();
controller.addResource(TestResource{ .number = 100 }); controller.addResource(TestResource{ .number = 100 });
controller.queueSystem(TestResource.addOne); controller.queue(TestResource.addOne);
controller.queueSystem(TestResource.addOne); controller.queue(TestResource.addOne);
controller.queueSystem(TestResource.addTen); controller.queue(TestResource.addTen);
controller.queueSystem(TestResource.addEleven); controller.queue(TestResource.addEleven);
try graph.freeController(controller); try graph.freeController(controller);
@@ -268,3 +279,73 @@ test {
const result = graph.getResource(TestResource); const result = graph.getResource(TestResource);
try std.testing.expectEqual(result.?.number, 123); try std.testing.expectEqual(result.?.number, 123);
} }
test "complex queue graph smoke test" {
const Graph = @This();
const TestResource = struct {
const Rsc = @This();
data1: isize,
data2: isize,
fn queueManySystems(cmd: *Controller) void {
cmd.queue(.{
.@"0" = .{
addTen,
addTen,
addTen,
addTen,
subTwenty,
},
// `data1` = 20
// `data2` = 5
.@"1" = .{
mulTen,
mulTen,
mulTwo,
mulTwo,
},
// `data1` = 8000
// `data2` = 9
.@"2" = .{
subTwenty,
},
.ordered = true,
// `data1` = 7980
// `data2` = 10
});
}
fn addTen(rsc: *Rsc) void {
rsc.data1 += 10;
rsc.data2 += 1;
}
fn subTwenty(rsc: *Rsc) void {
rsc.data1 -= 20;
rsc.data2 += 1;
}
fn mulTen(rsc: *Rsc) void {
rsc.data1 *= 10;
rsc.data2 += 1;
}
fn mulTwo(rsc: *Rsc) void {
rsc.data1 *= 2;
rsc.data2 += 1;
}
};
var graph = try Graph.init(std.testing.allocator);
defer graph.deinit();
var controller = try graph.getController();
controller.addResource(TestResource{ .data1 = 0, .data2 = 0 });
controller.queue(TestResource.queueManySystems);
try graph.freeController(controller);
try graph.runAllSystems();
const result = graph.getResource(TestResource).?;
try std.testing.expectEqual(7980, result.data1);
try std.testing.expectEqual(10, result.data2);
}

View File

@@ -9,7 +9,6 @@ const DEFAULT_CONTROLLER_CAPACITY = 8;
alloc: std.mem.Allocator, alloc: std.mem.Allocator,
command_buffer: std.ArrayListUnmanaged(Command), command_buffer: std.ArrayListUnmanaged(Command),
error_state: ErrorState, error_state: ErrorState,
duds: [*]System.Dud,
dud_range: struct { System.Dud.Id, System.Dud.Id }, dud_range: struct { System.Dud.Id, System.Dud.Id },
submit_dud: ?System.Dud.Id, submit_dud: ?System.Dud.Id,
@@ -28,7 +27,6 @@ pub fn create(alloc: std.mem.Allocator) !Controller {
.alloc = alloc, .alloc = alloc,
.command_buffer = try std.ArrayListUnmanaged(Command).initCapacity(alloc, DEFAULT_CONTROLLER_CAPACITY), .command_buffer = try std.ArrayListUnmanaged(Command).initCapacity(alloc, DEFAULT_CONTROLLER_CAPACITY),
.error_state = .ok, .error_state = .ok,
.duds = &[0]System.Dud{},
.dud_range = .{ 0, 0 }, .dud_range = .{ 0, 0 },
.submit_dud = null, .submit_dud = null,
}; };
@@ -40,9 +38,15 @@ pub fn commands(self: *Controller) []const Command {
return self.command_buffer.items; return self.command_buffer.items;
} }
pub fn setDuds(self: *Controller, start_id: System.Dud.Id, buffer: []System.Dud) void { pub fn setDuds(self: *Controller, start_id: System.Dud.Id, end_id: System.Dud.Id) void {
self.dud_range = .{ start_id, start_id + @as(System.Dud.Id, @intCast(buffer.len)) }; self.dud_range = .{ start_id, end_id };
self.duds = @ptrCast(buffer); }
fn acquireDud(self: *Controller) ?System.Dud.Id {
if (self.dud_range[0] == self.dud_range[1]) return null;
defer self.dud_range[0] += 1;
return self.dud_range[0];
} }
/// Clears the command buffer for the next use (does not deallocate it's contents) /// Clears the command buffer for the next use (does not deallocate it's contents)
@@ -68,70 +72,93 @@ pub inline fn addResource(self: *Controller, resource: anytype) void {
) catch |err| self.fail(err); ) catch |err| self.fail(err);
} }
pub fn queueSystem(self: *Controller, comptime function: anytype) void { /// Queues a multitude of functions to be executed either in parallel or in ordered manner
utils.validateSystem(function); /// `system_set` can be either a `System`-like function or a tuple which may contain other system sets
///
/// Optional tuple fields that control the execution behavior of functions:
///
/// `ordered` - ensures that all systems specified in the tuple are executed in provided order
pub fn queue(self: *Controller, comptime system_set: anytype) void {
utils.validateSystemSet(system_set);
self.queueSystemInternal(function) catch |err| self.fail(err); self.queueInternal(system_set) catch |err| self.fail(err);
} }
/// Function sets are expected to be tuples of system functions fn queueInternal(self: *Controller, comptime system_set: anytype) !void {
pub fn queueOrdered( const prev_count = self.command_buffer.items.len;
self: *Controller,
comptime function_set_first: anytype, const command_buffer = try self.command_buffer.addManyAsSlice(self.alloc, utils.countSystems(system_set));
comptime function_set_second: anytype, errdefer self.command_buffer.shrinkRetainingCapacity(prev_count);
) void {
self.queueOrderedInternal(function_set_first, function_set_second) catch |err| self.fail(err); const commands_created = try self.createQueueCommands(system_set, command_buffer, null, self.submit_dud);
std.debug.assert(commands_created == command_buffer.len);
} }
pub fn queueOrderedInternal( fn createQueueCommands(
self: *Controller, self: *Controller,
comptime function_set_first: anytype, comptime system_set: anytype,
comptime function_set_second: anytype, command_buffer: []Command,
) !void { requires_dud: ?System.Dud.Id,
if (self.dud_range[0] == self.dud_range[1]) { submit_dud: ?System.Dud.Id,
// TODO: Make `Controller` request more ids ) !usize {
self.error_state = .unrecoverable; switch (@typeInfo(@TypeOf(system_set))) {
return; .@"fn" => {
} var system = try System.fromFunction(system_set, self.alloc);
const commands_first = @typeInfo(@TypeOf(function_set_first)).@"struct".fields.len; system.requires_dud = requires_dud;
const commands_second = @typeInfo(@TypeOf(function_set_second)).@"struct".fields.len; system.submit_dud = submit_dud;
var new_commands: [commands_first + commands_second]Command = undefined; command_buffer[0] = .{ .queue_system = system };
var i: usize = 0; return 1;
},
.@"struct" => {
const ordered = utils.getOptionalTupleField(system_set, "ordered", false);
var queued_total: usize = 0;
var prev_dud = requires_dud;
var next_dud = submit_dud;
errdefer for (0..i) |del_i| { errdefer for (command_buffer[0..queued_total]) |command| {
new_commands[del_i].queue_system.deinit(self.alloc); command.queue_system.deinit(self.alloc);
}; };
std.debug.assert(self.duds[0].required_count == 0); if (ordered) {
self.duds[0].required_count = commands_first; next_dud = requires_dud;
}
const dud_id = self.dud_range[0];
self.duds += 1; var queued_sets: usize = 0;
self.dud_range[0] += 1; var total_sets: usize = 0;
inline for (@typeInfo(@TypeOf(system_set)).@"struct".fields) |field| {
inline for (function_set_first) |fn_first| { switch (@typeInfo(field.type)) {
var system = try System.fromFunction(fn_first, self.alloc); .@"fn", .@"struct" => total_sets += 1,
system.submit_dud = dud_id; else => {},
new_commands[i] = Command{ .queue_system = system };
i += 1;
} }
inline for (function_set_second) |fn_second| {
var system = try System.fromFunction(fn_second, self.alloc);
system.requires_dud = dud_id;
system.submit_dud = self.submit_dud;
new_commands[i] = Command{ .queue_system = system };
i += 1;
} }
std.debug.assert(i == new_commands.len);
try self.command_buffer.appendSlice(self.alloc, &new_commands); inline for (@typeInfo(@TypeOf(system_set)).@"struct".fields) |field| {
} if (ordered) {
prev_dud = next_dud;
fn queueSystemInternal(self: *Controller, comptime function: anytype) !void { if (queued_sets == total_sets - 1) {
var system = try System.fromFunction(function, self.alloc); next_dud = submit_dud;
errdefer system.deinit(self.alloc); } else {
// TODO: Soft fail
try self.command_buffer.append(self.alloc, .{ .queue_system = system }); next_dud = self.acquireDud().?;
}
}
switch (@typeInfo(field.type)) {
.@"fn", .@"struct" => {
queued_total += try self.createQueueCommands(
@field(system_set, field.name),
command_buffer[queued_total..],
prev_dud,
next_dud,
);
queued_sets += 1;
},
else => {},
}
}
return queued_total;
},
else => @compileError("System set must be either a single function or a tuple of other system sets"),
}
} }
/// `previous_output` is expected to be aligned accordingly /// `previous_output` is expected to be aligned accordingly

View File

@@ -43,7 +43,6 @@ pub fn validateSystem(comptime system: anytype) void {
switch (@typeInfo(param.type.?).pointer.child) { switch (@typeInfo(param.type.?).pointer.child) {
Controller => { Controller => {
controller_requests += 1; controller_requests += 1;
// _ = &controller_requests;
}, },
else => |t| validateResource(t), else => |t| validateResource(t),
} }
@@ -52,6 +51,33 @@ pub fn validateSystem(comptime system: anytype) void {
} }
} }
pub fn validateSystemSet(comptime system_set: anytype) void {
comptime {
@setEvalBranchQuota(1000);
switch (@typeInfo(@TypeOf(system_set))) {
.@"fn" => validateSystem(system_set),
.@"struct" => |struct_info| {
for (struct_info.fields) |field_info| {
switch (@typeInfo(field_info.type)) {
.@"fn", .@"struct" => validateSystemSet(@field(system_set, field_info.name)),
else => {
if (checkIsField(field_info, "ordered", bool)) continue;
@compileError("Invalid field \"" ++
field_info.name ++
"\" of type `" ++
@typeName(field_info.type) ++
"` in system set");
},
}
}
},
else => {
@compileError("System set must be either a function or a tuple (got `" ++ @typeName(@TypeOf(system_set)) ++ "`)");
},
}
}
}
pub fn generateRunner(comptime system: anytype) fn ([]const *anyopaque) void { pub fn generateRunner(comptime system: anytype) fn ([]const *anyopaque) void {
const RunnerImpl = struct { const RunnerImpl = struct {
fn runner(resources: []const *anyopaque) void { fn runner(resources: []const *anyopaque) void {
@@ -64,3 +90,46 @@ pub fn generateRunner(comptime system: anytype) fn ([]const *anyopaque) void {
}; };
return RunnerImpl.runner; return RunnerImpl.runner;
} }
pub fn checkIsField(field: std.builtin.Type.StructField, field_name: []const u8, comptime field_type: type) bool {
if (!std.mem.eql(u8, field.name, field_name)) return false;
if (field.type != field_type) return false;
return true;
}
pub fn getOptionalTupleField(tuple: anytype, comptime field_name: []const u8, comptime default: anytype) @TypeOf(default) {
return comptime blk: {
for (@typeInfo(@TypeOf(tuple)).@"struct".fields) |field| {
if (!std.mem.eql(u8, field.name, field_name)) continue;
if (@TypeOf(default) != field.type)
@compileError("Cannot get tuple field `" ++
field_name ++
"` with type `" ++
@typeName(@TypeOf(default)) ++
"` (tuple field has type `" ++
@typeName(field.type) ++
"`)");
break :blk @field(tuple, field.name);
}
break :blk default;
};
}
pub fn countSystems(comptime tuple: anytype) usize {
return comptime blk: {
var total: usize = 0;
switch (@typeInfo(@TypeOf(tuple))) {
.@"fn" => total += 1,
.@"struct" => |struct_info| {
for (struct_info.fields) |field| {
switch (@typeInfo(field.type)) {
.@"fn", .@"struct" => total += countSystems(@field(tuple, field.name)),
else => {},
}
}
},
else => {},
}
break :blk total;
};
}