You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
intmain(intargc, char*argv[])
{
structio_uringring;
off_tinsize;
intret;
if (argc<3) {
...
}
//open filesinfd=open(argv[1], O_RDONLY);
if (infd<0) {
...
}
// write fileoutfd=open(argv[2], O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (outfd<0) {
...
}
if (setup_context(QD, &ring))
...
// get file sizeif (get_file_size(infd, &insize))
...
// copy to ringret=copy_file(&ring, insize);
close(infd);
close(outfd);
io_uring_queue_exit(&ring);
returnret;
}
这里的关键是 setup_context(),它用来初始化 io_uring 实例(队列深度为 QD)。
setup_context
用来初始化queue
/*
 * setup_context - initialise @ring with a queue of @entries slots.
 *
 * Returns 0 on success, or -1 on failure after printing the reason.
 */
static int setup_context(unsigned entries, struct io_uring *ring)
{
	int ret;

	/* main() passes QD as the queue depth; flags = 0 requests no special setup */
	ret = io_uring_queue_init(entries, ring, 0);
	if (ret < 0) {
		/* liburing reports failure as -errno */
		fprintf(stderr, "queue_init: %s\n", strerror(-ret));
		return -1;
	}

	return 0;
}
拷贝文件
/*
 * copy_file - copy @insize bytes from infd to outfd through @ring.
 *
 * The first loop keeps reads queued (reads + writes capped at QD) and, as
 * reads complete, queues the corresponding writes. The second loop waits
 * for the writes that are still outstanding. Loop bodies are elided in this
 * excerpt; they are walked through in the sections that follow.
 *
 * Returns 0 on success, non-zero on failure.
 */
static int copy_file(struct io_uring *ring, off_t insize)
{
	unsigned long reads, writes;	/* queued read / write requests */
	struct io_uring_cqe *cqe;	/* completion queue entry */
	off_t write_left, offset;
	int ret;

	/* bytes still to be written */
	write_left = insize;
	writes = reads = offset = 0;

	while (insize || write_left) {
		/* ... queue reads, submit, reap completions, queue writes ... */
	}

	/* wait out pending writes */
	while (writes) {
		/* ... */
	}

	return 0;
}
整体结构很直观:第一个 while 循环一边提交读请求,一边在读完成后提交对应的写请求,直到所有数据都已读取并排入写队列;第二个 while(writes) 循环则等待剩余尚未完成的写请求全部完成。
读写文件的过程
/*
 * Main loop of copy_file(): runs while there is input left to read
 * (insize) or data left to write (write_left).
 */
while (insize || write_left) {
	unsigned long had_reads;	/* value of reads before this round */
	int got_comp;			/* already waited for a completion this round? */

	/*
	 * Queue up as many reads as we can
	 */
	had_reads = reads;
	while (insize) {
		/* ... */
	}

	/* Submit only if this round actually queued new reads. */
	if (had_reads != reads) {
		/* ... */
	}

	/*
	 * Either the queue depth is used up or there is nothing left to read.
	 * Reap at least one completion before looping again.
	 */
	got_comp = 0;
	while (write_left) {
		/* ... */
	}
}
reads 和 writes 分别记录当前已提交、尚未处理完成的读请求数和写请求数;两者之和不能超过队列深度 QD。
while(insize),如果还有剩余的字节数没读取完毕,那么需要读,而且尽可能的多读
如果这次读和上次读相比,有增加,需要提交(if (had_reads != reads))
while(insize)
/* Queue as many reads as the queue depth allows. */
while (insize) {
	off_t this_size = insize;

	/* Keep queued reads + writes within the queue depth QD. */
	if (reads + writes >= QD)
		break;
	/* Read at most BS bytes (32 KB) per request. */
	if (this_size > BS)
		this_size = BS;
	else if (!this_size)
		break;	/* defensive: cannot trigger while insize != 0 */

	/* queue_read() returns non-zero if it could not allocate or get an SQE. */
	if (queue_read(ring, this_size, offset))
		break;

	insize -= this_size;	/* bytes still to be read */
	offset += this_size;	/* file offset of the next read */
	reads++;
}
读的次数 + 写的次数 不能超过QD, 超过就不提交IO
每个读请求最多读取 BS(32KB)字节;如果没有剩余内容可读,那么就停止。
最关键的是 queue_read:它从提交队列中取出一个 SQE,并准备好读请求(此时还没有提交,提交由后面的 io_uring_submit 完成)。
offset 是下次读取文件的位置
queue_read(ring, this_size, offset)
staticintqueue_read(structio_uring*ring, off_tsize, off_toffset)
{
// submit queuestructio_uring_sqe*sqe;
// io datastructio_data*data;
data=malloc(size+sizeof(*data));
if (!data)
return1;
// get a submit entrysqe=io_uring_get_sqe(ring);
if (!sqe) {
free(data);
return1;
}
// set data ready to be readdata->read=1;
// set off setdata->offset=data->first_offset=offset;
// set actual data pointerdata->iov.iov_base=data+1;
// read sizedata->iov.iov_len=size;
data->first_len=size;
// prepare for read, set read buffer which is data->iov// offset 上次读的位置// 设置关联的文件描述符io_uring_prep_readv(sqe, infd, &data->iov, 1, offset);
// set bussiness data pointerio_uring_sqe_set_data(sqe, data);
return0;
}
/* Submit the reads queued in this round, if any. */
if (had_reads != reads) {
	ret = io_uring_submit(ring);
	if (ret < 0) {
		/* io_uring_submit() reports failure as -errno */
		fprintf(stderr, "io_uring_submit: %s\n", strerror(-ret));
		break;
	}
}
提交read IO操作
while(write_left)
/* Reap completions: wait for the first one, then drain without blocking. */
got_comp = 0;
while (write_left) {
	struct io_data *data;	/* request context attached with the SQE */

	if (!got_comp) {
		/* Block until a completion arrives; fills in cqe. */
		ret = io_uring_wait_cqe(ring, &cqe);
		got_comp = 1;
	} else {
		/* ... peek with io_uring_peek_cqe(); on -EAGAIN set cqe = NULL, ret = 0 ... */
	}
	if (ret < 0) {
		fprintf(stderr, "waiting for completion: %s\n", strerror(-ret));
		return 1;
	}
	/* Nothing ready: return to the outer loop to queue more reads. */
	if (!cqe)
		break;

	/* Recover the context set with io_uring_sqe_set_data(). */
	data = io_uring_cqe_get_data(cqe);
	if (cqe->res < 0) {
		if (cqe->res == -EAGAIN) {
			/* Transient failure: resubmit the same request. */
			queue_prepped(ring, data);
			io_uring_submit(ring);
			io_uring_cqe_seen(ring, cqe);
			continue;
		}
		fprintf(stderr, "cqe failed: %s\n", strerror(-cqe->res));
		return 1;
	} else if ((size_t)cqe->res != data->iov.iov_len) {
		/* ... short read/write: adjust iov and offset, requeue, continue ... */
	}

	/*
	 * All done. if write, nothing else to do. if read,
	 * queue up corresponding write.
	 */
	if (data->read) {
		/* ... queue the matching write (queue_write) ... */
	} else {
		/* ... write finished ... */
	}
	/* Mark this CQE as consumed. */
	io_uring_cqe_seen(ring, cqe);
}
首先阻塞等待至少一个 IO(读或写)完成(!got_comp 分支,调用 io_uring_wait_cqe);等待过程中如果失败(ret < 0),直接返回错误、退出程序。
如果等待过,那么peek 这次IO的完成情况(!got_comp对应的else分支)
// last time complete, this time just peak// we don't need to waitret=io_uring_peek_cqe(ring, &cqe);
if (ret==-EAGAIN) {
cqe=NULL;
ret=0;
}
elseif ((size_t)cqe->res!=data->iov.iov_len) {
// res less than actual size/* Short read/write, adjust and requeue */data->iov.iov_base+=cqe->res;
data->iov.iov_len-=cqe->res;
//data actural put sizedata->offset+=cqe->res;
queue_prepped(ring, data);
// submit againio_uring_submit(ring);
io_uring_cqe_seen(ring, cqe);
continue;
}
staticvoidqueue_prepped(structio_uring*ring, structio_data*data)
{
structio_uring_sqe*sqe;
// get a submit entrysqe=io_uring_get_sqe(ring);
assert(sqe);
if (data->read)
// prepare for read, 1 means iov_lenio_uring_prep_readv(sqe, infd, &data->iov, 1, data->offset);
elseio_uring_prep_writev(sqe, outfd, &data->iov, 1, data->offset);
// set bussiness data pointerio_uring_sqe_set_data(sqe, data);
}
参考资料
在拷贝之前的初始化
setup_context
用来初始化queue
拷贝文件
读写文件的过程
while(insize)
queue_read(ring, this_size, offset)
if (had_reads != reads)
while(write_left)
首先阻塞等待至少一个 IO(读或写)完成(!got_comp 分支,调用 io_uring_wait_cqe);等待过程中如果失败(ret < 0),直接返回错误、退出程序。
如果等待过,那么peek 这次IO的完成情况(!got_comp对应的else分支)
这种情况需要区分是否是 EAGAIN:如果是,说明暂时没有已完成的请求,于是把 cqe 置为 NULL,退出当前的 while(write_left),回到外层 while 进行下一轮 IO。
如果不是,那么直接退出程序
这时确认有complete entry,那么从complete entry处来获取数据
同样如果读取complete entry读取数据失败,也要区分什么样的错误类型
请求成功了,但实际传输的字节数小于请求的大小(短读/短写),这时需要调整 iov 和 offset,然后重新提交剩余部分;cqe->res 是实际读取或写入的字节数。
此时已经读取到指定大小的数据,就可以提交对应的写请求了,同时调用 io_uring_cqe_seen 标记这个 completion entry 已被处理。
到这里写请求只是被提交了,并不代表写已经完成(或一定成功),所以最后还需要 while(writes) 等待剩余的写请求全部完成。
while(writes)
queue_write(ring, data)
queue_prepped
总结
在使用liburing时,
#type/code #type/linux #public
The text was updated successfully, but these errors were encountered: