25
2025
06
18:59:21

io_uring 实例教程:第1部分——简介

io_uring by example: Part 1 – Introduction

本文是 io_uring 系列文章的一部分。

系列文章简介

  • 第一部分:即本文。
  • 第二部分:排队执行多个操作:我们将开发一个文件复制程序 cp_liburing,它会利用 io_uring 处理多个请求。
  • 第三部分:使用 io_uring 编写的 Web 服务器。

简介

仔细想想,输入/输出(I/O)和计算是计算机真正要做的仅有的两件事。在 Linux 系统中,进行计算时,你可以在进程和线程之间做出选择。而在 I/O 方面,Linux 同时支持同步 I/O(也称为阻塞 I/O)和异步 I/O。尽管异步 I/O(aio 系列系统调用)已经在 Linux 中存在一段时间了,但它们仅适用于直接 I/O,而不适用于缓冲 I/O。对于以缓冲模式打开的文件,aio 的行为就像常规的阻塞系统调用一样。这是一个不太容易处理的限制。此外,Linux 当前的 aio 接口存在大量的系统调用开销。

考虑到开发一个能提供高性能异步 I/O 的 Linux 子系统难度很大,因为这个项目会非常复杂,所以围绕 io_uring 的热议肯定是有道理的。io_uring 不仅提供了一个优雅的内核/用户空间接口,还通过一种特殊的轮询模式,让数据在用户空间和内核空间之间传输时完全避免系统调用,从而实现了卓越的性能。

不过,对于普通开发者来说,异步编程则是另一回事。如果你用 C 这样的底层语言处理过线程和基于 select/poll/epoll 的异步编程,你就会明白我这么说的意思。与使用线程相比,我们并不擅长进行异步思维。线程有一个 “从这里开始”, “执行 1-2-3 件事”, “在这里结束” 的执行流程。虽然操作系统会多次阻塞和解除阻塞线程,但这种情况对程序员是隐藏的,所以这是一个相对容易理解和根据需求采用的思维模型。但这并不意味着异步编程非常难。它通常是你程序中的最底层。一旦你编写了一个抽象层来处理这个问题,你就可以舒适地专注于实现应用程序的主要功能,也就是你的用户最关心的那些功能。

说到抽象,io_uring 确实提供了一个更高级的库 liburing,它实现并隐藏了 io_uring 所需的许多样板代码,同时为你提供了一个更简单的接口来使用。但如果不先了解 io_uring 在底层是如何工作的,直接使用 liburing 又有什么乐趣呢?了解了底层原理后,你可能还是会使用 liburing,但你会知道其中的特殊情况,并且能更好地理解其内部的工作原理。这是一件好事。为此,我们将使用 liburing 构建大部分示例,但也会使用底层接口构建一些示例。

常规的 cat 命令实现

让我们使用 readv() 系统调用以同步(阻塞)方式构建一个简单的 cat 命令等效程序。这将让你熟悉 readv(),它是支持分散/聚集 I/O(也称为向量 I/O)的系统调用之一。如果你已经熟悉 readv() 的工作方式,可以跳过这一部分。

read() 和 write() 函数的参数是文件描述符、缓冲区及其长度,而 readv() 和 writev() 的参数是文件描述符、指向 iovec 结构体数组的指针,以及表示该数组长度的参数。现在让我们来看看 iovec 结构体:

struct iovec {
     void  *iov_base;    /* 起始地址 */
     size_t iov_len;     /* 要传输的字节数 */
};

每个 iovec 结构体只是指向一个缓冲区,包含一个基地址和一个长度。

你可能会问,与常规的 read() 和 write() 相比,使用向量 I/O 或分散/聚集 I/O 有什么好处呢?答案是使用 readv() 和 writev() 更加自然。例如,使用 readv() 时,你可以填充结构体的多个成员,而无需复制缓冲区或多次调用 read(),这两种操作的效率都相对较低。同样的优势也适用于 writev()。此外,这些调用是原子操作,而多次调用 read() 和 write() 则不是,如果你出于某些原因需要保证操作的原子性,这一点就很重要。

虽然 cat 命令通常用于将文件内容打印到控制台,但它实际上会将作为参数传入的文件内容连接(即合并)并打印出来。在我们的 cat 示例中,我们将使用 readv() 从文件中读取数据并打印到控制台。我们将逐块读取文件,每个块由一个 iovec 结构体指向。readv() 是阻塞调用,当它返回时(假设没有错误),iovec 结构体指向一组包含文件数据的缓冲区。然后我们将这些数据打印到控制台。就是这么简单。

#include <stdio.h>
#include <sys/uio.h>
#include <sys/stat.h>
#include <linux/fs.h>
#include <sys/ioctl.h>
#include <fcntl.h>
#include <stdlib.h>

#define BLOCK_SZ    4096

/*
 * 返回传入的已打开文件描述符所对应文件的大小。
 * 也能正确处理普通文件和块设备。相当实用。
 * */ 

off_t get_file_size(int fd) {
    struct stat st;

    if(fstat(fd, &st) < 0) {
        perror("fstat");
        return -1;
    }
    if (S_ISBLK(st.st_mode)) {
        unsigned long long bytes;
        if (ioctl(fd, BLKGETSIZE64, &bytes) != 0) {
            perror("ioctl");
            return -1;
        }
        return bytes;
    } elseif (S_ISREG(st.st_mode))
        return st.st_size;

    return -1;
}

/*
 * 向标准输出打印长度为 `len` 的字符串。
 * 为提高效率,我们在此处使用缓冲输出,
 * 因为我们需要逐个字符地输出。
 * */
void output_to_console(char *buf, int len) {
    while (len--) {
        fputc(*buf++, stdout);
    }
}

int read_and_print_file(char *file_name) {
    struct iovec *iovecs;
    int file_fd = open(file_name, O_RDONLY);
    if (file_fd < 0) {
        perror("open");
        return 1;
    }

    off_t file_sz = get_file_size(file_fd);
    off_t bytes_remaining = file_sz;
    int blocks = (int) file_sz / BLOCK_SZ;
    if (file_sz % BLOCK_SZ) blocks++;
    iovecs = malloc(sizeof(struct iovec) * blocks);

    int current_block = 0;

    /*
     * 对于我们正在读取的文件,分配足够的块来存储文件数据。
     * 每个块由一个 `iovec` 结构体描述,该结构体作为 `iovec` 数组的一部分传递给 `readv` 函数。
     * */
    while (bytes_remaining) {
        off_t bytes_to_read = bytes_remaining;
        if (bytes_to_read > BLOCK_SZ)
            bytes_to_read = BLOCK_SZ;


        void *buf;
        if( posix_memalign(&buf, BLOCK_SZ, BLOCK_SZ)) {
            perror("posix_memalign");
            return 1;
        }
        iovecs[current_block].iov_base = buf;
        iovecs[current_block].iov_len = bytes_to_read;
        current_block++;
        bytes_remaining -= bytes_to_read;
    }

    /*
     * `readv()` 调用会一直阻塞,直到所有 `iovec` 缓冲区都被文件数据填满。
     * 一旦该调用返回,我们就应该能够从 `iovec` 中获取文件数据,并将其输出到控制台。
     * */ 
    int ret = readv(file_fd, iovecs, blocks);
    if (ret < 0) {
        perror("readv");
        return 1;
    }

    for (int i = 0; i < blocks; i++)
        output_to_console(iovecs[i].iov_base, iovecs[i].iov_len);

    return 0;
}

int main(int argc, char *argv[]) {
    if (argc < 2) {
        fprintf(stderr, "Usage: %s <filename1> [<filename2> ...]\n",
                argv[0]);
        return 1;
    }

    /*
     * 对于作为参数传入的每个文件,调用 `read_and_print_file()` 函数。
     * */
    for (int i = 1; i < argc; i++) {
        if(read_and_print_file(argv[i])) {
            fprintf(stderr, "Error reading file\n");
            return 1;
        }
    }

    return 0;
}

这是一个足够简单的程序。我们现在讨论这个程序,以便接下来能将其与使用 io_uring 的方法进行对比。该程序的核心是一个循环,它通过先确定要读取文件的数据大小,来计算存储该文件数据所需的块数。程序会为所有所需的 iovec 结构分配内存。我们循环的次数等于文件大小对应的块数,分配块大小的内存来存储实际数据,最后调用 readv() 函数读取数据。正如我们之前讨论的,这里的 readv() 是同步的。这意味着它会阻塞,直到完成所请求的操作。当它返回时,我们分配的、由 iovec 结构指向的内存块就会被文件数据填满。然后,我们通过调用 output_to_console() 函数将文件数据打印到控制台。

使用 io_uring 进行文件拼接

现在,让我们编写一个功能等效的程序,使用 io_uring 来拼接文件。我们在 io_uring 中要使用的操作是 readv

io_uring 接口

io_uring 的接口很简单。有一个提交队列和一个完成队列。在提交队列中,你提交想要执行的各种操作的信息。例如,对于我们当前的程序,我们想用 readv() 读取文件,所以我们将描述该操作的提交队列请求作为提交队列条目(SQE:submission queue entry)的一部分放入队列。由于它是一个队列,你可以放入许多请求,数量最多可达队列深度(你可以定义该深度)所允许的范围。这些操作可以是读、写等操作的混合。然后,我们调用 io_uring_enter() 系统调用,告诉内核我们已经向提交队列添加了请求。内核随后会进行处理,一旦处理完这些请求,它会将结果作为完成队列条目(CQE:completion queue entry)的一部分放入完成队列,每个对应的 SQE 都有一个 CQE。这些 CQEs 可以从用户空间访问。

敏锐的读者可能已经注意到,这种用多个 I/O 请求填充队列,然后进行一次系统调用,而不是为每个 I/O 请求进行一次系统调用的接口,效率更高。为了进一步提高效率,io_uring 支持一种模式,在这种模式下,内核会轮询你添加到提交队列的条目,你甚至无需调用 io_uring_enter() 来告知内核有新的提交队列条目。另一个需要注意的点是,在发现了幽灵(Spectre)和熔断(Meltdown)硬件漏洞,操作系统为此创建了变通方法之后,系统调用的开销比以往任何时候都大。因此,对于高性能应用程序来说,减少系统调用的数量确实是一件大事。

Spectre and Meltdown

https://meltdownattack.com/

在进行上述任何操作之前,你需要设置队列,这些队列实际上是具有一定深度/长度的环形缓冲区。你调用 io_uring_setup() 系统调用完成此设置。我们通过向环形缓冲区添加提交队列条目,并从完成队列环形缓冲区读取完成队列条目来完成实际工作。这就是 io_uring 接口的设计概述。

完成队列条目

现在我们对其工作原理有了一个大致的概念,让我们更详细地了解一下具体实现方式。与提交队列条目(SQE)相比,完成队列条目(CQE:Completion Queue Entry)非常简单。所以,让我们先来看一下它。SQE 是一个结构体,你使用它来提交请求,并将其添加到提交环形缓冲区。CQE 是一个结构体实例,对于添加到提交队列的每个 SQE 结构体实例,内核都会用 CQE 进行响应。它包含了你通过 SQE 实例请求的操作的结果。

struct io_uring_cqe {
    __u64  user_data;  /* sqe->user_data 提交时传入的数据原样返回 */
    __s32  res;    /* 此事件的结果代码 */
    __u32  flags;
};

正如代码注释中所提到的,user_data 字段是从 SQE 原样传递到 CQE 实例的内容。假设你在提交队列中提交了一堆请求,这些请求不一定会按提交的顺序完成并到达完成队列。例如,考虑以下场景:你的机器上有两块磁盘:一块是转速较慢的硬盘,另一块是超高速的固态硬盘(SSD)。你在提交队列中提交了两个请求。第一个请求是从转速较慢的硬盘读取一个 100kB 的文件,第二个请求是从更快的 SSD 读取一个相同大小的文件。如果要保持顺序,即使可以预期 SSD 上的文件数据会更快到达,内核是否应该等待硬盘上的文件数据可用呢?这不是个好主意,因为这会阻碍我们尽可能快地运行。所以,CQEs 可以按可用的任意顺序到达。哪个操作先完成,就会立即提供对应的 CQE。由于 CQEs 到达时没有指定顺序,既然你现在已经从上面的 struct io_uring_cqe 了解了 CQE 的样子,那么如何确定某个特定的 CQE 对应哪个 SQE 请求呢?一种方法是使用 user_data 字段来识别。不一定是设置一个唯一的 ID 之类的,通常你会传递一个指针。如果这让人困惑,等会儿看到一个清晰的例子就明白了。

完成队列条目很简单,因为它主要关注系统调用的返回值,该返回值存储在其 res 字段中。例如,如果你排队执行一个读操作,成功完成后,res 字段将包含读取的字节数。如果发生错误,它将包含 -errno,本质上就是 read() 系统调用本身会返回的值。

顺序控制

虽然我提到完成队列条目(CQE)可以以任意顺序到达,但你可以通过提交队列条目(SQE)的顺序控制来强制某些操作按特定顺序执行,实际上就是将它们串联起来。在本系列文章中,我不会讨论顺序控制的问题,但你可以查阅当前权威的 io_uring 参考文档,了解如何实现这一点。

io_uring 参考文档

https://kernel.dk/io_uring.pdf

提交队列条目

提交队列条目比完成队列条目要复杂一些,因为它需要足够通用,以便能够表示和处理目前 Linux 系统中各种可能的 I/O 操作。

struct io_uring_sqe {
    __u8  opcode;    /* 此 SQE 的操作类型 */
    __u8  flags;    /* IOSQE_ 标志 */
    __u16  ioprio;    /* 请求的 I/O 优先级 */
    __s32  fd;    /* 要进行 I/O 操作的文件描述符 */
    __u64  off;    /* 文件偏移量 */
    __u64  addr;    /* 指向缓冲区或 iovec 数组的指针 */
    __u32  len;    /* 缓冲区大小或 iovec 数量 */
    union {
      __kernel_rwf_t  rw_flags;
      __u32    fsync_flags;
      __u16    poll_events;
      __u32    sync_range_flags;
      __u32    msg_flags;
    };
    __u64  user_data;  /* 操作完成时要传回的数据 */
    union {
      __u16  buf_index;  /* 如果使用固定缓冲区,这里是其索引 */
      __u64  __pad2[3];
    };
};

我知道这个结构体看起来内容很多。实际上,常用的字段只有几个,通过一个简单的例子(比如我们正在处理的 cat 命令示例)就能很容易解释清楚。你想使用 readv() 系统调用读取一个文件。

  • opcode 用于指定操作类型,在我们的例子中,使用 IORING_OP_READV 常量表示 readv() 操作。
  • fd 用于指定我们要读取的文件,这里要填入该文件的打开文件描述符。
  • addr 用于指向 iovec 结构体数组,这些结构体保存了我们为 I/O 操作分配的缓冲区的地址和长度。
  • 最后,len 用于保存 iovec 数组的长度。

现在看来也不是太难,对吧?你填充这些值,让 io_uring 知道要执行什么操作。你可以将多个 SQE 排入队列,最后当你希望内核开始处理你的请求时,调用 io_uring_enter() 函数。

io_uring 版的 cat 命令实现

让我们来看看在使用 io_uring 的 cat 程序中,具体如何实现上述操作。


#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
#include <sys/syscall.h>
#include <sys/mman.h>
#include <sys/uio.h>
#include <linux/fs.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>

/* 如果因为缺少下面的头文件而导致编译失败,
 * 那么你的内核可能太旧,不支持 io_uring。
 * */
#include <linux/io_uring.h>

#define QUEUE_DEPTH 1
#define BLOCK_SZ    1024

/* This is x86 specific */
#define read_barrier()  __asm__ __volatile__("":::"memory")
#define write_barrier() __asm__ __volatile__("":::"memory")

struct app_io_sq_ring {
    unsigned *head;
    unsigned *tail;
    unsigned *ring_mask;
    unsigned *ring_entries;
    unsigned *flags;
    unsigned *array;
};

struct app_io_cq_ring {
    unsigned *head;
    unsigned *tail;
    unsigned *ring_mask;
    unsigned *ring_entries;
    struct io_uring_cqe *cqes;
};

struct submitter {
    int ring_fd;
    struct app_io_sq_ring sq_ring;
    struct io_uring_sqe *sqes;
    struct app_io_cq_ring cq_ring;
};

struct file_info {
    off_t file_sz;
    struct iovec iovecs[];      /* Referred by readv/writev */
};

/*
 * 这段代码编写于与 io_uring 相关的系统调用尚未被纳入标准 C 库的时期。
 * 因此,我们自行编写了系统调用封装函数。
 * */

int io_uring_setup(unsigned entries, struct io_uring_params *p)
{
    return (int) syscall(__NR_io_uring_setup, entries, p);
}

int io_uring_enter(int ring_fd, unsigned int to_submit,
                          unsigned int min_complete, unsigned int flags)
{
    return (int) syscall(__NR_io_uring_enter, ring_fd, to_submit, min_complete,
                   flags, NULL, 0);
}

/*
 * 返回传入其已打开文件描述符的文件的大小。
 * 还能妥善处理普通文件和块设备,相当实用。
 * */

off_t get_file_size(int fd) {
    struct stat st;

    if(fstat(fd, &st) < 0) {
        perror("fstat");
        return -1;
    }
    if (S_ISBLK(st.st_mode)) {
        unsigned long long bytes;
        if (ioctl(fd, BLKGETSIZE64, &bytes) != 0) {
            perror("ioctl");
            return -1;
        }
        return bytes;
    } elseif (S_ISREG(st.st_mode))
        return st.st_size;

    return -1;
}

/*
 * io_uring 需要进行大量的设置工作,这些设置看起来相当复杂,但并非难以理解。由于存在大量这样的样板代码,io_uring 的开发者创建了 liburing 库,它相对易于使用。
 * 不过,你应该花些时间来理解这段代码。了解其底层的工作原理总是有益的。除了能让你有吹嘘的资本外,它还能带给你一种独特的极客式的平静感。
 * */ 

int app_setup_uring(struct submitter *s) {
    struct app_io_sq_ring *sring = &s->sq_ring;
    struct app_io_cq_ring *cring = &s->cq_ring;
    struct io_uring_params p;
    void *sq_ptr, *cq_ptr;

    /*
     * 我们需要将已清零的 io_uring_params 结构体传递给 io_uring_setup() 调用。如果有需要,我们可以设置任何标志,但在这个示例中,我们不设置。
     * */
    memset(&p, 0, sizeof(p));
    s->ring_fd = io_uring_setup(QUEUE_DEPTH, &p);
    if (s->ring_fd < 0) {
        perror("io_uring_setup");
        return 1;
    }

    /*
     * io_uring 的通信是通过两个用户空间与内核空间共享的环形缓冲区来实现的。在较新的内核中,这两个缓冲区可以通过一次 `mmap()` 调用进行联合映射。
     * 虽然完成队列(completion queue)可以直接操作,但提交队列(submission queue)中间有一个间接数组。我们也会对这个间接数组进行映射。
     * */ 

    int sring_sz = p.sq_off.array + p.sq_entries * sizeof(unsigned);
    int cring_sz = p.cq_off.cqes + p.cq_entries * sizeof(struct io_uring_cqe);

    /* 在 Linux 内核 5.4 及更高版本中,可以通过一次 `mmap()` 调用对提交缓冲区和完成缓冲区进行映射。与其检查内核版本,推荐的做法是检查 `io_uring_params` 结构体的 `features` 字段,该字段是一个位掩码。如果设置了 `IORING_FEAT_SINGLE_MMAP` 标志位,那么我们就无需再进行第二次 `mmap()` 调用来映射完成环了。
     * */
    if (p.features & IORING_FEAT_SINGLE_MMAP) {
        if (cring_sz > sring_sz) {
            sring_sz = cring_sz;
        }
        cring_sz = sring_sz;
    }

    /* 对提交队列和完成队列的环形缓冲区进行内存映射。
     * 不过,较旧版本的内核只会对提交队列进行内存映射。
     * */
    sq_ptr = mmap(0, sring_sz, PROT_READ | PROT_WRITE, 
            MAP_SHARED | MAP_POPULATE,
            s->ring_fd, IORING_OFF_SQ_RING);
    if (sq_ptr == MAP_FAILED) {
        perror("mmap");
        return 1;
    }

    if (p.features & IORING_FEAT_SINGLE_MMAP) {
        cq_ptr = sq_ptr;
    } else {
        /* 在较旧版本的内核中,单独对完成队列的环形缓冲区进行内存映射 */
        cq_ptr = mmap(0, cring_sz, PROT_READ | PROT_WRITE, 
                MAP_SHARED | MAP_POPULATE,
                s->ring_fd, IORING_OFF_CQ_RING);
        if (cq_ptr == MAP_FAILED) {
            perror("mmap");
            return 1;
        }
    }

    /* 将有用的字段保存到全局的 `app_io_sq_ring` 结构体中,以便后续方便引用 */
    sring->head = sq_ptr + p.sq_off.head;
    sring->tail = sq_ptr + p.sq_off.tail;
    sring->ring_mask = sq_ptr + p.sq_off.ring_mask;
    sring->ring_entries = sq_ptr + p.sq_off.ring_entries;
    sring->flags = sq_ptr + p.sq_off.flags;
    sring->array = sq_ptr + p.sq_off.array;

    /* 对提交队列条目数组进行内存映射 */
    s->sqes = mmap(0, p.sq_entries * sizeof(struct io_uring_sqe),
            PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
            s->ring_fd, IORING_OFF_SQES);
    if (s->sqes == MAP_FAILED) {
        perror("mmap");
        return 1;
    }

    /* 将有用的字段保存到全局的 `app_io_cq_ring` 结构体中,以便后续方便引用 */
    cring->head = cq_ptr + p.cq_off.head;
    cring->tail = cq_ptr + p.cq_off.tail;
    cring->ring_mask = cq_ptr + p.cq_off.ring_mask;
    cring->ring_entries = cq_ptr + p.cq_off.ring_entries;
    cring->cqes = cq_ptr + p.cq_off.cqes;

    return 0;
}

/*
 * 向标准输出打印长度为 `len` 的字符串。
 * 为了提高效率,我们在这里使用缓冲输出,
 * 因为我们需要逐个字符进行输出。
 */
void output_to_console(char *buf, int len) {
    while (len--) {
        fputc(*buf++, stdout);
    }
}

/*
 * 从完成队列中读取数据。
 * 在这个函数中,我们从完成队列读取完成事件,获取包含文件数据的数据缓冲区,
 * 并将其打印到控制台。
 */
void read_from_cq(struct submitter *s) {
    struct file_info *fi;
    struct app_io_cq_ring *cring = &s->cq_ring;
    struct io_uring_cqe *cqe;
    unsigned head, reaped = 0;

    head = *cring->head;

    do {
        read_barrier();
        /*
         * 记住,这是一个环形缓冲区。如果头指针(head)等于尾指针(tail),
         * 则意味着该缓冲区为空。
         */
        if (head == *cring->tail)
            break;

        /* Get the entry */
        cqe = &cring->cqes[head & *s->cq_ring.ring_mask];
        fi = (struct file_info*) cqe->user_data;
        if (cqe->res < 0)
            fprintf(stderr, "Error: %s\n", strerror(abs(cqe->res)));

        int blocks = (int) fi->file_sz / BLOCK_SZ;
        if (fi->file_sz % BLOCK_SZ) blocks++;

        for (int i = 0; i < blocks; i++)
            output_to_console(fi->iovecs[i].iov_base, fi->iovecs[i].iov_len);

        head++;
    } while (1);

    *cring->head = head;
    write_barrier();
}
/*
 * 向提交队列提交请求。
 * 在这个函数中,我们向提交队列提交请求。你可以提交多种类型的请求。
 * 我们要提交的是 `readv()` 请求,我们通过 `IORING_OP_READV` 来指定该请求。
 */
int submit_to_sq(char *file_path, struct submitter *s) {
    struct file_info *fi;

    int file_fd = open(file_path, O_RDONLY);
    if (file_fd < 0 ) {
        perror("open");
        return 1;
    }

    struct app_io_sq_ring *sring = &s->sq_ring;
    unsigned index = 0, current_block = 0, tail = 0, next_tail = 0;

    off_t file_sz = get_file_size(file_fd);
    if (file_sz < 0)
        return 1;
    off_t bytes_remaining = file_sz;
    int blocks = (int) file_sz / BLOCK_SZ;
    if (file_sz % BLOCK_SZ) blocks++;

    fi = malloc(sizeof(*fi) + sizeof(struct iovec) * blocks);
    if (!fi) {
        fprintf(stderr, "Unable to allocate memory\n");
        return 1;
    }
    fi->file_sz = file_sz;

    /*
     * 对于文件中每个需要读取的块,我们都会分配一个 `iovec` 结构体,
     * 并将其存入 `iovecs` 数组中。这个数组会作为提交请求的一部分被传入。
     * 如果你不理解这一点,那么你需要查阅 `readv()` 和 `writev()` 系统调用的工作原理。
     */
    while (bytes_remaining) {
        off_t bytes_to_read = bytes_remaining;
        if (bytes_to_read > BLOCK_SZ)
            bytes_to_read = BLOCK_SZ;

        fi->iovecs[current_block].iov_len = bytes_to_read;

        void *buf;
        if( posix_memalign(&buf, BLOCK_SZ, BLOCK_SZ)) {
            perror("posix_memalign");
            return 1;
        }
        fi->iovecs[current_block].iov_base = buf;

        current_block++;
        bytes_remaining -= bytes_to_read;
    }
    
    /* 将我们的提交队列条目添加到提交队列条目(SQE)环形缓冲区的尾部 */
    next_tail = tail = *sring->tail;
    next_tail++;
    read_barrier();
    index = tail & *s->sq_ring.ring_mask;
    struct io_uring_sqe *sqe = &s->sqes[index];
    sqe->fd = file_fd;
    sqe->flags = 0;
    sqe->opcode = IORING_OP_READV;
    sqe->addr = (unsigned long) fi->iovecs;
    sqe->len = blocks;
    sqe->off = 0;
    sqe->user_data = (unsigned long long) fi;
    sring->array[index] = index;
    tail = next_tail;

    /* Update the tail so the kernel can see it. */
    if(*sring->tail != tail) {
        *sring->tail = tail;
        write_barrier();
    }

    /*
     * 通过 `io_uring_enter()` 系统调用告知内核我们已提交了事件。
     * 我们还传入了 `IOURING_ENTER_GETEVENTS` 标志,该标志会使 `io_uring_enter()` 调用等待,
     * 直到 `min_complete` 个事件(第三个参数)完成。
     */
    int ret =  io_uring_enter(s->ring_fd, 1,1,
            IORING_ENTER_GETEVENTS);
    if(ret < 0) {
        perror("io_uring_enter");
        return 1;
    }

    return 0;
}

int main(int argc, char *argv[]) {
    struct submitter *s;

    if (argc < 2) {
        fprintf(stderr, "Usage: %s <filename>\n", argv[0]);
        return 1;
    }

    s = malloc(sizeof(*s));
    if (!s) {
        perror("malloc");
        return 1;
    }
    memset(s, 0, sizeof(*s));

    if(app_setup_uring(s)) {
        fprintf(stderr, "Unable to setup uring!\n");
        return 1;
    }

    for (int i = 1; i < argc; i++) {
        if(submit_to_sq(argv[i], s)) {
            fprintf(stderr, "Error reading file\n");
            return 1;
        }
        read_from_cq(s);
    }

    return 0;
}

初始设置

在 main() 函数中,我们调用 app_setup_uring() 函数,该函数会完成使用 io_uring 所需的初始化工作。首先,我们调用 io_uring_setup() 系统调用,传入所需的队列深度以及一个所有成员都初始化为零的 io_uring_params 结构体实例。当该调用返回时,内核会填充此结构体成员的值。io_uring_params 结构体的定义如下:

struct io_uring_params {
    __u32 sq_entries;
    __u32 cq_entries;
    __u32 flags;
    __u32 sq_thread_cpu;
    __u32 sq_thread_idle;
    __u32 resv[5];
    struct io_sqring_offsets sq_off;
    struct io_cqring_offsets cq_off;
};

在将此结构体作为 io_uring_setup() 系统调用的一部分传入之前,你唯一可以指定的是 flags 结构体成员,但在这个示例中,我们不需要传递任何标志。此外,在这个示例中,我们逐个处理文件。由于这是一个主要用于理解 io_uring 的简单示例,我们不会进行任何并行 I/O 操作。为此,我们将队列深度设置为 1。

io_uring_setup() 的返回值(一个文件描述符)以及 io_uring_param 结构体中的其他字段,随后会在调用 mmap() 时使用,以将两个环形缓冲区和一个提交队列条目数组映射到用户空间。请看下面的代码,我移除了一些周边代码,以便专注于 mmap() 调用:

    /* 映射提交和完成队列的环形缓冲区。
     * 不过,较旧的内核仅映射提交队列。
     * */
    sq_ptr = mmap(0, sring_sz, PROT_READ | PROT_WRITE, 
            MAP_SHARED | MAP_POPULATE,
            s->ring_fd, IORING_OFF_SQ_RING);
    if (sq_ptr == MAP_FAILED) {
        perror("mmap");
        return 1;
    }
    if (p.features & IORING_FEAT_SINGLE_MMAP) {
        cq_ptr = sq_ptr;
    } else {
        /* 在较旧的内核中,单独映射完成队列的环形缓冲区 */
        cq_ptr = mmap(0, cring_sz, PROT_READ | PROT_WRITE, 
                MAP_SHARED | MAP_POPULATE,
                s->ring_fd, IORING_OFF_CQ_RING);
        if (cq_ptr == MAP_FAILED) {
            perror("mmap");
            return 1;
        }
    }
    /* 映射提交队列条目数组 */
    s->sqes = mmap(0, p.sq_entries * sizeof(struct io_uring_sqe),
            PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
            s->ring_fd, IORING_OFF_SQES);


我们将重要信息保存在 app_io_sq_ring 和 app_io_cq_ring 结构体中,以便后续方便引用。当我们分别映射提交和完成的两个环形缓冲区时,你可能会好奇第二次映射的用途。完成队列环形缓冲区直接索引共享的完成队列条目(CQE)数组,而提交环形缓冲区中间有一个间接数组。提交端的环形缓冲区是这个数组的索引,而该数组又包含提交队列条目(SQE)的索引。这对于某些将提交请求嵌入内部数据结构的应用程序很有用。这种设置允许它们一次性提交多个提交条目,同时使它们更容易采用 io_uring

注意:在 5.4 及更高版本的内核中,一次 mmap() 调用可以同时映射提交队列和完成队列。然而,在较旧的内核中,它们需要分别进行映射。我们可以不检查内核版本,而是通过检查 IORING_FEAT_SINGLE_MMAP 特性标志来判断内核是否具备使用一次 mmap() 调用映射两个队列的能力,就像我们在上面的代码中所做的那样。

处理共享环形缓冲区

在常规编程中,我们习惯处理用户空间和内核之间非常清晰的接口:系统调用。然而,系统调用确实有开销,对于像 io_uring 这样的高性能接口,我们希望尽可能避免使用它们。我们之前看到,使用 io_uring 允许我们将许多 I/O 请求批量处理,并对 io_uring_enter() 系统调用进行一次调用,而不是像通常那样进行多次系统调用。或者在轮询模式下,甚至不需要进行该调用。

当从用户空间读取或更新共享环形缓冲区时,需要注意确保在读取时能看到最新的数据,并且在更新后“刷新”或“同步”写入操作,以便内核能看到你的更新。这是因为 CPU 可以对读写操作进行重新排序,编译器也可以这样做。当这些操作在同一个 CPU 上进行时,通常不会有问题。但在 io_uring 的情况下,当涉及到用户空间和内核这两个不同上下文之间的共享缓冲区时,并且在上下文切换后它们可能在不同的 CPU 上运行。你需要从用户空间确保在读取之前,之前的写入操作是可见的。或者当你在一个 SQE 中填充详细信息并更新提交环形缓冲区的尾指针时,你要确保对 SQE 成员的写入操作在更新环形缓冲区尾指针的写入操作之前完成排序。如果这些写入操作没有排序,内核可能会看到尾指针已更新,但当它读取 SQE 时,可能找不到所需的所有数据。在轮询模式下,内核会查找尾指针的变化,这就会成为一个真正的问题。这一切都是因为 CPU 和编译器为了优化而对读写操作进行重新排序。

读取完成队列条目

一如既往,我们先从完成操作这一侧入手,因为它比提交操作要简单。这些解释是必要的,因为我们需要讨论内存序以及如何处理它。否则,我们就是在探讨如何处理环形缓冲区。对于完成事件,内核会将完成队列条目(CQE)添加到环形缓冲区并更新尾指针,而我们在用户空间从头部读取。和任何环形缓冲区一样,如果头部指针和尾部指针相等,就意味着环形缓冲区为空。看看下面的代码:

unsigned head;
head = cqring->head;
read_barrier(); /* 确保之前的写入操作可见 */
if (head != cqring->tail) {
    /* 环形缓冲区中有可用数据 */
    struct io_uring_cqe *cqe;
    unsigned index;
    index = head & (cqring->mask);
    cqe = &cqring->cqes[index];
    /* 在此处理已完成的 CQE */
     ...
    /* 我们现在已经处理了这个条目 */
    head++;
}
cqring->head = head;
write_barrier();

为了获取头部的索引,应用程序需要将头部指针与环形缓冲区的大小掩码进行按位与运算。要记住,上述代码中的任何一行都可能在上下文切换后执行。所以,在进行比较之前,我们调用了 read_barrier(),这样,如果内核确实更新了尾指针,我们就能在 if 语句的比较中读取到它。一旦我们获取到 CQE 并对其进行处理,就更新头部指针,让内核知道我们已经从环形缓冲区中处理了一个条目。最后的 write_barrier() 确保我们所做的写入操作对内核可见。

进行提交操作

进行提交操作与读取完成操作相反。在完成操作中,内核将条目添加到尾部,我们从环形缓冲区的头部读取条目;而在进行提交操作时,我们将条目添加到尾部,内核从环形缓冲区的头部读取条目。

struct io_uring_sqe *sqe;
unsigned tail, index;
tail = sqring->tail;
index = tail & (*sqring->ring_mask);
sqe = &sqring→sqes[index];
/* 此函数调用为该 I/O 请求填充 SQE 详细信息 */
app_init_io(sqe);
/* 将 SQE 索引填入 SQ 环形数组 */
sqring->array[index] = index;
tail++;
write_barrier();
sqring->tail = tail;
write_barrier();

在上面的代码片段中,应用程序中的 app_init_io() 函数会填充提交请求的详细信息。在更新尾指针之前,我们调用 write_barrier() 来确保在更新尾指针之前,之前的写入操作按顺序执行。然后我们更新尾指针,并再次调用 write_barrier() 以确保我们的更新操作被内核看到。我们在这里确保操作的有序性。

使用 liburing 实现 cat 功能

我们看到,使用 io_uring 构建一个像读取文件这样简单的程序可能并不像看起来那么直接。事实证明,与使用同步 I/O 读取文件的程序相比,它的代码更多。但如果你分析 cat_uring 的代码,你会发现其中大部分是样板代码,可以很容易地隐藏在一个单独的文件中,而不会影响应用程序的逻辑。无论如何,我们有意学习 io_uring 的底层细节,以便更好地理解它的工作原理。但如果你打算在你正在构建的实际应用程序中使用 io_uring,你可能不应该直接使用其原生接口。相反,你应该使用 liburing,它是 io_uring 之上一个很好的高级封装库。

现在让我们看看如何使用 liburing 实现一个功能与 cat_uring 类似的程序。我们将这个程序称为 cat_liburing

#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
#include <liburing.h>
#include <stdlib.h>

#define QUEUE_DEPTH 1
#define BLOCK_SZ    1024

struct file_info {
    off_t file_sz;
    struct iovec iovecs[];      /* Referred by readv/writev */
};

/*
 * 返回传入的已打开文件描述符所对应文件的大小。
 * 同时也能正确处理普通文件和块设备。相当实用。
 */ 
off_t get_file_size(int fd) {
    struct stat st;

    if(fstat(fd, &st) < 0) {
        perror("fstat");
        return -1;
    }
    if (S_ISBLK(st.st_mode)) {
        unsigned long long bytes;
        if (ioctl(fd, BLKGETSIZE64, &bytes) != 0) {
            perror("ioctl");
            return -1;
        }
        return bytes;
    } elseif (S_ISREG(st.st_mode))
        return st.st_size;

    return -1;
}

/*
 * 向标准输出(stdout)输出长度为 `len` 的一串字符。
 * 为了提高效率,我们在此处采用带缓冲的输出方式,
 * 因为我们需要逐个字符进行输出。
 */
void output_to_console(char *buf, int len) {
    while (len--) {
        fputc(*buf++, stdout);
    }
}

/*
 * 等待有完成事件可用,从 `readv` 操作中获取数据并将其打印到控制台。
 */
int get_completion_and_print(struct io_uring *ring) {
    struct io_uring_cqe *cqe;
    int ret = io_uring_wait_cqe(ring, &cqe);
    if (ret < 0) {
        perror("io_uring_wait_cqe");
        return 1;
    }
    if (cqe->res < 0) {
        fprintf(stderr, "Async readv failed.\n");
        return 1;
    }
    struct file_info *fi = io_uring_cqe_get_data(cqe);
    int blocks = (int) fi->file_sz / BLOCK_SZ;
    if (fi->file_sz % BLOCK_SZ) blocks++;
    for (int i = 0; i < blocks; i ++)
        output_to_console(fi->iovecs[i].iov_base, fi->iovecs[i].iov_len);

    io_uring_cqe_seen(ring, cqe);
    return 0;
}

/*
 * 通过 liburing 提交 readv 请求。
 */
int submit_read_request(char *file_path, struct io_uring *ring) {
    int file_fd = open(file_path, O_RDONLY);
    if (file_fd < 0) {
        perror("open");
        return 1;
    }
    off_t file_sz = get_file_size(file_fd);
    off_t bytes_remaining = file_sz;
    off_t offset = 0;
    int current_block = 0;
    int blocks = (int) file_sz / BLOCK_SZ;
    if (file_sz % BLOCK_SZ) blocks++;
    struct file_info *fi = malloc(sizeof(*fi) +
                                          (sizeof(struct iovec) * blocks));

    /*
     * 对于文件中每个需要读取的块,我们会分配一个 `iovec` 结构体,
     * 并将其索引存于 `iovecs` 数组中。这个数组作为提交内容的一部分被传入。
     * 如果你不理解这一点,那么你需要查阅 `readv()` 和 `writev()` 系统调用是如何工作的。
     * */ 
    while (bytes_remaining) {
        off_t bytes_to_read = bytes_remaining;
        if (bytes_to_read > BLOCK_SZ)
            bytes_to_read = BLOCK_SZ;

        offset += bytes_to_read;
        fi->iovecs[current_block].iov_len = bytes_to_read;

        void *buf;
        if( posix_memalign(&buf, BLOCK_SZ, BLOCK_SZ)) {
            perror("posix_memalign");
            return 1;
        }
        fi->iovecs[current_block].iov_base = buf;

        current_block++;
        bytes_remaining -= bytes_to_read;
    }
    fi->file_sz = file_sz;

    /* Get an SQE */
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    /* Setup a readv operation */
    io_uring_prep_readv(sqe, file_fd, fi->iovecs, blocks, 0);
    /* Set user data */
    io_uring_sqe_set_data(sqe, fi);
    /* Finally, submit the request */
    io_uring_submit(ring);

    return 0;
}

int main(int argc, char *argv[]) {
    struct io_uring ring;

    if (argc < 2) {
        fprintf(stderr, "Usage: %s [file name] <[file name] ...>\n",
                argv[0]);
        return 1;
    }

    /* Initialize io_uring */
    io_uring_queue_init(QUEUE_DEPTH, &ring, 0);

    for (int i = 1; i < argc; i++) {
        int ret = submit_read_request(argv[i], &ring);
        if (ret) {
            fprintf(stderr, "Error reading file: %s\n", argv[i]);
            return 1;
        }
        get_completion_and_print(&ring);
    }

    /* Call the clean-up function. */
    io_uring_queue_exit(&ring);
    return 0;
}

让我们比较一下这些实现各自所占用的代码行数:

  • 常规的 cat 命令:约 120 行
  • 使用原生 io_uring 的 cat 命令:约 360 行
  • 使用 liburing 的 cat 命令:约 160 行

现在可以看到,使用 liburing 确实显著减少了代码行数。而且去掉所有样板代码后,逻辑更加清晰了。让我们快速梳理一下。我们像这样初始化 io_uring

io_uring_queue_init(QUEUE_DEPTH, &ring, 0);

在 submit_read_request() 函数中,我们获取一个提交队列项(SQE),为 readv 操作做好准备并提交它。

    /* 获取一个 SQE */
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    /* 设置一个 readv 操作 */
    io_uring_prep_readv(sqe, file_fd, fi->iovecs, blocks, 0);
    /* 设置用户数据 */
    io_uring_sqe_set_data(sqe, fi);
    /* 最后,提交请求 */
    io_uring_submit(ring);

我们等待一个完成事件,并获取在提交端设置的用户数据,如下所示:

    struct io_uring_cqe *cqe;
    int ret = io_uring_wait_cqe(ring, &cqe);
    struct file_info *fi = io_uring_cqe_get_data(cqe);

当然,与使用原生接口相比,这使用起来要简单得多。

为什么要为异步编程如此大费周章?

如果你要构建的应用程序每小时处理数千甚至数十万的请求,那么你无需为异步 I/O 操心。基于线程池架构设计的应用程序框架就可以很好地满足你的需求。但如果你希望每小时高效处理数百万的请求,那么你可能需要更深入地研究异步编程。异步编程通过在单线程中处理大部分 I/O 操作,避免了操作系统的线程/进程上下文切换开销。阅读我的系列文章,深入了解你的应用程序可以使用的各种 Linux 进程模型 —— 我们通过从零开始构建基于不同进程架构的 Web 服务器来实现这一点。

处理常规文件的难题

在 Linux 上进行异步编程,特别是涉及套接字时,通常会使用 select()poll() 或 epoll。虽然这些方法在处理套接字时效果很好,但在处理常规文件时效果不佳。如果你正在构建一个 Web 服务器或缓存服务器,你将需要大量处理常规文件,并且根据存储的并发能力和速度,访问常规文件会阻塞并降低服务器的速度。因此,为 NodeJS 提供支持的 libuv 库使用了一组单独的线程来处理文件 I/O 以及其他一些事情。摘自 libuv 文档:

http://docs.libuv.org/en/v1.x/design.html

与网络 I/O 不同,libuv 无法依赖特定于平台的文件 I/O 原语,因此当前的做法是在线程池中运行阻塞式文件 I/O 操作。

libuv 目前使用一个全局线程池,所有事件循环都可以在该线程池上排队执行任务。目前有 3 种类型的操作在这个线程池上运行:

  • 文件系统操作
  • DNS 函数(getaddrinfo 和 getnameinfo
  • 通过 uv_queue_work() 指定的用户代码

使用 io_uring,无论操作是针对套接字还是常规文件,都可以统一处理。用户无需采用像使用线程池来处理文件 I/O 这样的技巧。阅读本文,详细分析异步编程和文件 I/O。

接下来呢?

在本系列文章的第一部分,我们了解了如何以三种不同的方式构建与 Unix 的 cat 命令等效的程序:同步方式、使用原生 io_uring 接口以及使用高级的 liburing。然而,我们每次只处理一个请求。我们的 cat 实现确实可以读取多个文件,但当向 io_uring 提交读取文件的请求时,我们会等待内核提供一个完成项,从中读取数据,然后再处理下一个文件,重复这个过程。我们特意简化了实现,以便理解 io_uring 的工作原理。但 io_uring 的真正强大之处在于能够同时处理多个请求。为了演示这一点,在本系列文章的第二部分,我们将编写一个程序来复制文件,同时让 io_uring 处理多个请求,文件的每个数据块对应一个请求。

源代码

所有示例的完整源代码可在 GitHub 上获取。

https://github.com/shuveb/io_uring-by-example

关于作者

我叫 Shuveb Hussain,是这个专注于 Linux 的博客的作者。你可以在 Twitter 上关注我,我会发布与技术相关的内容,主要聚焦于 Linux、性能、可扩展性和云技术。 

Src

https://unixism.net/2020/04/io-uring-by-example-part-1-introduction/




推荐本站淘宝优惠价购买喜欢的宝贝:

image.png

本文链接:https://www.hqyman.cn/post/11894.html 非本站原创文章欢迎转载,原创文章需保留本站地址!

分享到:
打赏





休息一下~~


« 上一篇 下一篇 »

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

请先 登录 再评论,若不是会员请先 注册

您的IP地址是: