From c2dc2cd945287d31c85b8e54e2efe1544f353aee Mon Sep 17 00:00:00 2001 From: Tobias Pankrath Date: Sun, 14 Mar 2021 15:53:39 +0100 Subject: [PATCH] feat: start work on an io_uring driver --- README.md | 29 +- dub.sdl | 3 +- examples/uring.d | 68 ++++ meson.build | 5 +- source/eventcore/driver.d | 1 + source/eventcore/drivers/posix/driver.d | 32 +- source/eventcore/drivers/posix/events.d | 5 +- .../eventcore/drivers/posix/io_uring/files.d | 259 +++++++++++++++ .../drivers/posix/io_uring/io_uring.d | 312 ++++++++++++++++++ source/eventcore/meson.build | 2 + 10 files changed, 691 insertions(+), 25 deletions(-) create mode 100644 examples/uring.d create mode 100644 source/eventcore/drivers/posix/io_uring/files.d create mode 100644 source/eventcore/drivers/posix/io_uring/io_uring.d diff --git a/README.md b/README.md index da25bc6f..224a3dc0 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ EpollEventDriver | yes | — | — | — | — | — WinAPIEventDriver | — | yes | — | — | — | — KqueueEventDriver | — | — | yes | yes¹ | — | — LibasyncEventDriver | —¹| —¹| —¹| —¹| — | — +UringEventDriver | —¹| no | no | no | unknown | no ¹ planned, but not currenly implemented @@ -48,20 +49,20 @@ The following compilers are tested and supported: Driver development status ------------------------- -Feature \ EventDriver | Select | Epoll | WinAPI | Kqueue | Libasync -----------------------|--------|-------|---------|---------|---------- -TCP Sockets | yes | yes | yes | yes | — -UDP Sockets | yes | yes | yes | yes | — -USDS | yes | yes | — | yes | — -DNS | yes | yes | yes | yes | — -Timers | yes | yes | yes | yes | — -Events | yes | yes | yes | yes | — -Unix Signals | yes² | yes | — | — | — -Files | yes | yes | yes | yes | — -UI Integration | yes¹ | yes¹ | yes | yes¹ | — -File watcher | yes² | yes | yes | yes² | — -Pipes | yes | yes | — | yes | — -Processes | yes | yes | — | yes | — +Feature \ EventDriver | Select | Epoll | WinAPI | Kqueue | Libasync | Uring 
+----------------------|--------|-------|---------|---------|----------|------- +TCP Sockets | yes | yes | yes | yes | — | — +UDP Sockets | yes | yes | yes | yes | — | — +USDS | yes | yes | — | yes | — | — +DNS | yes | yes | yes | yes | — | — +Timers | yes | yes | yes | yes | — | — +Events | yes | yes | yes | yes | — | — +Unix Signals | yes² | yes | — | — | — | — +Files | yes | yes | yes | yes | — | yes +UI Integration | yes¹ | yes¹ | yes | yes¹ | — | yes? +File watcher | yes² | yes | yes | yes² | — | — +Pipes | yes | yes | — | yes | — | — +Processes | yes | yes | — | yes | — | — ¹ Manually, by adopting the X11 display connection socket diff --git a/dub.sdl b/dub.sdl index aae914fd..1e1d6199 100644 --- a/dub.sdl +++ b/dub.sdl @@ -2,7 +2,8 @@ name "eventcore" description "Pro-actor based abstraction layer over operating system asynchronous I/O facilities." license "MIT" copyright "Copyright © 2016-2018 Sönke Ludwig" - +license "MIT" +dependency "during" version="~>0.2.1" targetType "library" libs "resolv" platform="linux" diff --git a/examples/uring.d b/examples/uring.d new file mode 100644 index 00000000..3cd45d04 --- /dev/null +++ b/examples/uring.d @@ -0,0 +1,68 @@ +/++ dub.sdl: + name "uring-example" + description "this does nothing, but also does not crash" + dependency "eventcore" path=".." 
+ versions "UringEventLoopDebug" + debugVersions "UringEventLoopDebug" ++/ +import std.stdio; +import core.time; +import std.string; + +import eventcore.drivers.posix.epoll; +import eventcore.drivers.posix.io_uring.io_uring; +import eventcore.drivers.posix.io_uring.files; +import eventcore.driver; + +import std.datetime.stopwatch; + +void main() +{ + StopWatch sw; + sw.start(); + auto drv = new EpollEventDriver(); + auto files = drv.files; + + auto fd = files.open("/tmp/testfile", FileOpenMode.createTrunc); + assert (files.isValid(fd)); + files.write(fd, 0, "this is a testwrite".representation, IOMode.init, + (FileFD file, IOStatus status, size_t written) + { + try {writefln("write: %s, %s, %s, %s", file, fd, status, written);} + catch(Exception e) {} + } + ); + ExitReason r = drv.core.processEvents(Duration.max); + writeln(r); + //assert (r == ExitReason.idle); + ubyte[256] buffer; + files.read(fd, 0, buffer[], IOMode.init, + (FileFD file, IOStatus status, size_t read) + { + try { + writefln("read: %s, %s, %s, %s", file, fd, status, read); + writeln(cast(char[]) buffer[0 .. read]); + } + catch(Exception e) {} + } + ); + writeln(drv.core.processEvents(Duration.max)); + files.read(fd, 0, buffer[], IOMode.init, + (FileFD file, IOStatus status, size_t read) + { + try { + writefln("read: %s, %s, %s, %s", file, fd, status, read); + writeln(cast(char[]) buffer[0 .. 
read]); + } + catch(Exception e) {} + } + ); + files.write(fd, 19, "this is a snd testwrite".representation, IOMode.init, + (FileFD file, IOStatus status, size_t written) + { + try {writefln("write: %s, %s, %s, %s", file, fd, status, written);} + catch(Exception e) {} + } + ); + writeln(drv.core.processEvents(Duration.max)); +} diff --git a/meson.build b/meson.build index 4a6b1950..1073066e 100644 --- a/meson.build +++ b/meson.build @@ -8,8 +8,11 @@ project_version_suffix = '' project_version = meson.project_version() project_version_full = project_version + project_version_suffix -taggedalgebraic_dep = dependency('taggedalgebraic', version: ['>=0.10.12', '<0.12']) +taggedalgebraic_dep = dependency('taggedalgebraic', + version: ['>=0.10.12', '<0.12'], + fallback: ['taggedalgebraic', 'taggedalgebraic_source_dep']) source_root = meson.source_root() build_root = meson.build_root() subdir('source/eventcore') +subdir('examples') diff --git a/source/eventcore/driver.d b/source/eventcore/driver.d index 8316850b..f4ea9cc0 100644 --- a/source/eventcore/driver.d +++ b/source/eventcore/driver.d @@ -1019,6 +1019,7 @@ alias AcceptCallback = void delegate(StreamListenSocketFD, StreamSocketFD, scope alias IOCallback = void delegate(StreamSocketFD, IOStatus, size_t); alias DatagramIOCallback = void delegate(DatagramSocketFD, IOStatus, size_t, scope RefAddress); alias DNSLookupCallback = void delegate(DNSLookupID, DNSStatus, scope RefAddress[]); +alias FileOpenCallback = void delegate(FileFD); alias FileIOCallback = void delegate(FileFD, IOStatus, size_t); alias FileCloseCallback = void delegate(FileFD, CloseStatus); alias PipeIOCallback = void delegate(PipeFD, IOStatus, size_t); diff --git a/source/eventcore/drivers/posix/driver.d b/source/eventcore/drivers/posix/driver.d index dc2b50df..b305aee2 100644 --- a/source/eventcore/drivers/posix/driver.d +++ b/source/eventcore/drivers/posix/driver.d @@ -14,6 +14,8 @@ import eventcore.drivers.posix.sockets; import 
eventcore.drivers.posix.watchers; import eventcore.drivers.posix.processes; import eventcore.drivers.posix.pipes; +import eventcore.drivers.posix.io_uring.io_uring : UringEventLoop; +import eventcore.drivers.posix.io_uring.files : UringDriverFiles; import eventcore.drivers.timer; import eventcore.drivers.threadedfile; import eventcore.internal.consumablequeue : ConsumableQueue; @@ -51,7 +53,8 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { version (Windows) alias DNSDriver = EventDriverDNS_GHBN!(EventsDriver, SignalsDriver); else version (EventcoreUseGAIA) alias DNSDriver = EventDriverDNS_GAIA!(EventsDriver, SignalsDriver); else alias DNSDriver = EventDriverDNS_GAI!(EventsDriver, SignalsDriver); - alias FileDriver = ThreadedFileEventDriver!EventsDriver; + version (linux) alias FileDriver = UringDriverFiles; + else alias FileDriver = ThreadedFileEventDriver!EventsDriver; version (Posix) alias PipeDriver = PosixEventDriverPipes!Loop; else alias PipeDriver = DummyEventDriverPipes!Loop; version (linux) alias WatcherDriver = InotifyEventDriverWatchers!EventsDriver; @@ -61,6 +64,7 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { else alias ProcessDriver = DummyEventDriverProcesses!Loop; Loop m_loop; + version (linux) UringEventLoop m_uring; CoreDriver m_core; EventsDriver m_events; SignalsDriver m_signals; @@ -76,15 +80,17 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver { this() @nogc @trusted { m_loop = mallocT!Loop; + version (linux) m_uring = mallocT!UringEventLoop; m_sockets = mallocT!SocketsDriver(m_loop); m_events = mallocT!EventsDriver(m_loop, m_sockets); m_signals = mallocT!SignalsDriver(m_loop); m_timers = mallocT!TimerDriver; m_pipes = mallocT!PipeDriver(m_loop); m_processes = mallocT!ProcessDriver(m_loop, this); - m_core = mallocT!CoreDriver(m_loop, m_timers, m_events, m_processes); + m_core = mallocT!CoreDriver(m_loop, m_timers, m_events, m_processes, m_uring); m_dns = 
mallocT!DNSDriver(m_events, m_signals); - m_files = mallocT!FileDriver(m_events); + version (linux) m_files = mallocT!FileDriver(m_uring); + else m_files = mallocT!FileDriver(m_events); m_watchers = mallocT!WatcherDriver(m_events); } @@ -177,6 +183,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime Timers m_timers; Events m_events; Processes m_processes; + UringEventLoop m_uring; bool m_exit = false; EventID m_wakeupEvent; @@ -184,7 +191,8 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime ConsumableQueue!ThreadCallbackEntry m_threadCallbacks; } - this(Loop loop, Timers timers, Events events, Processes processes) + this(Loop loop, Timers timers, Events events, Processes processes, + UringEventLoop uring) @nogc { m_loop = loop; m_timers = timers; @@ -194,6 +202,8 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime m_threadCallbackMutex = mallocT!(shared(Mutex)); m_threadCallbacks = mallocT!(ConsumableQueue!ThreadCallbackEntry); m_threadCallbacks.reserve(1000); + m_uring = uring; + m_uring.registerEventID(m_wakeupEvent); } final void dispose() @@ -207,7 +217,9 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime } catch (Exception e) assert(false, e.msg); } - @property size_t waiterCount() const { return m_loop.m_waiterCount + m_timers.pendingCount + m_processes.pendingCount; } + @property size_t waiterCount() const { + return m_loop.m_waiterCount + m_timers.pendingCount + m_processes.pendingCount + m_uring.waiterCount; + } final override ExitReason processEvents(Duration timeout) { @@ -223,12 +235,17 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime if (!waiterCount) { return ExitReason.outOfWaiters; } - + version (linux) { + // this is required to make the kernel aware of + // submitted SQEs, otherwise the first call to + // process events could stall + m_uring.submit; + } bool got_events; - if (timeout 
<= 0.seconds) { got_events = m_loop.doProcessEvents(0.seconds); m_timers.process(MonoTime.currTime); + version (linux) got_events |= m_uring.doProcessEvents(0.seconds); } else { auto now = MonoTime.currTime; do { @@ -237,6 +254,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime auto prev_step = now; now = MonoTime.currTime; got_events |= m_timers.process(now); + version(linux) got_events |= m_uring.doProcessEvents(0.seconds); if (timeout != Duration.max) timeout -= now - prev_step; } while (timeout > 0.seconds && !m_exit && !got_events); diff --git a/source/eventcore/drivers/posix/events.d b/source/eventcore/drivers/posix/events.d index 00631c3d..b1dde659 100644 --- a/source/eventcore/drivers/posix/events.d +++ b/source/eventcore/drivers/posix/events.d @@ -80,7 +80,6 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS if (!() @trusted { fd[1] = socket(AF_INET, SOCK_DGRAM, 0); int nl = addr.nameLen; - import eventcore.internal.utils : print; if (bind(fd[1], addr.name, addr.nameLen) != 0) return false; assert(nl == addr.nameLen); @@ -167,7 +166,9 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS @trusted { EventID event = cast(EventID)fd; ulong cnt; - () @trusted { .read(cast(int)event, &cnt, cnt.sizeof); } (); + () @trusted { + .read(cast(int)event, &cnt, cnt.sizeof); + } (); trigger(event, cnt > 0); } } else { diff --git a/source/eventcore/drivers/posix/io_uring/files.d b/source/eventcore/drivers/posix/io_uring/files.d new file mode 100644 index 00000000..1bbc584f --- /dev/null +++ b/source/eventcore/drivers/posix/io_uring/files.d @@ -0,0 +1,259 @@ +module eventcore.drivers.posix.io_uring.files; + +import eventcore.internal.utils; + +import eventcore.driver; +import eventcore.drivers.posix.io_uring.io_uring; +import core.sys.posix.sys.types; +import core.sys.posix.sys.stat; +import core.sys.linux.fcntl; +import during; + +import taggedalgebraic; + +final class 
UringDriverFiles : EventDriverFiles +{ + nothrow: + private { + UringEventLoop m_loop; + int m_idx; + } + + this(UringEventLoop loop) @nogc nothrow + { + m_loop = loop; + } + + void dispose() @safe {} + + // + FileFD open(string path, FileOpenMode mode) + { + import std.string : toStringz; + import std.conv : octal; + + int flags; + int amode; + + final switch (mode) { + case FileOpenMode.read: flags = O_RDONLY; break; + case FileOpenMode.readWrite: flags = O_RDWR; break; + case FileOpenMode.createTrunc: flags = O_RDWR|O_CREAT|O_TRUNC; amode = octal!644; break; + case FileOpenMode.append: flags = O_WRONLY|O_CREAT|O_APPEND; amode = octal!644; break; + } + auto fd = () @trusted { return .open(path.toStringz(), flags, amode); } (); + if (fd < 0) return FileFD.init; + return adopt(fd); + } + + void open(string path, FileOpenMode mode, FileOpenCallback) + { + //TODO how do we handle ops without a prior resource/fd? + } + + FileFD adopt(int system_file_handle) + { + auto flags = () @trusted { return fcntl(system_file_handle, F_GETFD); } (); + if (flags == -1) return FileFD.invalid; + return () @trusted { return m_loop.initFD!FileFD(system_file_handle); }(); + } + + /** Disallows any reads/writes and removes any exclusive locks. + + Note that the file handle may become invalid at any point after the + call to `close`, regardless of its current reference count. Any + operations on the handle will not have an effect. 
+ */ + void close(FileFD file, FileCloseCallback onClosed) @trusted + { + if (!isValid(file)) { + onClosed(file, CloseStatus.invalidHandle); + return; + } + SubmissionEntry e; + e.prepClose(cast(int) file); + m_loop.put(cast(FD) file, EventType.status, e, &handleClose, + cast(UserCallback) onClosed); + } + + private void onCompletionEvent(int fd, EventType type, ref const(CompletionEntry) e, + UserCallback cb) + { + } + + private void handleClose(FD fd, ref const(CompletionEntry) e, UserCallback userCb) + nothrow + { + FileCloseCallback cb = cast(FileCloseCallback) userCb; + if (e.res < 0) + cb(cast(FileFD) fd, CloseStatus.ioError); + else + cb(cast(FileFD) fd, CloseStatus.ok); + } + + private void handleOpen(FD fd, ref const(CompletionEntry) e, UserCallback cb) + { + + } + + ulong getSize(FileFD file) + { + import core.sys.linux.unistd : lseek64; + import core.sys.posix.stdio : SEEK_END; + if (!isValid(file)) return ulong.max; + + // stat_t seems to be defined wrong on linux/64 + return lseek64(cast(int)file, 0, SEEK_END); + } + + /** Shrinks or extends a file to the specified size. + + Params: + file = Handle of the file to resize + size = Desired file size in bytes + on_finish = Called when the operation finishes - the `size` + parameter is always set to zero + Note: this is a blocking call, since io_uring does not support + this yet. + */ + void truncate(FileFD file, ulong size, FileIOCallback on_finish) + { + import core.sys.linux.unistd : ftruncate; + // currently not supported by uring + if (ftruncate(cast(int)file, size) != 0) { + on_finish(file, IOStatus.error, 0); + return; + } + on_finish(file, IOStatus.ok, 0); + } + + /** Writes data to a file + + Note that only a single write operation is allowed at once. The caller + needs to make sure that either `on_write_finish` got called, or + `cancelWrite` was called before issuing the next call to `write`. 
+ + Note: IOMode is ignored + */ + void write(FileFD file, ulong offset, const(ubyte)[] buffer, IOMode mode, FileIOCallback on_write_finish) + @trusted + { + if (!isValid(file)) + { + on_write_finish(file, IOStatus.invalidHandle, 0); + return; + } + + SubmissionEntry e; + e.prepWrite(cast(int) file, buffer, offset); + m_loop.put(file, EventType.write, e, &handleIO, + cast(UserCallback) on_write_finish); + } + + private void handleIO(FD file, ref const(CompletionEntry) e, UserCallback cb) + nothrow + { + import std.algorithm : max; + FileIOCallback ioCb = cast(FileIOCallback) cb; + IOStatus status = e.res < 0 ? IOStatus.error : IOStatus.ok; + size_t written = max(0, e.res); + + ioCb(cast(FileFD) file, status, written); + } + + /** Cancels an ongoing write operation. + + After this function has been called, the `FileIOCallback` specified in + the call to `write` is guaranteed not to be called. + */ + void cancelWrite(FileFD file) + { + m_loop.cancelOp(file, EventType.write); + } + + /** Reads data from a file. + + Note that only a single read operation is allowed at once. The caller + needs to make sure that either `on_read_finish` got called, or + `cancelRead` was called before issuing the next call to `read`. + + Note: `mode` is ignored for files + */ + void read(FileFD file, ulong offset, ubyte[] buffer, IOMode, FileIOCallback on_read_finish) + @trusted + { + if (!isValid(file)) + { + on_read_finish(file, IOStatus.invalidHandle, 0); + return; + } + SubmissionEntry e; + e.prepRead(cast(int) file, buffer, offset); + m_loop.put(file, EventType.read, e, &handleIO, + cast(UserCallback) on_read_finish); + } + + /** Cancels an ongoing read operation. + + After this function has been called, the `FileIOCallback` specified in + the call to `read` is guaranteed not to be called. + */ + void cancelRead(FileFD file) + { + m_loop.cancelOp(file, EventType.read); + } + + /** Determines whether the given file handle is valid. 
+ + A handle that is invalid will result in no operations being carried out + when used. In particular `addRef`/`releaseRef` will have no effect, but + can safely be called and I/O operations will result in + `IOStatus.invalidHandle`. + + A valid handle gets invalid when either the reference count drops to + zero, or after the file was explicitly closed. + */ + bool isValid(FileFD handle) const @nogc + { + return m_loop.isValid(handle); + } + + /** Increments the reference count of the given file. + */ + void addRef(FileFD descriptor) + { + m_loop.addRef(descriptor); + } + + /** Decrements the reference count of the given file. + + Once the reference count reaches zero, all associated resources will be + freed and the resource descriptor gets invalidated. + + Returns: + Returns `false` $(I iff) the last reference was removed by this call. + + Passing an invalid handle will result in a return value of `true`. + */ + bool releaseRef(FileFD descriptor) + { + return m_loop.releaseRef(descriptor); + } + + /** Retrieves a reference to a user-defined value associated with a descriptor. + */ + @property final ref T userData(T)(FileFD descriptor) + @trusted { + import std.conv : emplace; + static void init(void* ptr) @nogc { emplace(cast(T*)ptr); } + static void destr(void* ptr) @nogc { destroy(*cast(T*)ptr); } + return *cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr); + } + + /// Low-level user data access. Use `userData` instead. + protected void* rawUserData(FileFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system nothrow + { + return m_loop.rawUserData(descriptor, size, initialize, destroy); + } + +} diff --git a/source/eventcore/drivers/posix/io_uring/io_uring.d b/source/eventcore/drivers/posix/io_uring/io_uring.d new file mode 100644 index 00000000..734673c3 --- /dev/null +++ b/source/eventcore/drivers/posix/io_uring/io_uring.d @@ -0,0 +1,312 @@ +/** + Linux io_uring based event driver implementation. 
+ + io_uring is an efficient API for asynchronous I/O on Linux, suitable for + large numbers of concurrently open sockets. + + It beats epoll if you have more than 10'000 connections, has a smaller + system call overhead and supports sockets as well as timers and files. + + + io_uring works differently from epoll. Poll/epoll/kqueue based solutions + will tell you when it is safe to read from or write to a descriptor + without blocking. Upon such a notification one has to do it manually by + invoking the respective system call. + On the other hand, in io_uring one asynchronously issues a command to + read from or write to a descriptor and gets a notification back once the + operation is complete. Issuing a command is done by placing a + Submission Queue Entry (SQE) in a ring buffer shared by user space and + kernel. Upon completion a Completion Queue Entry (CQE) is placed + by the kernel into another shared ring buffer. + + This has implications for the layout of the internal data structures. While + the POSIX event loops center everything around the descriptors, the io_uring + loop tracks individual operations (that reference descriptors). + + 1. For some operations (e.g. opening a file) the descriptor is only + available after the operation completes. + 2. After an operation is cancelled, it will result in a CQE nonetheless, + and we cannot reliably handle this without tracking individual operations + (and multiple operations of the same kind per descriptor) + + We still have to track information per descriptor, e.g. to ensure that + only one operation per kind is ongoing at a time. + + This implementation tries to integrate with a socket-based event loop + (the base loop) for the primary reason that not all functionality + (e.g. signalfd) is currently available via io_uring. + + We do this by registering an eventfd with the base loop that triggers + whenever new completions are available and thus wakes up the base loop. 
+*/ +module eventcore.drivers.posix.io_uring.io_uring; + +version (linux): + +import eventcore.driver; +import eventcore.internal.utils; + +import core.time : Duration; + +import during; +import std.stdio; + +/// eventcore allows exactly one simultaneous operation per kind per +/// descriptor. +enum EventType { + none, + read, + write, + status +} + +private void assumeSafeNoGC(scope void delegate() nothrow doit) nothrow @nogc +@trusted { + (cast(void delegate() nothrow @nogc)doit)(); +} + +// this is the callback provided by the user to e.g. EventDriverFiles +alias UserCallback = void delegate() nothrow; + +// this is provided by e.g. EventDriverFiles to UringEventLoop. The event loop +// will call OpCallback which should in turn call UserCallback. This is useful +// to decouple the uring stuff from the actual drivers. +alias OpCallback = void delegate(FD, ref const(CompletionEntry), UserCallback) nothrow; + +// data stored per operation. Since after completion the `OpCallback` +// is called with the descriptor and CompletionEntry, all necessary information +// should be available without creating a real closure for OpCallback. +struct OpData +{ + OpCallback opCb; + UserCallback userCb; +} + +// information regarding a specific resource / descriptor +private struct ResourceData +{ + // from the OS, file descriptor, socket etc + int descriptor; + // ref count, we'll clean up internal data structures + // if this reaches zero + uint refCount; + // all data structures are reused and the validation + // counter makes sure a user cannot access a reused + // slot with an old handle + uint validationCounter; + + // to track that at most one op per EventType is ongoing + OpData[EventType.max+1] runningOps; +} + +// the user can store extra information per resource +// which is kept in a sep. 
array +private struct UserData +{ + DataInitializer userDataDestructor; + ubyte[16*size_t.sizeof] userData; +} + +/// +final class UringEventLoop +{ + import eventcore.internal.consumablequeue; + import std.typecons : Tuple, tuple; + + private { + Uring m_io; + ChoppedVector!(ResourceData) m_fds; + int m_runningOps; + } + + nothrow @nogc + this() + { + assumeSafeNoGC({ + int res = m_io.setup(); + debug(UringEventLoopDebug) + { + if (res < 0) + print("Setting up uring failed: %s", -res); + } + }); + } + + void registerEventID(EventID id) nothrow @trusted @nogc + { + m_io.registerEventFD(cast(int) id); + } + + bool isValid(FD fd) const @nogc nothrow @safe + { + return fd.value < m_fds.length + && m_fds[fd].validationCounter == fd.validationCounter; + } + + void addRef(FD fd) @nogc nothrow @safe + { + if (!isValid(fd)) + return; + m_fds[fd].refCount += 1; + } + + bool releaseRef(FD fd) @nogc nothrow @trusted + { + if (!isValid(fd)) + return true; + + ResourceData* fds = &m_fds[fd]; + fds.refCount -= 1; + if (fds.refCount == 0) + { + import std.traits : EnumMembers; + + // cancel all pending ops + foreach (type; EnumMembers!EventType) + { + if (fds.runningOps[type].opCb != null) + cancelOp(fd, type); + } + } + return fds.refCount != 0; // `false` iff this call removed the last reference + } + + void submit() nothrow @trusted @nogc + { + m_io.submit(0); + } + + bool doProcessEvents(Duration timeout, bool dontWait = true) nothrow @trusted + { + import std.algorithm : max; + + // we add a timeout so that we do not wait indef. + if (!dontWait && timeout != Duration.max) + { + KernelTimespec timespec; + timeout.split!("seconds", "nsecs")(timespec.tv_sec, timespec.tv_nsec); + m_io.putWith!((ref SubmissionEntry e, KernelTimespec timespec) + { + // wait for timeout or first completion + e.prepTimeout(timespec, 1); + e.user_data = ulong.max; + })(timespec); + } + int res = m_io.submit(dontWait ? 
0 : 1); + if (res < 0) + { + return false; + } + bool gotEvents = !m_io.empty; + foreach (ref CompletionEntry e; m_io) + { + import eventcore.internal.utils : print; + import std.algorithm : all; + // internally used timeouts + if (e.user_data == ulong.max) + continue; + int fd; + EventType type; + // let the specific driver handle the rest + splitKey(e.user_data, fd, type); + assert (fd < m_fds.length); + OpData* op = &m_fds[fd].runningOps[type]; + // cb might be null, if the operation was cancelled + if (op.opCb) + { + m_runningOps -= 1; + op.opCb(FD(fd, m_fds[fd].validationCounter), e, op.userCb); + *op = OpData.init; + } else if (m_fds[fd].runningOps[].all!(x => x.opCb == null)) + { + resetFD(m_fds[fd]); + } + } + return gotEvents; + } + + void resetFD(ref ResourceData data) nothrow @nogc + { + data.descriptor = 0; + } + + @property size_t waiterCount() const nothrow @safe { return m_runningOps; } + + package FDType initFD(FDType)(size_t fd) + { + auto slot = &m_fds[fd]; + assert (slot.refCount == 0, "Initializing referenced file descriptor slot."); + assert (slot.descriptor == 0, "Initializing referenced file descriptor slot."); + slot.refCount = 1; + return FDType(fd, slot.validationCounter); + } + + package void put(in FD fd, in EventType type, SubmissionEntry e, + OpCallback cb, UserCallback userCb) nothrow + { + m_runningOps += 1; + ulong userData = combineKey(cast(int) fd, type); + e.user_data = userData; + m_io.put(e); + assert (m_fds[fd.value].runningOps[type].opCb == null); + m_fds[fd.value].runningOps[type] = OpData(cb, userCb); + } + + void cancelOp(FD fd, EventType type) @trusted nothrow @nogc + { + if (!isValid(fd)) + { + print("cancel for invalid fd"); + return; + } + if (m_fds[fd].runningOps[type] == OpData.init) + { + print("cancelling op that's not running"); + return; + } + ulong op = combineKey(cast(int) fd, type); + m_io.putWith!((ref SubmissionEntry e, ulong op) + { + // result is ignored (own userData is ulong.max) + prepCancel(e, op); + 
e.user_data = ulong.max; + })(op); + m_runningOps -= 1; + m_fds[fd].runningOps[type] = OpData.init; + } + + package void* rawUserData(FD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) + nothrow @system + { + return null; + } + + package final void* rawUserDataImpl(size_t descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) + @system @nogc nothrow + { + return null; + } +} + +void splitKey(ulong key, out int fd, out EventType type) @nogc nothrow +{ + fd = cast(int) (key >> 32); + type = cast(EventType) ((key << 32) >>> 32); +} + +ulong combineKey(int fd, EventType type) @nogc nothrow +{ + return cast(ulong)(fd) << 32 | cast(int) type; +} + +@nogc nothrow +unittest +{ + ulong orig = 0xDEAD_0000_0001; // fd in the upper 32 bits, EventType in the lower 32 bits + int fd; + EventType type; + splitKey(orig, fd, type); + assert (type == cast(EventType)1); + assert (fd == 0xDEAD); + assert (orig == combineKey(fd, type)); +} diff --git a/source/eventcore/meson.build b/source/eventcore/meson.build index 5e054169..4ff70e60 100644 --- a/source/eventcore/meson.build +++ b/source/eventcore/meson.build @@ -6,6 +6,8 @@ eventcore_src = [ 'drivers/posix/driver.d', 'drivers/posix/epoll.d', 'drivers/posix/events.d', + 'drivers/posix/io_uring/io_uring.d', + 'drivers/posix/io_uring/files.d', 'drivers/posix/kqueue.d', 'drivers/posix/pipes.d', 'drivers/posix/processes.d',