Skip to content

Commit

Permalink
feat: start work on an io_uring driver
Browse files Browse the repository at this point in the history
  • Loading branch information
Panke committed Mar 27, 2021
1 parent a257d6c commit c2dc2cd
Show file tree
Hide file tree
Showing 10 changed files with 691 additions and 25 deletions.
29 changes: 15 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ EpollEventDriver | yes | — | — | — | — | —
WinAPIEventDriver | — | yes | — | — | — | —
KqueueEventDriver | — | — | yes | yes¹ | — | —
LibasyncEventDriver | —¹| —¹| —¹| —¹| — | —
UringEventDriver | —¹| no | no | no | unknown | no

¹ planned, but not currenly implemented

Expand All @@ -48,20 +49,20 @@ The following compilers are tested and supported:
Driver development status
-------------------------

Feature \ EventDriver | Select | Epoll | WinAPI | Kqueue | Libasync
----------------------|--------|-------|---------|---------|----------
TCP Sockets | yes | yes | yes | yes | —
UDP Sockets | yes | yes | yes | yes | —
USDS | yes | yes | — | yes | —
DNS | yes | yes | yes | yes | —
Timers | yes | yes | yes | yes | —
Events | yes | yes | yes | yes | —
Unix Signals | yes² | yes | — | — | —
Files | yes | yes | yes | yes | —
UI Integration | yes¹ | yes¹ | yes | yes¹ | —
File watcher | yes² | yes | yes | yes² | —
Pipes | yes | yes | — | yes | —
Processes | yes | yes | — | yes | —
Feature \ EventDriver | Select | Epoll | WinAPI | Kqueue | Libasync | Uring
----------------------|--------|-------|---------|---------|----------|-------
TCP Sockets | yes | yes | yes | yes | — | —
UDP Sockets | yes | yes | yes | yes | — | —
USDS | yes | yes | — | yes | — | —
DNS | yes | yes | yes | yes | — | —
Timers | yes | yes | yes | yes | — | —
Events | yes | yes | yes | yes | — | —
Unix Signals | yes² | yes | — | — | — | —
Files | yes | yes | yes | yes | — | yes
UI Integration | yes¹ | yes¹ | yes | yes¹ | — | yes?
File watcher | yes² | yes | yes | yes² | — | —
Pipes | yes | yes | — | yes | — | —
Processes | yes | yes | — | yes | — | —

¹ Manually, by adopting the X11 display connection socket

Expand Down
3 changes: 2 additions & 1 deletion dub.sdl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ name "eventcore"
description "Pro-actor based abstraction layer over operating system asynchronous I/O facilities."
license "MIT"
copyright "Copyright © 2016-2018 Sönke Ludwig"

license "MIT"
dependency "during" version="~>0.2.1"
targetType "library"

libs "resolv" platform="linux"
Expand Down
68 changes: 68 additions & 0 deletions examples/uring.d
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/++ dub.sdl:
name "uring-example"
description "this does nothing, but also does not crash"
dependency "eventcore" path=".."
versions "UringEventLoopDebug"
debugVersions "UringEventLoopDebug"
+/
import std.stdio;
import core.time;
import std.string;

import eventcore.drivers.posix.epoll;
import eventcore.drivers.posix.io_uring.io_uring;
import eventcore.drivers.posix.io_uring.files;
import eventcore.driver;

import std.datetime.stopwatch;

void main()
{
StopWatch sw;
sw.start();
auto drv = new EpollEventDriver();
auto files = drv.files;

auto fd = files.open("/tmp/testfile", FileOpenMode.createTrunc);
assert (files.isValid(fd));
files.write(fd, 0, "this is a testwrite".representation, IOMode.init,
(FileFD file, IOStatus status, size_t written)
{
try {writefln("write: %s, %s, %s, %s", file, fd, status, written);}
catch(Exception e) {}
}
);
ExitReason r = drv.core.processEvents(Duration.max);
writeln(r);
//assert (r == ExitReason.idle);
ubyte[256] buffer;
files.read(fd, 0, buffer[], IOMode.init,
(FileFD file, IOStatus status, size_t read)
{
try {
writefln("read: %s, %s, %s, %s", file, fd, status, read);
writeln(cast(char[]) buffer[0 .. read]);
}
catch(Exception e) {}
}
);
writeln(drv.core.processEvents(Duration.max));
files.read(fd, 0, buffer[], IOMode.init,
(FileFD file, IOStatus status, size_t read)
{
try {
writefln("read: %s, %s, %s, %s", file, fd, status, read);
writeln(cast(char[]) buffer[0 .. read]);
}
catch(Exception e) {}
}
);
files.write(fd, 19, "this is a snd testwrite".representation, IOMode.init,
(FileFD file, IOStatus status, size_t written)
{
try {writefln("write: %s, %s, %s, %s", file, fd, status, written);}
catch(Exception e) {}
}
);
writeln(drv.core.processEvents(Duration.max));
}
5 changes: 4 additions & 1 deletion meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ project_version_suffix = ''
project_version = meson.project_version()
project_version_full = project_version + project_version_suffix

taggedalgebraic_dep = dependency('taggedalgebraic', version: ['>=0.10.12', '<0.12'])
taggedalgebraic_dep = dependency('taggedalgebraic',
version: ['>=0.10.12', '<0.12'],
fallback: ['taggedalgebraic', 'taggedalgebraic_source_dep'])

source_root = meson.source_root()
build_root = meson.build_root()
subdir('source/eventcore')
subdir('examples')
1 change: 1 addition & 0 deletions source/eventcore/driver.d
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,7 @@ alias AcceptCallback = void delegate(StreamListenSocketFD, StreamSocketFD, scope
alias IOCallback = void delegate(StreamSocketFD, IOStatus, size_t);
alias DatagramIOCallback = void delegate(DatagramSocketFD, IOStatus, size_t, scope RefAddress);
alias DNSLookupCallback = void delegate(DNSLookupID, DNSStatus, scope RefAddress[]);
alias FileOpenCallback = void delegate(FileFD);
alias FileIOCallback = void delegate(FileFD, IOStatus, size_t);
alias FileCloseCallback = void delegate(FileFD, CloseStatus);
alias PipeIOCallback = void delegate(PipeFD, IOStatus, size_t);
Expand Down
32 changes: 25 additions & 7 deletions source/eventcore/drivers/posix/driver.d
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import eventcore.drivers.posix.sockets;
import eventcore.drivers.posix.watchers;
import eventcore.drivers.posix.processes;
import eventcore.drivers.posix.pipes;
import eventcore.drivers.posix.io_uring.io_uring : UringEventLoop;
import eventcore.drivers.posix.io_uring.files : UringDriverFiles;
import eventcore.drivers.timer;
import eventcore.drivers.threadedfile;
import eventcore.internal.consumablequeue : ConsumableQueue;
Expand Down Expand Up @@ -51,7 +53,8 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
version (Windows) alias DNSDriver = EventDriverDNS_GHBN!(EventsDriver, SignalsDriver);
else version (EventcoreUseGAIA) alias DNSDriver = EventDriverDNS_GAIA!(EventsDriver, SignalsDriver);
else alias DNSDriver = EventDriverDNS_GAI!(EventsDriver, SignalsDriver);
alias FileDriver = ThreadedFileEventDriver!EventsDriver;
version (linux) alias FileDriver = UringDriverFiles;
else alias FileDriver = ThreadedFileEventDriver!EventsDriver;
version (Posix) alias PipeDriver = PosixEventDriverPipes!Loop;
else alias PipeDriver = DummyEventDriverPipes!Loop;
version (linux) alias WatcherDriver = InotifyEventDriverWatchers!EventsDriver;
Expand All @@ -61,6 +64,7 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
else alias ProcessDriver = DummyEventDriverProcesses!Loop;

Loop m_loop;
version (linux) UringEventLoop m_uring;
CoreDriver m_core;
EventsDriver m_events;
SignalsDriver m_signals;
Expand All @@ -76,15 +80,17 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
this()
@nogc @trusted {
m_loop = mallocT!Loop;
version (linux) m_uring = mallocT!UringEventLoop;
m_sockets = mallocT!SocketsDriver(m_loop);
m_events = mallocT!EventsDriver(m_loop, m_sockets);
m_signals = mallocT!SignalsDriver(m_loop);
m_timers = mallocT!TimerDriver;
m_pipes = mallocT!PipeDriver(m_loop);
m_processes = mallocT!ProcessDriver(m_loop, this);
m_core = mallocT!CoreDriver(m_loop, m_timers, m_events, m_processes);
m_core = mallocT!CoreDriver(m_loop, m_timers, m_events, m_processes, m_uring);
m_dns = mallocT!DNSDriver(m_events, m_signals);
m_files = mallocT!FileDriver(m_events);
version (linux) m_files = mallocT!FileDriver(m_uring);
else m_files = mallocT!FileDriver(m_events);
m_watchers = mallocT!WatcherDriver(m_events);
}

Expand Down Expand Up @@ -177,14 +183,16 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
Timers m_timers;
Events m_events;
Processes m_processes;
UringEventLoop m_uring;
bool m_exit = false;
EventID m_wakeupEvent;

shared Mutex m_threadCallbackMutex;
ConsumableQueue!ThreadCallbackEntry m_threadCallbacks;
}

this(Loop loop, Timers timers, Events events, Processes processes)
this(Loop loop, Timers timers, Events events, Processes processes,
UringEventLoop uring)
@nogc {
m_loop = loop;
m_timers = timers;
Expand All @@ -194,6 +202,8 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
m_threadCallbackMutex = mallocT!(shared(Mutex));
m_threadCallbacks = mallocT!(ConsumableQueue!ThreadCallbackEntry);
m_threadCallbacks.reserve(1000);
m_uring = uring;
m_uring.registerEventID(m_wakeupEvent);
}

final void dispose()
Expand All @@ -207,7 +217,9 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
} catch (Exception e) assert(false, e.msg);
}

@property size_t waiterCount() const { return m_loop.m_waiterCount + m_timers.pendingCount + m_processes.pendingCount; }
@property size_t waiterCount() const {
return m_loop.m_waiterCount + m_timers.pendingCount + m_processes.pendingCount + m_uring.waiterCount;
}

final override ExitReason processEvents(Duration timeout)
{
Expand All @@ -223,12 +235,17 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
if (!waiterCount) {
return ExitReason.outOfWaiters;
}

version (linux) {
// this is required to make the kernel aware of
// submitted SEQ, otherwise the first call to
// process events could stall
m_uring.submit;
}
bool got_events;

if (timeout <= 0.seconds) {
got_events = m_loop.doProcessEvents(0.seconds);
m_timers.process(MonoTime.currTime);
version (linux) got_events |= m_uring.doProcessEvents(0.seconds);
} else {
auto now = MonoTime.currTime;
do {
Expand All @@ -237,6 +254,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
auto prev_step = now;
now = MonoTime.currTime;
got_events |= m_timers.process(now);
version(linux) got_events |= m_uring.doProcessEvents(0.seconds);
if (timeout != Duration.max)
timeout -= now - prev_step;
} while (timeout > 0.seconds && !m_exit && !got_events);
Expand Down
5 changes: 3 additions & 2 deletions source/eventcore/drivers/posix/events.d
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
if (!() @trusted {
fd[1] = socket(AF_INET, SOCK_DGRAM, 0);
int nl = addr.nameLen;
import eventcore.internal.utils : print;
if (bind(fd[1], addr.name, addr.nameLen) != 0)
return false;
assert(nl == addr.nameLen);
Expand Down Expand Up @@ -167,7 +166,9 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
@trusted {
EventID event = cast(EventID)fd;
ulong cnt;
() @trusted { .read(cast(int)event, &cnt, cnt.sizeof); } ();
() @trusted {
.read(cast(int)event, &cnt, cnt.sizeof);
} ();
trigger(event, cnt > 0);
}
} else {
Expand Down
Loading

0 comments on commit c2dc2cd

Please sign in to comment.