☄️ dyco-coroutine 是一个纯C的动态协程框架。作者希望这个框架是真正实用且好用,而不仅仅是一个Demo。这个项目最早受到wangbojing/NtyCo
启发。也参考了一些别的项目,比如cloudwu/coroutine
, jamwt/libtask
和 stevedekorte/coroutine
。
使用这个框架,开发人员可以用同步的编程方式达到接近异步程序的I/O性能。作者希望这个框架是开箱即用的:没有过多的依赖和环境限制,使用的时候直接将子程序创建为协程即可,协程相关的所有工具(如调度器,套接字接口,协程同步)都由框架提供,所有子程序就会自动按照协程的方式运行。此外,作者提供了详细的示例,覆盖了dyco目前几乎所有的特性。通过快速阅读并运行这些示例,任何人都可以快速上手这个框架。
dyco-coroutine
的特性:
- 全自动的协程调度。
- 支持为协程设定独立的运行栈,或者设定采用共享栈。
- 支持协程池以达到更高的效率。
- 支持Socket/epoll钩子,可以改变一些上层网络API的行为。
- 支持等待信号事件。尤其是等待子进程,适合跟
fork()+exec()
配合使用。 - 支持在协程内部使用epoll,而不阻塞整个调度器循环。
- 提供了半双工信道和发布订阅信道,支持协程通信。
- 提供了信号量和等待组功能,支持实现协程同步。
- 支持非阻塞的TLS/SSL并发服务。
- 调度器及其管理的协程可以被暂停,然后在适当的时机恢复。
- 支持多线程。
- 支持非对称协程。
- 支持对紧急协程的优先调度。
为什么推荐选择dyco-coroutine
?:
- 优秀的性能。
- 优雅的编程方式。
- 没有依赖,易于构建/安装/链接。
- 文档完善,开发者积极维护。
预计未来还有一些工作方向:
- 更好的跨平台支持。目前的跨平台设计参考自
jamwt/libtask
。但是只支持Linux系统 (i386 AMD64 ARM ARM64...)。未来的工作可以参考heiher/hev-task-system
和tboox/tbox
。 - 发掘更多的特性需求,并不断完善。寻找Bug并修复。
- 性能优化。使用ucontext作为底层切换策略,意味着这个框架在协程切换性能上很难做到顶尖的表现(相比于直接用汇编)。但是依然可能存在其他的优化空间。
- 更丰富的构建方式。减少构建的复杂性,而不是增加。
- 有缓冲区的信道。
- 线程专有指针的封装接口。
如果这个项目对你有用,可以给个星星以表支持。也可以推荐给其他人用用看。吐槽或者有问题随时提issue。如果有想法也可以提PR,共同合作让这个项目变得更好。🌈
# 可选
$ sudo apt install libssl-dev
$ sudo apt install libhiredis-dev
# make编译
$ cd dyco-coroutine
$ make
# 运行
$ ./bin/xxx_example
# 可选: 编译之后安装共享库
$ sudo make install
# 编译应用的时候直接链接
$ gcc -o someoutput somecode.c -ldyco
# 卸载共享库
$ sudo make uninstall
# 1. 准备工作
# 可选
$ sudo apt install libssl-dev
$ sudo apt install libhiredis-dev
# 可选,仅用于Meson + ninja构建
$ sudo apt install meson
$ sudo apt install ninja-build
$ sudo apt install pkg-config
# 2. 使用GNU make构建
# 运行make
$ cd dyco-coroutine
$ make
# 运行示例
$ ./bin/xxx_example
# 可选: 编译之后安装共享库
$ sudo make install
# 编译应用的时候直接链接
$ gcc -o someoutput somecode.c -ldyco
# 卸载共享库
$ sudo make uninstall
# 3. 使用Meson + ninja构建
# 运行meson构建工具
$ meson build_dir
$ cd build_dir
$ meson compile
# 运行示例
$ build_dir/example/xxx_example
# 可选: 编译之后安装共享库
$ cd build_dir
$ sudo ninja install
# 编译应用的时候直接链接
$ gcc -o someoutput somecode.c -ldyco
# 卸载共享库
$ cd build_dir
$ sudo ninja uninstall
#include <stdio.h>
#include <stdlib.h>
#include <arpa/inet>
// include this file to enjoy dyco
#include "dyco/dyco_coroutine.h"
// Pass the arguments by pointer.
struct foobarargs {
int arg1;
char *arg2;
};
void foobar(void *arg)
{
// Get the arguments by pointer.
struct foobarargs *args = arg;
int cid, fd, client, ret, status;
// coroID can be obtained
cid = dyco_coroutine_coroID();
// Get Udata if necessary
dyco_semaphore *sem;
dyco_coroutine_getUdata(cid, &sem);
// Create other coroutines if necessary
dyco_coroutine_create(foobar, NULL);
// Use dyco_coroutine_sleep() instead of sleep()
dyco_coroutine_sleep(1000);
// Use dyco_xx socket API if COROUTINE_HOOK is undefined
fd = dyco_socket(AF_INET, SOCK_STREAM, 0);
client = dyco_accept(fd, xxx, yyy);
ret = dyco_recv(client, xxx, yyy, 0);
ret = dyco_recv(client, xxx, yyy, 0);
dyco_close(client);
// Use normal socket API if COROUTINE_HOOK is defined
fd = socket(AF_INET, SOCK_STREAM, 0);
client = accept(fd, xxx, yyy);
ret = recv(client, xxx, yyy, 0);
ret = recv(client, xxx, yyy, 0);
close(client);
ret = fork();
if (ret == 0) {
exec(...)
}
else if (ret < 0) {
return;
}
// Wait child for 3000 ms. Set timeout to -1 to wait until child process is finished
ret = dyco_signal_waitchild(ret, &status, 3000);
// Use dyco_epoll api if COROUTINE_HOOK is not defined
dyco_epoll_create(...)
dyco_epoll_add(...)
dyco_epoll_wait(...)
dyco_epoll_del(...)
dyco_epoll_destroy(...)
// Use normal epoll api if COROUTINE_HOOK is defined
epoll_wait(...)
return;
}
int main()
{
// Optional: Pass the arguments by pointer.
struct foobarargs *args = calloc(1, sizeof(struct foobarargs));
// Create the corotine
int cid = dyco_coroutine_create(foobar, args);
// Optional: Set stack if necessary
dyco_coroutine_setStack(cid, NULL, 4096);
// Optional: Create semaphore, channel, waitgroup or pubsubchannel
dyco_semaphore *sem = dyco_semaphore_create(3);
// Optional: Set Udata if necessary
dyco_coroutine_setUdata(cid, sem);
// Run
dyco_schedule_run();
return 0;
}
这里定义了一些基本的协程方法。注意sleep/wait/coroID
只能在协程里调用。
setStack
是可选的。如果在协程运行之前没有设置栈,那么默认协程是在调度器提供的共享栈上运行。否则,则协程将运行在独立的预设栈上。共享栈节约内存空间,但是每次切换都会拷贝出去,造成性能开销。因此如果一个协程需要频繁地切出切入,最好为其设置独立栈。详见 example/*
。
这里也提供了协程池,来防止频繁地创建释放协程和栈的内存空间。任何时候都可以创建协程池,创建完成后可以修改协程池的大小。协程池中的协程在用户函数执行完成后会自动归还到协程池。通过调用obtain
,你可以从协程池中获得一个空闲协程。详见 example/coropool_example.c
。
为了满足一些紧急协程的需求,dyco提供了2-优先级调度策略。通过调用create_urgent
或obtain_urgent
,你可以获得一个紧急协程,它往往比其他协程更早被唤醒。紧急的属性也可以在协程内部进行设置(setUrgent
)或者取消(unsetUrgent
)。
// return the coroutine ID on success, < 0 on error
int dyco_coroutine_create(proc_coroutine func, void *arg);
// this new coroutine will be resume as soon as possible
int dyco_coroutine_create_urgent(proc_coroutine func, void *arg);
void dyco_coroutine_sleep(uint32_t msecs);
// this coroutine will abort itself but no affect other coroutines.
void dyco_coroutine_abort();
// set timeout to -1 for persistent waiting. set timeout to 0 for no waiting
// return > 0 on success, 0 on timeout, < 0 on error
int dyco_coroutine_waitRead(int fd, int timeout);
int dyco_coroutine_waitWrite(int fd, int timeout);
int dyco_coroutine_waitRW(int fd, int timeout);
// return ID of current coroutine
int dyco_coroutine_coroID();
// stacksize: 0 to cancel independent stack
// return 1 on successfully set, 0 on successfully cancel, < 0 on error
int dyco_coroutine_setStack(int cid, void *stackptr, size_t stacksize);
int dyco_coroutine_getStack(int cid, void **stackptr, size_t *stacksize);
// return 1 on stack size > 7/8 of total, 0 on safe, abort coroutine on stack overflow.
int dyco_coroutine_checkStack();
// return 0 on success
int dyco_coroutine_setUdata(int cid, void *udata);
int dyco_coroutine_getUdata(int cid, void **udata);
// return total yield times of a coroutine
int dyco_coroutine_getSchedCount(int cid);
// return 0 on success.
int dyco_coroutine_setUrgent(int cid);
// return 0 on success
int dyco_coroutine_unsetUrgent(int cid);
// return NULL on error
dyco_coropool* dyco_coropool_create(int totalsize, size_t stacksize);
dyco_coropool* dyco_coropool_resize(dyco_coropool* cp, int newsize);
// return 0 on success
int dyco_coropool_destroy(dyco_coropool** cp);
// return number of available coroutine in this pool
int dyco_coropool_available(dyco_coropool* cp);
// obtain a coroutine from the pool.
// If there is no free coroutine, wait timeout
// return 0 on timeout, -1 on error, > 0 on success
int dyco_coropool_obtain(dyco_coropool* cp, proc_coroutine func, void *arg, int timeout);
int dyco_coropool_obtain_urgent(dyco_coropool* cp, proc_coroutine func, void *arg, int timeout);
这里定义了基本的调度器方法。实际上,dyco里的协程调度是由调度器自动完成的。create
是可选的,run
在大多数情况下足矣。详见example/*
。
// return 0 when done, 1 when stopped, < 0 on error
int dyco_schedule_run();
// stack_size: shared stack memory size
// loopwait_timeout: max delay (ms) of starting new coroutine
// return 0 on success
int dyco_schedule_create(size_t stack_size, uint64_t loopwait_timeout);
void dyco_schedule_free(dyco_schedule *sched);
// return ID of current scheduler
int dyco_schedule_schedID();
int dyco_schedule_setUdata(void *udata);
int dyco_schedule_getUdata(void **udata);
// return total number to coroutines of current scheduler
int dyco_schedule_getCoroCount();
调度器调用为协程提供了影响调度器行为的接口,比如阻塞某些信号,暂停或者终止调度器。详见example/stop_abort.c
。
// see sigprocmask
int dyco_schedcall_sigprocmask(int __how, sigset_t *__set, sigset_t *__oset);
// stop current scheduler and make it return as soon as possible
void dyco_schedcall_stop();
// shutdown the scheduler and kill its all coroutines
void dyco_schedcall_abort();
尽管开发人员使用协程是为了以同步的编程方式达到接近异步的I/O性能,但是dyco同样也支持传统的I/O多路复用编程方式。如果COROUTINE_HOOK
宏被开启,调用epoll_wait
将不会阻塞调度循环。方便起见,dyco也提供了dyco_epoll_xxx
系列接口。详见example/epoll.c
。
// return 0 on success
int dyco_epoll_init();
void dyco_epoll_destroy();
int dyco_epoll_add(int fd, struct epoll_event *ev);
int dyco_epoll_del(int fd, struct epoll_event *ev);
// return number of ready events, < 0 on error
int dyco_epoll_wait(struct epoll_event *events, int maxevents, int timeout);
// see epoll_wait
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
// see poll
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
通过调用dyco_signal
系列接口,等待某些信号的出现将不会导致调度循环被阻塞。waitchild + fork + exec
在某些情况下尤其有用。等待其他信号也是支持的。
注意,调用signal_init
后,指定的信号将被暂时屏蔽,直到signal_destroy
被调用。
详见example/signal.c
。
// return pid of the child on success, < 0 on error
int dyco_signal_waitchild(const pid_t child, int *status, int timeout);
// return 0 on success
int dyco_signal_init(sigset_t *mask);
void dyco_signal_destroy();
// return sizeof struct signalfd_siginfo on success, < 0 on error
int dyco_signal_wait(struct signalfd_siginfo *sinfo, int timeout);
半双工信道用于提供简单的协程通信。send
只有在信道空闲或者消息被接收方读走的情况下才能立刻返回,否则将会引起等待。详见example/channel.c
。
// size: max length of the message
dyco_channel* dyco_channel_create(size_t size);
void dyco_channel_destroy(dyco_channel **chan);
// return the actual sent length, 0 on channel closed, < 0 on error
ssize_t dyco_channel_send(dyco_channel *chan, void *buf, size_t size, int timeout);
// return the actual received length, 0 on channel closed, < 0 on error
ssize_t dyco_channel_recv(dyco_channel *chan, void *buf, size_t maxsize, int timeout);
发布订阅信道提供了1对N协程通信。如果没有订阅者,publish
将会失败。对于每个消息,订阅者都需要提前调用subscribe
来订阅,换言之,订阅不会自动持续。详见example/pubsub.c
。
// size: max length of the message
dyco_pubsubchannel* dyco_pubsub_create(size_t size);
void dyco_pubsub_destroy(dyco_pubsubchannel **pschan);
// return the actual published length, 0 on no subscriber, < 0 on error
ssize_t dyco_pubsub_publish(dyco_pubsubchannel *pschan, void *buf, size_t size);
// return the actual received length, < 0 on error
ssize_t dyco_pubsub_subscribe(dyco_pubsubchannel *pschan, void *buf, size_t maxsize, int timeout);
等待组提供了N对N的协程同步。协程可以通过调用add
来加入某个等待组,调用done
来提醒这个等待组,或者通过调用wait
来等待一定数量的协程完成。详见example/waitgroup.c
。
// suggest_size: estimated max number of coroutines on the waitgroup
dyco_waitgroup* dyco_waitgroup_create(int suggest_size);
void dyco_waitgroup_destroy(dyco_waitgroup **group);
// add coroutine to the waitgroup
int dyco_waitgroup_add(dyco_waitgroup* group, int cid);
// tell the waitgroup this coroutine is done
int dyco_waitgroup_done(dyco_waitgroup* group);
// target: target wait number, set it to -1 to wait until all group member done
// timeout: unit is ms, and set to -1 for persistent waiting
// return > 0 on success, < 0 on error
int dyco_waitgroup_wait(dyco_waitgroup* group, int target, int timeout);
严格来讲,每个dyco调度器只运行在一个线程上,每个线程同一时刻最多只有一个协程在运行。但是这些协程看起来像是同时发生的。因此,信号量在某些场合可能派上用场,比如我们需要控制同时活跃的连接数。详见example/semaphore.c
。
// value: initial value of the semaphore
dyco_semaphore* dyco_semaphore_create(size_t value);
void dyco_semaphore_destroy(dyco_semaphore **sem);
// return 0 on success, < 0 on error or timeout
int dyco_semaphore_wait(dyco_semaphore *sem, int timeout);
// return 0 on success, < 0 on error
int dyco_semaphore_signal(dyco_semaphore *sem);
dyco提供的socket接口可以像系统的socket接口一样调用,而不会阻塞调度循环。
如果COROUTINE_HOOK
宏被开启,系统的socket接口就会表现出dyco提供的socket相同的行为。而且,这也会改变一些高级的网络接口的行为,因为它们底层也调用了系统的socket接口。比如redis的同步接口,和gethostbyname等接口,将不会阻塞调度循环. 详见example/socket_client.c example/socket_server.c example/network.c
.
int dyco_socket(int domain, int type, int protocol);
int dyco_close(int fd);
int dyco_accept(int fd, struct sockaddr *addr, socklen_t *len);
int dyco_connect(int fd, struct sockaddr *name, socklen_t namelen);
ssize_t dyco_send(int fd, const void *buf, size_t len, int flags);
ssize_t dyco_recv(int fd, void *buf, size_t len, int flags);
ssize_t dyco_sendto(int fd, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen);
ssize_t dyco_recvfrom(int fd, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen);
dyco提供的SSL接口能巨大改善SSL通信的性能。但如果要使用这些接口,libssl
和libcrypto
需要在编译dyco之前预先安装。详见example/ssl_server.c example/ssl_client
。
缺少SSL部分也是可以的,dyco的核心部分不需要任何依赖。
// return 1 on success, else on error
int dyco_SSL_accept(SSL *ssl);
// return 1 on success, else on error
int dyco_SSL_connect(SSL *ssl);
// see SSL_read
int dyco_SSL_read(SSL *ssl, void *buf, int num);
// see SSL_write
int dyco_SSL_write(SSL *ssl, const void *buf, int num);
dyco也支持非对称协程,尽管在大多数情况下对称协程比非对称协程更好用。非对称协程有两个专门的API:resume
和yield
。一个非对称协程可以在主程序或者任意一个协程内部创建。需要注意的是,非对称协程不会被dyco调度器调度。要启动这个协程,你应该带上它的cid
去调用resume
。在这个协程函数里面,可以通过调用yield
来返回调用resume
的上下文。如果协程函数执行完毕,resume
会返回0。这时需要手动调用dyco_asymcoro_free
来释放这个非对称协程。
由于非对称协程不会被dyco调度器调度,因此前面提到的所有特性都不支持非对称协程。
setStack
是可选的。如果一个非对称协程需要在一个对程协程内被唤起执行,你必须确保这个非对称协程具有独立栈。
详见example/asymmetric_example.c
。
dyco也提供了非对称协程池来防止频繁地创建释放非对称协程和栈的内存空间。任何时候都可以创建非对称协程池,创建完成后可以修改协程池的大小。非对称协程池中的协程在用户函数执行完成后不会自动归还到协程池。通过调用obtain
,你可以从协程池中获得一个空闲协程。通过调用return
,你可以将一个执行完成的协程归还到协程池。
// return 1 if the coroutine is asymmetric, 0 if symmetric
int dyco_coroutine_isasymmetric(int cid);
// return the coroutine ID on success, < 0 on error
int dyco_asymcoro_create(proc_coroutine func, void *arg);
// return cid on coroutine yield, 0 on coroutine finish, < 0 on error
int dyco_asymcororesume(int cid);
// interupt and go back
void dyco_asymcoroyield();
void dyco_asymcoro_free(int cid);
// return ID of current coroutine
int dyco_asymcoro_coroID();
// stacksize: 0 to cancel independent stack
// return 1 on successfully set, 0 on successfully cancel, < 0 on error
int dyco_asymcoro_setStack(int cid, void *stackptr, size_t stacksize);
int dyco_asymcoro_getStack(int cid, void **stackptr, size_t *stacksize);
// return 0 on success
int dyco_asymcoro_setUdata(int cid, void *udata);
int dyco_asymcoro_getUdata(int cid, void **udata);
// return NULL on error
dyco_coropool* dyco_asymcpool_create(int totalsize, size_t stacksize);
dyco_coropool* dyco_asymcpool_resize(dyco_coropool* cp, int newsize);
// return 0 on success
int dyco_asymcpool_destroy(dyco_coropool** cp);
// return number of available coroutine in this pool
int dyco_asymcpool_available(dyco_coropool* cp);
// obtain a coroutine from the pool.
// If there is no free coroutine, wait timeout
// return 0 on timeout, -1 on error, > 0 on success
int dyco_asymcpool_obtain(dyco_coropool* cp, proc_coroutine func, void *arg, int timeout);
// return a finished coroutine to the pool
void dyco_asymcpool_return(int cid);
在evaluation.txt
可以查看性能评估结果。
这些测试用到的工具是NeTest和./switchtest/
下面的测试程序。
结论:dyco
的并发吞吐量略低于epoll IO多路复用,明显超过超pthread
。并且dyco
的并发吞吐量和处理延迟非常接近libco
。另外,dyco
协程切换速度远高于libco
,和NtyCo
差别不大。
有3种实现协程切换的主流方案:
- 汇编指令。它比其他两种方案快,但是对于每个平台都要写一些不同的汇编指令代码。
setjmp()
/longjmp()
。它比ucontext快。但是我个人认为这是最不合适的协程切换实现方案。setjmp()
/longjmp()
会导致栈丢失,为了避免这种情况,你不得不去设置env_buf的栈指针。这个数据结构在不同平台上截然不同,那为什么不直接用汇编?- ucontext。ucontext包含了4个平台无关的接口,因此协程切换代码会相对优雅。不幸的是ucontext已经被移除出POSIX,因为使用带有空括号的函数声明器(而不是原型格式的参数类型声明器)是一个过时的特性。ucontext在Linux目前还是可以使用的,而且不会为dyco带来问题。因此我选择了这个方案。为了支持更多的平台,dyco在用户层构建uncontext,这部分参考了
jamwt/libtask
的实现。