25
2025
06
19:00:03

io_uring 实例教程:第2部分—— 批量请求排队

io_uring By Example: Part 2 – Queuing multiple requests

本文是 io_uring 系列文章的一部分

  • 系列介绍
  • 第一部分:io_uring 简介。在这篇文章中,我们基于原生 io_uring 接口创建了 cat_uring,并基于更高级的 liburing 构建了 cat_liburing
  • 第二部分:即本文。
  • 第三部分:使用 io_uring 编写的一个 Web 服务器。

在第一部分中,我们了解了如何分别使用原生 io_uring 接口和 liburing 提供的高级接口来构建与 Unix cat 工具等效的程序。然而,在这两个示例中,我们都没有同时对多个请求进行排队。io_uring 的一个主要目标是让用户能够一次性对多个操作进行排队,从而减少所需的系统调用数量,这样内核就可以一次性处理这些操作,而程序无需为每个 I/O 请求都执行一个或多个系统调用。

为此,在这一部分中,我们将构建一个用于复制文件的程序。该程序会尽可能高效地工作,它会在队列深度允许的范围内对尽可能多的请求进行排队。让我们来看一些代码。需要说明的是,这个程序很大程度上借鉴了 fio 软件包中的一个程序。

https://github.com/axboe/fio/blob/master/t/io_uring.c


#include <stdio.h>
#include <fcntl.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <errno.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
#include <liburing.h>

#define QD  2
#define BS (16 * 1024)

static int infd, outfd;

struct io_data {
    int read;
    off_t first_offset, offset;
    size_t first_len;
    struct iovec iov;
};

static int setup_context(unsigned entries, struct io_uring *ring) {
    int ret;

    ret = io_uring_queue_init(entries, ring, 0);
    if( ret < 0) {
        fprintf(stderr, "queue_init: %s\n", strerror(-ret));
        return -1;
    }

    return 0;
}

static int get_file_size(int fd, off_t *size) {
    struct stat st;

    if (fstat(fd, &st) < 0 )
        return -1;
    if(S_ISREG(st.st_mode)) {
        *size = st.st_size;
        return 0;
    } elseif (S_ISBLK(st.st_mode)) {
        unsigned long long bytes;

        if (ioctl(fd, BLKGETSIZE64, &bytes) != 0)
            return -1;

        *size = bytes;
        return 0;
    }
    return -1;
}

static void queue_prepped(struct io_uring *ring, struct io_data *data) {
    struct io_uring_sqe *sqe;

    sqe = io_uring_get_sqe(ring);
    assert(sqe);

    if (data->read)
        io_uring_prep_readv(sqe, infd, &data->iov, 1, data->offset);
    else
        io_uring_prep_writev(sqe, outfd, &data->iov, 1, data->offset);

    io_uring_sqe_set_data(sqe, data);
}

static int queue_read(struct io_uring *ring, off_t size, off_t offset) {
    struct io_uring_sqe *sqe;
    struct io_data *data;

    data = malloc(size + sizeof(*data));
    if (!data)
        return 1;

    sqe = io_uring_get_sqe(ring);
    if (!sqe) {
        free(data);
        return 1;
    }

    data->read = 1;
    data->offset = data->first_offset = offset;

    data->iov.iov_base = data + 1;
    data->iov.iov_len = size;
    data->first_len = size;

    io_uring_prep_readv(sqe, infd, &data->iov, 1, offset);
    io_uring_sqe_set_data(sqe, data);
    return 0;
}

static void queue_write(struct io_uring *ring, struct io_data *data) {
    data->read = 0;
    data->offset = data->first_offset;

    data->iov.iov_base = data + 1;
    data->iov.iov_len = data->first_len;

    queue_prepped(ring, data);
    io_uring_submit(ring);
}

int copy_file(struct io_uring *ring, off_t insize) {
    unsigned long reads, writes;
    struct io_uring_cqe *cqe;
    off_t write_left, offset;
    int ret;

    write_left = insize;
    writes = reads = offset = 0;

    while (insize || write_left) {
        int had_reads, got_comp;

        /* 尽可能多地将读取请求排入队列 */
        had_reads = reads;
        while (insize) {
            off_t this_size = insize;

            if (reads + writes >= QD)
                break;
            if (this_size > BS)
                this_size = BS;
            elseif (!this_size)
                break;

            if (queue_read(ring, this_size, offset))
                break;

            insize -= this_size;
            offset += this_size;
            reads++;
        }

        if (had_reads != reads) {
            ret = io_uring_submit(ring);
            if (ret < 0) {
                fprintf(stderr, "io_uring_submit: %s\n", strerror(-ret));
                break;
            }
        }

        /* 此时队列已满。让我们至少找出一个已完成的任务 */
        got_comp = 0;
        while (write_left) {
            struct io_data *data;

            if (!got_comp) {
                ret = io_uring_wait_cqe(ring, &cqe);
                got_comp = 1;
            } else {
                ret = io_uring_peek_cqe(ring, &cqe);
                if (ret == -EAGAIN) {
                    cqe = NULL;
                    ret = 0;
                }
            }
            if (ret < 0) {
                fprintf(stderr, "io_uring_peek_cqe: %s\n",
                        strerror(-ret));
                return 1;
            }
            if (!cqe)
                break;

            data = io_uring_cqe_get_data(cqe);
            if (cqe->res < 0) {
                if (cqe->res == -EAGAIN) {
                    queue_prepped(ring, data);
                    io_uring_cqe_seen(ring, cqe);
                    continue;
                }
                fprintf(stderr, "cqe failed: %s\n",
                        strerror(-cqe->res));
                return 1;
            } elseif (cqe->res != data->iov.iov_len) {
                /* short read/write; adjust and requeue */
                data->iov.iov_base += cqe->res;
                data->iov.iov_len -= cqe->res;
                queue_prepped(ring, data);
                io_uring_cqe_seen(ring, cqe);
                continue;
            }
            
            /*
             * 所有操作已完成。如果是写操作,那么无需再做其他事情。如果是读操作,
             * 则将对应的写操作排入队列。
             * */
            if (data->read) {
                queue_write(ring, data);
                write_left -= data->first_len;
                reads--;
                writes++;
            } else {
                free(data);
                writes--;
            }
            io_uring_cqe_seen(ring, cqe);
        }
    }

    return 0;
}

int main(int argc, char *argv[]) {
    struct io_uring ring;
    off_t insize;
    int ret;

    if (argc < 3) {
        printf("Usage: %s <infile> <outfile>\n", argv[0]);
        return 1;
    }

    infd = open(argv[1], O_RDONLY);
    if (infd < 0) {
        perror("open infile");
        return 1;
    }

    outfd = open(argv[2], O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (outfd < 0) {
        perror("open outfile");
        return 1;
    }

    if (setup_context(QD, &ring))
        return 1;

    if (get_file_size(infd, &insize))
        return 1;

    ret = copy_file(&ring, insize);

    close(infd);
    close(outfd);
    io_uring_queue_exit(&ring);
    return ret;
}


程序结构

这个复制程序和大多数同类程序一样,会将第一个参数所指向的文件复制到第二个参数所指向的文件中。该程序的核心是 copy_file() 函数。在这里,我们设置了一个外层 while 循环,该循环内部嵌套了两个处于同一层级的 while 循环。外层 while 循环的作用是确保源文件中的所有字节都被复制,而第一个嵌套的 while 循环则负责尽可能多地创建 readv() 请求。实际上,它会在队列深度允许的范围内对尽可能多的请求进行排队。

一旦队列满了,我们就进入第二个嵌套的 while 循环。这个循环会处理完成队列中的条目,并在数据读取完成后提交向目标文件写入数据的请求。有几个变量用于跟踪状态,这可能会让人有点困惑。但一个异步文件复制程序能有多难呢?

下一步计划

既然我们已经了解了如何使用 io_uring 一次性处理多个请求,那么让我们将其与一些网络编程结合起来。在本系列的下一部分中,我们将从头开始构建一个简单的 Web 服务器,该服务器将完全使用 io_uring 来处理所有 I/O 操作。

源代码

所有示例的完整源代码可在 Github 上获取。

示例的完整源代码: https://github.com/shuveb/io_uring-by-example

关于作者

我叫 Shuveb Hussain,是这个专注于 Linux 的博客的作者。你可以在 Twitter 上关注我,我会发布与技术相关的内容,主要聚焦于 Linux、性能、可扩展性和云技术。

Src

https://unixism.net/2020/04/io-uring-by-example-part-2-queuing-multiple-requests/




推荐本站淘宝优惠价购买喜欢的宝贝:

image.png

本文链接:https://www.hqyman.cn/post/11895.html 非本站原创文章欢迎转载,原创文章需保留本站地址!

分享到:
打赏





休息一下~~


« 上一篇 下一篇 »

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

请先 登录 再评论,若不是会员请先 注册

您的IP地址是: