@karlbohlmark
Created March 2, 2024 21:16
Zig io_uring multishot recv
const std = @import("std");
const os = std.os;
const socket_t = std.os.socket_t;
const io_uring = std.os.linux.IO_Uring;
const Allocator = std.mem.Allocator;
const fs = std.fs;
pub fn main() !void {
    const MAX_BUFFERS = 10;
    const BUFFER_SIZE = 2048;
    const GROUP_ID = 1;
    const INITIAL_BUFFER_ID = 0;

    var ring = io_uring.init(256, 0) catch |err| {
        std.debug.print("Failed to initialize io_uring: {s}\n", .{@errorName(err)});
        return;
    };
    // Defer the deinit of the io_uring instance
    defer ring.deinit();

    // Init an allocator for allocating buffers for io_uring receive operations
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    const allocator = arena.allocator();
    const buffer_mem = try allocator.alloc(u8, BUFFER_SIZE * MAX_BUFFERS);

    const UserData = enum(u64) {
        ProvideBuffers = 1,
        Recv = 2,
        // Add more as needed
    };

    // Register a group of buffers the kernel can pick from when completing receives
    const provide_buffers_userdata: u64 = @intFromEnum(UserData.ProvideBuffers);
    const buffer_sqe = try ring.provide_buffers(provide_buffers_userdata, buffer_mem.ptr, BUFFER_SIZE, MAX_BUFFERS, GROUP_ID, INITIAL_BUFFER_ID);
    _ = buffer_sqe;
    std.debug.print("Registered buffer group {}\n", .{GROUP_ID});

    const submitted_buffers = try ring.submit_and_wait(1);
    std.debug.print("submitted_buffers: {}\n", .{submitted_buffers});
    const buffer_cqe = try ring.copy_cqe();
    if (buffer_cqe.res < 0) {
        std.debug.print("buffer_cqe.res = {}\n", .{buffer_cqe.res});
        std.process.exit(1);
    }

    // Install the default SIGINT handler so Ctrl-C terminates the process
    const act = std.os.Sigaction{
        .handler = .{ .sigaction = @ptrCast(@alignCast(std.os.SIG.DFL)) },
        .mask = std.os.empty_sigset,
        .flags = 0,
    };
    std.os.sigaction(@intCast(std.os.SIG.INT), &act, null) catch {};

    const socket = try bindUdpSocket(12345);

    // Prepare the multishot recv SQE: a single submission that keeps producing
    // one CQE per received datagram for as long as it stays armed.
    const recv_sqe = try ring.recv(@intFromEnum(UserData.Recv), @intCast(socket), .{ .buffer_selection = .{ .group_id = GROUP_ID, .len = BUFFER_SIZE } }, 0);
    recv_sqe.ioprio |= std.os.linux.IORING_RECV_MULTISHOT;
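
    // While the multishot recv stays armed, every recv CQE should carry
    // IORING_CQE_F_MORE in cqe.flags. When the kernel posts a CQE without that
    // flag (for example because the provided-buffer group ran out of buffers),
    // the multishot has terminated and the recv has to be re-armed; see the
    // sketch in the completion loop below.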

    while (true) {
        // Wait for at least one completion
        const num_submitted = try ring.submit_and_wait(1);
        std.debug.print("submitted: {}\n", .{num_submitted});

        while (ring.cq_ready() > 0) {
            var cqe = try ring.copy_cqe();
            switch (cqe.user_data) {
                @intFromEnum(UserData.ProvideBuffers) => {
                    std.debug.print("Received ProvideBuffers CQE\n", .{});
                    // We don't need to do anything here, the buffers are already provided
                },
                @intFromEnum(UserData.Recv) => {
                    std.debug.print("Received Recv CQE - flags: {}\n", .{cqe.flags});
                    if (cqe.res < 0) {
                        std.debug.print("Error receiving packet: {}\n", .{cqe.err()});
                    } else {
                        const bytesRead = cqe.res;
                        std.debug.print("Received {} bytes. error: {}\n", .{ bytesRead, cqe.err() });
                    }
                    // This line below is what I'm trying to do without
                    // _ = try ring.recv(@intFromEnum(UserData.Recv), @intCast(socket), .{ .buffer_selection = .{ .group_id = GROUP_ID, .len = BUFFER_SIZE } }, 0);
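                    // A minimal sketch of the buffer/re-arm handling the gist leaves out,
                    // assuming std.os.linux exposes IORING_CQE_F_BUFFER, IORING_CQE_BUFFER_SHIFT
                    // and IORING_CQE_F_MORE alongside IORING_RECV_MULTISHOT:
                    //
                    // 1) Return the consumed buffer to the group; its id is carried in the
                    //    upper bits of cqe.flags when IORING_CQE_F_BUFFER is set. Without
                    //    this the group eventually runs dry and the multishot recv stops.
                    if (cqe.flags & std.os.linux.IORING_CQE_F_BUFFER != 0) {
                        const buffer_id: u16 = @intCast(cqe.flags >> std.os.linux.IORING_CQE_BUFFER_SHIFT);
                        _ = try ring.provide_buffers(provide_buffers_userdata, buffer_mem.ptr + @as(usize, buffer_id) * BUFFER_SIZE, BUFFER_SIZE, 1, GROUP_ID, buffer_id);
                    }
                    // 2) Re-arm only when the kernel signals that this multishot is done
                    //    (IORING_CQE_F_MORE clear); while the flag is set, the original SQE
                    //    keeps producing completions without any new submission.
                    if (cqe.flags & std.os.linux.IORING_CQE_F_MORE == 0) {
                        const rearm_sqe = try ring.recv(@intFromEnum(UserData.Recv), @intCast(socket), .{ .buffer_selection = .{ .group_id = GROUP_ID, .len = BUFFER_SIZE } }, 0);
                        rearm_sqe.ioprio |= std.os.linux.IORING_RECV_MULTISHOT;
                    }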
                },
                else => {
                    std.debug.print("Unknown user_data: {} res {}. error {}\n", .{ cqe.user_data, cqe.res, cqe.err() });
                },
            }
            // _ = nanosleep(100000000);
        }
    }
}

pub fn bindUdpSocket(port: u16) !usize {
    const socket = std.os.linux.socket(std.os.linux.AF.INET, std.os.SOCK.DGRAM, 0);
    const address = std.os.linux.sockaddr.in{
        .family = std.os.linux.AF.INET,
        .port = std.mem.nativeToBig(u16, port),
        .addr = 0, // Bind to any address (INADDR_ANY)
    };
    const ret = std.os.linux.bind(@intCast(socket), @ptrCast(&address), @sizeOf(@TypeOf(address)));
    std.debug.print("UDP socket created and bound to port {}. bind() returned: {}\n", .{ port, ret });
    return socket;
}

pub fn nanosleep(ns: isize) usize {
    if (ns < 0) {
        return 0;
    }
    if (ns > 999_999_999) {
        return std.os.linux.nanosleep(@ptrCast(&std.os.linux.timespec{ .tv_sec = @divFloor(ns, 1_000_000_000), .tv_nsec = @rem(ns, 1_000_000_000) }), null);
    }
    const sleep = std.os.linux.timespec{
        .tv_sec = 0,
        .tv_nsec = ns,
    };
    return std.os.linux.nanosleep(@ptrCast(&sleep), null);
}
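
// To exercise the receive path, run the program and send UDP datagrams to the
// bound port from another shell, for example with netcat:
//   echo hello | nc -u 127.0.0.1 12345
// Each datagram should show up as one "Received N bytes" completion.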