Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
zig-cache/
demo
demo.o
.zig-cache/
zig-out/
29 changes: 29 additions & 0 deletions build.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
const std = @import("std");

pub fn build(b: *std.Build) void {
const target = b.standardTargetOptions(.{});
const optimize = b.standardOptimizeOption(.{});

const mod = b.addModule("spsc_ring", .{
.root_source_file = b.path("src/spsc_ring.zig"),
.target = target,
});

const example_exe = b.addExecutable(.{
.name = "demo",
.root_module = b.createModule(.{
.root_source_file = b.path("src/demo.zig"),
.target = target,
.optimize = optimize,
.imports = &.{
.{ .name = "spsc_ring", .module = mod },
},
}),
});
b.installArtifact(example_exe);

const run_example = b.step("run", "Run the example");
const run_example_cmd = b.addRunArtifact(example_exe);
run_example.dependOn(&run_example_cmd.step);
if (b.args) |args| run_example_cmd.addArgs(args);
}
15 changes: 15 additions & 0 deletions build.zig.zon
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
.{
.name = .zig_spsc_ring,
.version = "0.0.0",
.fingerprint = 0x2991ff75c63dce16, // Changing this has security and trust implications.
.minimum_zig_version = "0.15.1",
.dependencies = .{},
.paths = .{
"build.zig",
"build.zig.zon",
"src",
// For example...
//"LICENSE",
//"README.md",
},
}
8 changes: 4 additions & 4 deletions demo.zig → src/demo.zig
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ fn producer(ring: *Ring(u64)) void {
}

fn consumer(ring: *Ring(u64)) !void {
const stdout_file = std.io.getStdOut().writer();
var bw = std.io.bufferedWriter(stdout_file);
const stdout = bw.writer();
var stdout_buffer: [1024]u8 = undefined;
var stdout_writer = std.fs.File.stdout().writer(&stdout_buffer);
const stdout = &stdout_writer.interface;

while (true) {
if (ring.dequeue()) |answer| {
try stdout.print("answer = {}\n", .{answer});
try bw.flush();
try stdout.flush();
return;
}
}
Expand Down
32 changes: 14 additions & 18 deletions spsc_ring.zig → src/spsc_ring.zig
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
const std = @import("std");

const PaddedConsumer = struct {
head: std.atomic.Atomic(usize),
padding: [std.atomic.cache_line - @sizeOf(std.atomic.Atomic(usize))]u8 = undefined,
head: std.atomic.Value(usize),
padding: [std.atomic.cache_line - @sizeOf(std.atomic.Value(usize))]u8 = undefined,
};

const PaddedProducer = struct {
tail: std.atomic.Atomic(usize),
padding: [std.atomic.cache_line - @sizeOf(std.atomic.Atomic(usize))]u8 = undefined,
tail: std.atomic.Value(usize),
padding: [std.atomic.cache_line - @sizeOf(std.atomic.Value(usize))]u8 = undefined,
};

pub fn Ring(comptime T: type) type {
Expand All @@ -45,49 +45,45 @@ pub fn Ring(comptime T: type) type {
std.debug.assert(std.math.isPowerOfTwo(items.len));

return Self{
.consumer = PaddedConsumer{ .head = std.atomic.Atomic(usize).init(0) },
.producer = PaddedProducer{ .tail = std.atomic.Atomic(usize).init(0) },
.consumer = PaddedConsumer{ .head = std.atomic.Value(usize).init(0) },
.producer = PaddedProducer{ .tail = std.atomic.Value(usize).init(0) },
.items = items,
.mask = items.len - 1,
};
}

pub inline fn enqueue(self: *Self, value: T) bool {
const consumer = self.consumer.head.load(std.atomic.Ordering.Acquire);
const producer = self.producer.tail.load(std.atomic.Ordering.Acquire);
const consumer = self.consumer.head.load(.acquire);
const producer = self.producer.tail.load(.acquire);
const delta = producer + 1;

if (delta & self.mask == consumer & self.mask)
return false;

self.items[producer & self.mask] = value;

std.atomic.fence(std.atomic.Ordering.Release);
self.producer.tail.store(delta, std.atomic.Ordering.Release);
self.producer.tail.store(delta, .release);

return true;
}

pub inline fn dequeue(self: *Self) ?T {
const consumer = self.consumer.head.load(std.atomic.Ordering.Acquire);
const producer = self.producer.tail.load(std.atomic.Ordering.Acquire);
const consumer = self.consumer.head.load(.acquire);
const producer = self.producer.tail.load(.acquire);

if (consumer == producer)
return null;

std.atomic.fence(std.atomic.Ordering.Acquire);

const value = self.items[consumer & self.mask];

std.atomic.fence(std.atomic.Ordering.Release);
self.consumer.head.store(consumer + 1, std.atomic.Ordering.Release);
self.consumer.head.store(consumer + 1, .release);

return value;
}

pub inline fn length(self: *Self) usize {
const consumer = self.consumer.head.load(std.atomic.Ordering.Acquire);
const producer = self.producer.tail.load(std.atomic.Ordering.Acquire);
const consumer = self.consumer.head.load(.acquire);
const producer = self.producer.tail.load(.acquire);
return (producer - consumer) & self.mask;
}
};
Expand Down