Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

liburing使用之拷贝文件 #64

Open
BruceChen7 opened this issue Apr 26, 2023 · 0 comments
Open

liburing使用之拷贝文件 #64

BruceChen7 opened this issue Apr 26, 2023 · 0 comments

Comments

@BruceChen7
Copy link
Owner

BruceChen7 commented Apr 26, 2023

参考资料

拷贝之前的初始化

/*
 * Copy argv[1] to argv[2] using io_uring.
 *
 * Opens the source read-only and the destination write-only (created if
 * missing, truncated, mode 0644), sets up a ring with QD entries, reads the
 * source size, and runs copy_file(). copy_file()'s result is returned as
 * the exit status. Error-handling bodies are elided ("...") in this excerpt.
 *
 * NOTE(review): infd/outfd are not declared here; presumably file-scope
 * globals shared with queue_read()/queue_prepped() — confirm.
 */
int main(int argc, char *argv[])
{
    struct io_uring ring;
    off_t insize;
    int ret;

    // need both a source and a destination path
    if (argc < 3) {
        ...
    }

    // open the source file for reading
    infd = open(argv[1], O_RDONLY);
    if (infd < 0) {
        ...
    }
    // open the destination file for writing (create/truncate, mode 0644)
    outfd = open(argv[2], O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (outfd < 0) {
        ...
    }

    // create the io_uring instance with QD entries
    if (setup_context(QD, &ring))
        ...
    // total number of bytes to copy
    if (get_file_size(infd, &insize))
        ...

    // copy the whole file through the ring
    ret = copy_file(&ring, insize);

    close(infd);
    close(outfd);
    io_uring_queue_exit(&ring);
    return ret;
}
  • 这里的关键是setup_context(), 用来初始化io-uring.

setup_context

  • 用来初始化queue

    /*
     * Initialize `ring` with `entries` submission-queue entries and no
     * setup flags. Returns 0 on success; the failure branch is elided
     * ("...") in this excerpt.
     */
    static int setup_context(unsigned entries, struct io_uring *ring)
    {
        int ret;
    
        // entries is the queue depth chosen by the caller (main passes QD)
        ret = io_uring_queue_init(entries, ring, 0);
        if (ret < 0) {
            ...
        }
    
        return 0;
    }

拷贝文件

/*
 * Copy `insize` bytes from infd to outfd through `ring`.
 *
 * Outline only: the loop bodies are elided ("...") here and shown
 * separately. The first loop keeps reads in flight and turns each
 * completed read into a write. The second loop waits for the writes
 * still outstanding.
 *
 * Returns 0 on success and 1 on error (the error paths live in the
 * elided loop bodies).
 */
static int copy_file(struct io_uring *ring, off_t insize)
{
    /* Requests currently in flight; reads + writes is kept within QD. */
    unsigned long reads, writes;
    // completion queue entry
    struct io_uring_cqe *cqe;
    off_t write_left, offset;
    int ret;

    // bytes whose write has not been queued yet
    write_left = insize;
    writes = reads = offset = 0;
    while(insize || write_left) {
        ....
    }
    while(writes) {
        ...
    }
    /*
     * All writes have completed. The original fell off the end of this
     * int function, and main() returns its value (undefined behavior).
     */
    return 0;
}
  • 整体结构很直观:第一个循环一边提交读请求,一边把已完成的读转换成写请求;第二个循环 while(writes) 等待仍在进行中的写请求全部完成

读写文件的过程

/*
 * Main loop of copy_file(). It keeps going while input remains to be read
 * (insize) or read data still has to be queued for writing (write_left).
 */
while (insize || write_left) {
    unsigned long had_reads;
    int got_comp;

    /*
     * Queue up as many reads as we can (bounded by QD in-flight requests).
     */
    had_reads = reads;
    while (insize) {
        ...
    }

    /* Submit only if the loop above queued at least one new read. */
    if (had_reads != reads) {
        ...
    }

    /*
     * Either the queue is full or all input has been queued. Reap at
     * least one completion (blocking), then any others already available.
     */
    got_comp = 0;
    while (write_left) {
        ...
    }
}
  • reads 是当前已提交但尚未完成的读请求数,writes 是当前进行中的写请求数;两者之和不能超过队列深度 QD
  • while(insize),如果还有剩余的字节数没读取完毕,那么需要读,而且尽可能的多读
  • 如果这次读和上次读相比,有增加,需要提交(if (had_reads != reads))

while(insize)

/* Queue reads of at most BS bytes each until input or queue capacity runs out. */
while (insize) {
    off_t this_size = insize;

    // stop once in-flight reads + writes reach the queue depth QD
    if (reads + writes >= QD)
        break;

    // cap a single read at BS bytes
    if (this_size > BS)
        this_size = BS;
    // defensive only: cannot trigger, the loop condition requires insize != 0
    else if (!this_size)
        break;

    // queue_read() returns nonzero if it cannot allocate the buffer or get
    // an SQE; stop queueing (reads queued so far are still submitted)
    if (queue_read(ring, this_size, offset))
        break;

    // bytes still to be read
    insize -= this_size;
    // file offset of the next read
    offset += this_size;
    reads++;
}
  • 进行中的读请求数 + 写请求数达到 QD 时,就不再继续排队新的读请求
  • 每次读取最多 BS 字节(代码注释中为 32KB);剩余字节数 insize 为 0 时,while 条件本身就会结束循环
  • 最重要的是 queue_read,它从 submission queue 取得 sqe 并准备读请求(这里只准备,不提交)
  • offset 是下次读取文件的位置

queue_read(ring, this_size, offset)

/*
 * Prepare (but do not submit) a readv of `size` bytes at file offset
 * `offset` from infd.
 *
 * One malloc() holds the struct io_data header followed directly by the
 * `size`-byte data buffer (data + 1). The header is attached to the SQE as
 * user data, so the completion handler can find the request again.
 *
 * Returns 0 once the SQE is prepared. Returns 1 if the allocation fails or
 * no SQE is available; in that case the allocation is released.
 */
static int queue_read(struct io_uring *ring, off_t size, off_t offset)
{
    struct io_data *req = malloc(sizeof(*req) + size);
    struct io_uring_sqe *entry;

    if (req == NULL)
        return 1;

    entry = io_uring_get_sqe(ring);
    if (entry == NULL) {
        free(req);
        return 1;
    }

    /*
     * Record the whole chunk (first_*) as well as the current position, so
     * a short read can be resumed and the matching write rebuilt later.
     */
    req->read = 1;
    req->first_offset = offset;
    req->offset = offset;
    req->first_len = size;
    req->iov.iov_base = req + 1;
    req->iov.iov_len = size;

    io_uring_prep_readv(entry, infd, &req->iov, 1, offset);
    io_uring_sqe_set_data(entry, req);
    return 0;
}
  • 首先从submit queue 中get submit entry
  • 设置该数据准备读,设置文件开始读取的偏移量
  • 设置读数据存放的 buffer 位置:注意 data + 1 指向的是紧跟在 struct io_data 之后的内存,也就是和 io_data 一起 malloc 出来的 size 字节数据区,而不是 iov 本身
  • 设置entry关联的读文件描述符
  • 关联sqe对应的用户态data
/*
 * Per-request bookkeeping. queue_read() allocates it together with the
 * data buffer, which follows it directly in memory.
 */
struct io_data {
    int read;               /* 1 while a read, 0 once turned into a write */
    off_t first_offset;     /* file offset of the whole chunk */
    off_t offset;           /* current offset, advanced after a short transfer */
    size_t first_len;       /* length of the whole chunk */
    struct iovec iov;       /* current buffer window given to readv/writev */
};
  • 用户态data

if (had_reads != reads)

/* Submit all newly queued reads with one io_uring_submit() call. */
if (had_reads != reads) {
    ret = io_uring_submit(ring);
    if (ret < 0) {
        ...
        break;
    }
}
  • 提交read IO操作

while(write_left)

got_comp = 0;
while (write_left) {
    // request descriptor attached to the completion
    struct io_data *data;

    if (!got_comp) {
        // block until at least one completion is available
        ret = io_uring_wait_cqe(ring, &cqe);
        got_comp = 1;
    } else {
        ...
    }
    if (ret < 0) {
        ...
        return 1;
    }
    // no further completion ready: go back and queue more reads
    if (!cqe)
        break;

    // fetch the io_data pointer set via io_uring_sqe_set_data()
    data = io_uring_cqe_get_data(cqe);
    // failed request, or short read/write
    if (cqe->res < 0) {
        ...
    } else if ((size_t)cqe->res != data->iov.iov_len) {
        ...
    }

    /*
     * All done. if write, nothing else to do. if read,
     * queue up corresponding write.
     */
    // data->read set: this completion belongs to a read
    if (data->read) {
        ...
    } else {
        ...
    }
    // mark the CQE as consumed
    io_uring_cqe_seen(ring, cqe);
}
  • 首先阻塞等待至少一个 IO 完成(!got_comp 分支);如果等待失败(ret < 0),copy_file 直接返回 1

  • 如果等待过,那么peek 这次IO的完成情况(!got_comp对应的else分支)

    // After the first (blocking) wait, only poll for further completions
    // without waiting.
    ret = io_uring_peek_cqe(ring, &cqe);
    // -EAGAIN: nothing ready right now; not an error, just stop reaping
    if (ret == -EAGAIN) {
        cqe = NULL;
        ret = 0;
    }
    • 这种情况需要区分是否是 EAGAIN:如果是,说明暂时没有已完成的请求,于是把 cqe 置为 NULL,跳出内层 while,回到外层 while 进行下一轮 IO

    • 如果是其他错误,那么 copy_file 直接返回 1

      // any other wait/peek failure ends the copy with an error
      if (ret < 0) {
          fprintf(stderr, "io_uring_peek_cqe: %s\n",
                      strerror(-ret));
          return 1;
      }
      // no completion ready
      if (!cqe)
          break;
  • 这时确认有complete entry,那么从complete entry处来获取数据

  • 同样如果读取complete entry读取数据失败,也要区分什么样的错误类型

    • 如果是eagain的错误,那么还是要进行重新提交
    • 如果不是,那么直接程序退出.
      if (cqe->res < 0) {
          // -EAGAIN: transient; requeue the same request unchanged
          if (cqe->res == -EAGAIN) {
              queue_prepped(ring, data);
              io_uring_submit(ring);
              io_uring_cqe_seen(ring, cqe);
              continue;
          }
          // any other error ends the copy
          fprintf(stderr, "cqe failed: %s\n",
                  strerror(-cqe->res));
          return 1;
      }
  • 请求成功了,但实际传输的字节数比请求的少(short read/write,读和写都可能发生),那么需要继续处理剩余部分;cqe->res 是实际传输的字节数

    • 需要注意的是,需要重新提交,并且mark这次读取已经完成(io_uring_cqe_seen)
     else if ((size_t)cqe->res != data->iov.iov_len) {
        /*
         * Short read/write: cqe->res bytes were transferred, fewer than
         * requested. Advance the buffer window and file offset past them
         * and requeue the remainder.
         */
        if (cqe->res == 0) {
            /*
             * No progress at all (e.g. the input shrank during the copy).
             * Requeueing the same request would loop forever.
             */
            fprintf(stderr, "cqe made no progress\n");
            return 1;
        }
        /* Arithmetic on void * is a GNU extension; step through char *. */
        data->iov.iov_base = (char *)data->iov.iov_base + cqe->res;
        data->iov.iov_len -= cqe->res;
        data->offset += cqe->res;
        queue_prepped(ring, data);
        // submit the requeued remainder
        io_uring_submit(ring);
        io_uring_cqe_seen(ring, cqe);
        continue;
    }
  • 这时读取指定大小的数据了,那么能开始写了,同时标记这个complete entry已经被处理了

    • data->read表示这个数据是通过io_uring读提交的,否则表示是写提交的完成,直接释放掉data,完成一次writes
    • 放入到队列中写
    • 完成了一次读提交(reads--)
    • 新增一次写提交(writes++)
    • mark这次读取已经完成(io_uring_cqe_seen)
    if (data->read) {
        // read finished: reuse the same buffer for the matching write
        queue_write(ring, data);
        write_left -= data->first_len;
        reads--;
        writes++;
    } else {
        // write finished: release the header and its buffer
        free(data);
        writes--;
    }
    io_uring_cqe_seen(ring, cqe);
  • 到这里写请求都已提交,但不代表都已经完成(writes 仍是进行中的写请求数),所以还需要 while(writes) 等待它们结束

while(writes)

/*
 * After the main loop, wait for the writes still in flight.
 *
 * Short writes are resumed the same way the main loop resumes them. The
 * original freed the request and silently dropped the unwritten tail.
 */
while (writes) {
    struct io_data *data;

    ret = io_uring_wait_cqe(ring, &cqe);
    if (ret) {
        fprintf(stderr, "wait_cqe=%d\n", ret);
        return 1;
    }
    if (cqe->res < 0) {
        fprintf(stderr, "write res=%d\n", cqe->res);
        return 1;
    }
    // fetch the io_data attached when the write was queued
    data = io_uring_cqe_get_data(cqe);
    if ((size_t)cqe->res != data->iov.iov_len) {
        if (cqe->res == 0) {
            /* No progress: requeueing would never finish. */
            fprintf(stderr, "write res=%d\n", cqe->res);
            return 1;
        }
        /* Short write: advance past what was written and requeue the rest. */
        data->iov.iov_base = (char *)data->iov.iov_base + cqe->res;
        data->iov.iov_len -= cqe->res;
        data->offset += cqe->res;
        queue_prepped(ring, data);
        io_uring_submit(ring);
        io_uring_cqe_seen(ring, cqe);
        continue;
    }
    free(data);
    writes--;
    io_uring_cqe_seen(ring, cqe);
}

queue_write(ring, data)

/*
 * Turn a completed read into the matching write and submit it.
 *
 * The request is rewound to the whole chunk it originally covered
 * (first_offset / first_len, with the buffer starting right after the
 * header). It is then flagged as a write, prepared by queue_prepped(), and
 * submitted immediately.
 */
static void queue_write(struct io_uring *ring, struct io_data *data)
{
    struct iovec *vec = &data->iov;

    data->read = 0;
    data->offset = data->first_offset;
    vec->iov_base = data + 1;
    vec->iov_len = data->first_len;

    queue_prepped(ring, data);
    io_uring_submit(ring);
}
  • 重置data,用来开始写,将read 置为0,表示是写提交, 重置buffer
  • 关联data和ring(queue_prepped)

queue_prepped

  • 根据 data->read 选择 infd(readv)或 outfd(writev),把 buffer(data->iov)和偏移量 data->offset 填入从 ring 取得的 sqe
/*
 * Prepare one SQE for `data`: a readv from infd if data->read is set,
 * otherwise a writev to outfd, at data->offset using data->iov. The SQE is
 * only prepared here; the caller submits it.
 *
 * io_uring_get_sqe() can return NULL when the submission queue has no free
 * slot. The original guarded this with assert(), which disappears under
 * NDEBUG and then dereferences NULL. Instead, flush the queued SQEs to the
 * kernel to free slots and retry once. If that still fails, abort with a
 * message.
 */
static void queue_prepped(struct io_uring *ring, struct io_data *data)
{
    struct io_uring_sqe *sqe;

    sqe = io_uring_get_sqe(ring);
    if (!sqe) {
        /* SQ ring full: submitting what is already queued frees slots. */
        io_uring_submit(ring);
        sqe = io_uring_get_sqe(ring);
    }
    if (!sqe) {
        fprintf(stderr, "queue_prepped: no free SQE\n");
        abort();
    }

    /* One iovec (data->iov) in either direction. */
    if (data->read) {
        io_uring_prep_readv(sqe, infd, &data->iov, 1, data->offset);
    } else {
        io_uring_prep_writev(sqe, outfd, &data->iov, 1, data->offset);
    }

    // attach the request so its completion can find it
    io_uring_sqe_set_data(sqe, data);
}

总结

在使用liburing时,

  • 需要保证每一个处理过的 completion entry(cqe)都调用 io_uring_cqe_seen 标记为已处理
  • 如果read 的错误是EAGAIN, 需要重新提交io 到队列中
  • 使用struct iovec 来进行IO
  • 通过下面的代码,来关联fd, buffer,submit entry
// readv from infd / writev to outfd: one iovec (data->iov) at data->offset
io_uring_prep_readv(sqe, infd, &data->iov, 1, data->offset);
io_uring_prep_writev(sqe, outfd, &data->iov, 1, data->offset);

#type/code #type/linux #public

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant