基础 io_uring 架构(SQ/CQ 环、io-wq、基本操作码)见
09_io_uring.md,本文专注高级特性的内核实现细节。
- 固定缓冲区(Fixed Buffers)
- 固定文件(Fixed Files)
- 超时操作(Timeout)
- io_uring_cmd(设备命令直通)
- 网络零拷贝发送
- Multishot 操作模式
- Buffer Ring(提供缓冲区)
- SQPOLL 高级特性
- eBPF 与 io_uring 交互
- 安全与权限模型
- 资源注册系统全貌
普通 I/O 操作在每次调用时都需要执行以下步骤:
get_user_pages()将用户内存页钉住(pin pages),防止内核 I/O 进行期间被换出- 构建
bio_vec链,描述物理页布局 - I/O 完成后调用
put_page()释放页面引用
对于每秒数十万次的高频小 I/O(例如 NVMe SSD、网络收发),这套流程的开销非常显著。固定缓冲区(Fixed Buffers)的核心思想:在注册阶段一次性完成 pin pages,将 bio_vec 建立好后永久保存,后续每次 I/O 直接复用已钉住的物理页映射,完全跳过重复的页面锁定流程。
普通 I/O(每次操作):
用户提交 SQE
|
v
get_user_pages() ← 开销大
|
v
构建 bio_vec 链
|
v
提交给驱动/网络栈
|
v
unpin_pages() ← 开销大
Fixed Buffer I/O(注册后每次操作):
注册阶段(一次性):
pin_user_pages_fast() → 永久保存 io_mapped_ubuf.bvec[]
每次操作:
直接引用 imu->bvec[] → 提交给驱动/网络栈
(无 pin/unpin 开销)
io_uring/rsrc.h 定义了固定缓冲区的核心类型:
struct io_rsrc_node {
unsigned char type; // IORING_RSRC_FILE 或 IORING_RSRC_BUFFER
int refs; // 引用计数
u64 tag; // 用户自定义标签(用于资源跟踪通知)
union {
unsigned long file_ptr; // fixed file 场景,存储 struct file *
struct io_mapped_ubuf *buf; // fixed buffer 场景
};
};
struct io_mapped_ubuf {
u64 ubuf; // 用户空间起始地址
unsigned int len; // 缓冲区长度(字节)
unsigned int nr_bvecs; // bio_vec 段数量
unsigned int folio_shift; // folio 大小的 2^n,用于地址计算
refcount_t refs; // 引用计数(共享场景)
unsigned long acct_pages; // 已向 user->locked_vm 计费的页数
void (*release)(void *); // 释放回调,默认 io_release_ubuf()
void *priv; // release 的参数(通常是 imu 自身)
u8 flags;
u8 dir; // IO_IMU_DEST 或 IO_IMU_SOURCE
struct bio_vec bvec[] __counted_by(nr_bvecs); // 变长尾数组
};系统上限常量(io_uring/rsrc.c:34-37):
#define IORING_MAX_FIXED_FILES (1U << 20) // 最多 1,048,576 个固定文件
#define IORING_MAX_REG_BUFFERS (1U << 14) // 最多 16,384 个固定缓冲区
#define IO_CACHED_BVECS_SEGS 32 // 小缓冲区 imu_cache 阈值注册入口通过 io_uring_register(2) 系统调用,操作码为 IORING_REGISTER_BUFFERS(或更新版 IORING_REGISTER_BUFFERS2):
io_uring_register(ring_fd, IORING_REGISTER_BUFFERS, iovec_array, nr)
|
v
io_sqe_buffers_register() [rsrc.c]
|
+---> io_rsrc_data_alloc(&ctx->buf_table, nr)
| 分配 io_rsrc_node * 指针数组
|
+---> 遍历每个 struct iovec:
|
| io_sqe_buffer_register(ctx, iov, &last_hpage)
| [rsrc.c:762]
| |
| +---> io_validate_user_buf_range() [rsrc.c:86]
| | ├── 长度上限 SZ_1G(1 GB)
| | ├── 长度不得为 0
| | └── 地址 + 长度不得溢出
| |
| +---> io_rsrc_node_alloc(ctx, IORING_RSRC_BUFFER)
| | 从 ctx->node_cache 对象池取一个节点
| |
| +---> io_pin_pages(addr, len, &nr_pages)
| | pin_user_pages_fast() → 钉住所有物理页
| | 返回 struct page ** 数组
| |
| +---> io_check_coalesce_buffer() [rsrc.c:715]
| | 检查是否可将连续页合并为 folio 级别映射
| | 减少 bvec 数量,降低内存占用
| |
| +---> io_account_mem() [rsrc.c:69]
| | 向 user->locked_vm 和 mm->pinned_vm 计费
| |
| +---> 构建 imu->bvec[] 数组
| | 每个 folio 或 page 对应一个 bio_vec 段
| |
| +---> node->buf = imu
|
+---> ctx->buf_table.nodes[i] = node
io_pin_pages() 是固定缓冲区的关键函数。它调用 pin_user_pages_fast() 以 FOLL_WRITE | FOLL_LONGTERM 标志钉住用户内存页,FOLL_LONGTERM 告知内核这些页面将被长期持有,不允许 KSM、页面迁移、透明大页拆分等操作影响它们。
钉住后,这些页面的物理地址不会变化,直到调用 unpin_user_folio() 才解除。
io_check_coalesce_buffer()(rsrc.c:715)在钉住页面后检查是否可以将多个连续的 struct page 合并为以 folio 为单位的 bio_vec:
bool io_check_coalesce_buffer(struct page **page_array, int nr_pages,
struct io_imu_folio_data *data)
{
struct folio *folio = page_folio(page_array[0]);
// 记录 folio_shift(2^n = folio 大小)
data->folio_shift = folio_shift(folio);
// 检查页面是否在 folio 内连续,且所有 folio 大小一致
...
}合并条件:
- 所有相邻页属于同一 compound folio,且在 folio 内地址连续
- 所有中间 folio 的大小相同(
folio_size == 1 << folio_shift)
合并效果:一个 1GB 的 2MB 大页缓冲区,原本需要 512 个 bio_vec,合并后只需 1 个,节省大量内存和 DMA 映射建立时间。合并后 imu->folio_shift 记录 folio 的 shift 值(例如 21 表示 2MB),后续地址计算时用于快速定位:
// rsrc.c:1087 地址偏移到 folio 索引的计算
seg_skip = 1 + (offset >> imu->folio_shift);io_rsrc_cache_init()(rsrc.c:154)在 ring 创建时初始化两个对象缓存:
bool io_rsrc_cache_init(struct io_ring_ctx *ctx)
{
const int imu_cache_size = struct_size_t(struct io_mapped_ubuf, bvec,
IO_CACHED_BVECS_SEGS); // 32段
ret = io_alloc_cache_init(&ctx->node_cache, IO_ALLOC_CACHE_MAX,
sizeof(struct io_rsrc_node), 0);
ret |= io_alloc_cache_init(&ctx->imu_cache, IO_ALLOC_CACHE_MAX,
imu_cache_size, 0);
return ret;
}当缓冲区的 bio_vec 段数量 <= IO_CACHED_BVECS_SEGS(32段),分配时从 ctx->imu_cache 取预分配的对象(rsrc.c:114-117);否则用 kvmalloc_flex() 动态分配。释放时同理回收到缓存中(rsrc.c:119-125)。这避免了大量小型缓冲区注册/注销时的频繁 malloc/free。
io_account_mem()(rsrc.c:69)向系统记账,防止用户绕过 RLIMIT_MEMLOCK 限制:
int io_account_mem(struct user_struct *user, struct mm_struct *mm_account,
unsigned long nr_pages)
{
// 1. 检查并更新 user->locked_vm(原子 cmpxchg 循环)
ret = __io_account_mem(user, nr_pages);
// 2. 更新 mm->pinned_vm(统计进程 pin 的总页数)
atomic64_add(nr_pages, &mm_account->pinned_vm);
}__io_account_mem()(rsrc.c:39)用无锁 CAS 循环确保 locked_vm 不超过 RLIMIT_MEMLOCK >> PAGE_SHIFT,防止一个进程钉住大量内存耗尽系统资源。
注销缓冲区(IORING_UNREGISTER_BUFFERS)
|
v
io_buffer_unmap(ctx, imu) [rsrc.c:127]
|
+---> refcount_dec_and_test(&imu->refs)
| 如果还有其他引用(如 ZC 发送中),推迟释放
|
+---> io_unaccount_mem() 归还 locked_vm/pinned_vm 计费
|
+---> imu->release(imu->priv) → io_release_ubuf()
| for each bvec: unpin_user_folio()
|
+---> io_free_imu()
nr_bvecs <= 32 → 回收到 imu_cache
nr_bvecs > 32 → kvfree()
Linux 内核中每次 read()/write()/send() 都隐含以下操作:
fget(fd)— 从进程 FD 表查找struct file *,原子递增引用计数- 完成后
fput(file)— 原子递减引用计数,可能触发文件释放
在高并发场景(如每秒百万次 I/O),这两个原子操作产生显著的 cache line 竞争。固定文件(Fixed Files)在注册时持有 struct file * 的永久引用,I/O 时直接通过索引获取指针,跳过 fget/fput。
struct io_file_table {
struct io_rsrc_data data; // nodes[] 指针数组(同 buffer 表)
unsigned long *bitmap; // 稀疏位图,标记已占用的槽位
unsigned int alloc_hint; // 位图搜索的起始提示
};
io_ring_ctx->file_table // 固定文件表
|
+-- data.nodes[0..N-1] // io_rsrc_node * 数组
| 每个非 NULL 节点包含 file_ptr(低2位存 direct/legacy 标志)
|
+-- bitmap[0..(N/64)-1] // 64-bit 单词位图
bit=1 表示该槽位已占用
io_uring_register(fd, IORING_REGISTER_FILES, fds_array, nr_fds)
|
v
io_sqe_files_register()
|
+---> io_alloc_file_tables(ctx, &ctx->file_table, nr_fds)
| ├── io_rsrc_data_alloc() 分配 nodes[] 指针数组
| └── bitmap_zalloc(nr_fds) 分配清零的位图
|
+---> 遍历每个 fd:
| ├── fget(fd) → struct file *
| ├── 禁止注册 io_uring 实例本身(防止循环)
| ├── io_rsrc_node_alloc(ctx, IORING_RSRC_FILE)
| ├── io_fixed_file_set(node, file) 存储 file 指针
| └── io_file_bitmap_set(&ctx->file_table, i) 标记位图
|
+---> ctx->nr_user_files = nr_fds
通过 IORING_RSRC_REGISTER_SPARSE 标志可以注册一个稀疏文件表:初始时所有槽位为 NULL,不实际持有任何文件引用,后续通过 IORING_REGISTER_FILES_UPDATE 动态填充。
位图管理由 io_file_bitmap_get() 实现(filetable.c:16):
static int io_file_bitmap_get(struct io_ring_ctx *ctx)
{
struct io_file_table *table = &ctx->file_table;
unsigned long nr = ctx->file_alloc_end;
// 从 alloc_hint 向后搜索第一个为 0 的 bit(空闲槽位)
ret = find_next_zero_bit(table->bitmap, nr, table->alloc_hint);
if (ret != nr)
return ret;
// 回绕到起点重试(循环搜索)
...
}ctx->file_alloc_start / ctx->file_alloc_end 允许用户将自动分配范围限制在文件表的一个子区间内,其余部分留给手动分配。
当 SQE 的 file_index 字段设置为 IORING_FILE_INDEX_ALLOC(值为 (u32)-1)时,内核自动为新文件寻找空槽(filetable.c:87-100):
int __io_fixed_fd_install(struct io_ring_ctx *ctx, struct file *file,
unsigned int file_slot)
{
bool alloc_slot = file_slot == IORING_FILE_INDEX_ALLOC;
if (alloc_slot) {
ret = io_file_bitmap_get(ctx); // 搜索空位
file_slot = ret;
} else {
file_slot--; // 用户传的是 1-based 索引
}
io_install_fixed_file(ctx, file, file_slot);
return alloc_slot ? file_slot : 0;
}分配成功后,实际槽位索引通过 CQE 的 res 字段返回给用户态,用户后续可直接通过该索引引用文件。这一机制在 IORING_ACCEPT_MULTISHOT 中大量使用,每次 accept 的新连接自动入表。
在提交 SQE 时,内核从文件表直接取指针(io_files_update_prep() → io_file_get_fixed()),代码路径中没有任何原子引用计数操作。安全保证来自:
io_rsrc_node.refs在 ring 整个生命周期内保持非零,文件不会提前关闭- 请求进行时通过
io_req_fput()在请求完成路径中释放,而不是即时释放 IORING_SETUP_SINGLE_ISSUER确保只有一个线程提交,消除并发释放竞争
固定文件路径(io_file_get_fixed):
数组下标访问 nodes[index] → 取 file_ptr → 直接使用
[约 2-3 ns,无原子操作]
普通 fd 路径(io_file_get_normal → fget):
fdtable 查找 → rcu_dereference →
atomic_inc(&file->f_count) → ...
[约 10-15 ns,含原子操作和可能的 cache miss]
io_uring/timeout.c 开头定义了两个关键结构体(timeout.c:16-36):
struct io_timeout {
struct file *file;
u32 off; // 序号偏移:等待多少个 CQE 完成后触发
u32 target_seq; // 绝对目标序号
u32 repeats; // multishot 重复次数(0=无限)
struct list_head list; // 挂入 ctx->timeout_list 有序链表
struct io_kiocb *head; // 链式超时:指向被保护的请求
struct io_kiocb *prev; // 链式超时:hrtimer 触发时保存
};
struct io_timeout_rem {
struct file *file;
u64 addr; // 用于匹配需要取消的超时(user_data)
struct timespec64 ts; // 更新超时时的新时间值
u32 flags;
bool ltimeout; // 是否是链式超时
};超时的异步数据保存在 struct io_timeout_data(定义在 timeout.h):
struct io_timeout_data {
struct io_kiocb *req;
struct hrtimer timer; // 内核高精度定时器
struct timespec64 ts; // 超时时间值
enum hrtimer_mode mode; // HRTIMER_MODE_REL 或 ABS
u32 flags; // IORING_TIMEOUT_* 标志
};io_timeout_get_clock()(timeout.c:381)根据 flags 选择时钟源:
static clockid_t io_timeout_get_clock(struct io_timeout_data *data)
{
switch (data->flags & IORING_TIMEOUT_CLOCK_MASK) {
case IORING_TIMEOUT_BOOTTIME:
return CLOCK_BOOTTIME; // 包含系统休眠时间
case IORING_TIMEOUT_REALTIME:
return CLOCK_REALTIME; // 挂钟时间,可被 NTP 调整
default:
case 0:
return CLOCK_MONOTONIC; // 单调时钟(默认)
}
}| 时钟源 | flag | 特性 |
|---|---|---|
CLOCK_MONOTONIC |
0(默认) | 单调递增,不受 NTP 影响,不计休眠 |
CLOCK_BOOTTIME |
IORING_TIMEOUT_BOOTTIME |
单调递增,包含休眠时间 |
CLOCK_REALTIME |
IORING_TIMEOUT_REALTIME |
可被 NTP/adjtime 修改 |
超时触发路径(timeout.c:257-279):
static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
{
struct io_timeout_data *data = container_of(timer,
struct io_timeout_data, timer);
struct io_kiocb *req = data->req;
struct io_timeout *timeout = io_kiocb_to_cmd(req, struct io_timeout);
raw_spin_lock_irqsave(&ctx->timeout_lock, flags);
list_del_init(&timeout->list); // 从有序链表移除
atomic_set(&req->ctx->cq_timeouts,
atomic_read(&req->ctx->cq_timeouts) + 1); // 计数器 +1
raw_spin_unlock_irqrestore(&ctx->timeout_lock, flags);
if (!(data->flags & IORING_TIMEOUT_ETIME_SUCCESS))
req_set_fail(req);
io_req_set_res(req, -ETIME, 0);
req->io_task_work.func = io_timeout_complete;
io_req_task_work_add(req); // 调度到任务工作队列完成
return HRTIMER_NORESTART; // 默认单次触发
}hrtimer 在软中断上下文触发,不能直接操作 CQ,因此通过 io_req_task_work_add() 将完成工作推迟到提交任务的上下文执行。
ctx->timeout_list 是一个按 target_seq 排序的有序链表。插入时需要找到正确的位置,确保序号小的超时排在前面,方便 io_flush_timeouts() 按顺序扫描。
io_flush_timeouts()(timeout.c:126)在每次 CQE 完成后被调用,检查链表头部是否到达触发条件:
void io_flush_timeouts(struct io_ring_ctx *ctx)
{
seq = READ_ONCE(ctx->cached_cq_tail) - atomic_read(&ctx->cq_timeouts);
list_for_each_entry_safe(timeout, tmp, &ctx->timeout_list, list) {
// 基于序号差值判断,处理 u32 回绕
events_needed = timeout->target_seq - ctx->cq_last_tm_flush;
events_got = seq - ctx->cq_last_tm_flush;
if (events_got < events_needed)
break; // 链表有序,剩余项无需检查
io_kill_timeout(req, &list);
}
ctx->cq_last_tm_flush = seq;
}基于时间的超时(IORING_TIMEOUT_ABS 或相对时间):
- hrtimer 到期后无条件触发,返回
-ETIME
基于完成计数的超时(sqe->off > 0):
- 等待指定数量的 CQE 完成后触发,相当于"等待 N 个操作完成,或超时"
timeout->off存储等待的 CQE 数量,target_seq = cached_cq_tail + off
ctx->cached_cq_tail
|
v
已完成 CQE: [cqe0][cqe1][cqe2]...[cqeN]
超时链表: [timeout(off=3)][timeout(off=5)]...
^
当 cached_cq_tail >= target_seq 时触发
普通超时只触发一次。设置 IORING_TIMEOUT_MULTISHOT 标志后,超时可以周期触发多次(timeout.c:57-67):
static inline bool io_timeout_finish(struct io_timeout *timeout,
struct io_timeout_data *data)
{
if (!(data->flags & IORING_TIMEOUT_MULTISHOT))
return true; // 普通超时:触发后即完成
// multishot:检查 repeats 计数
if (!timeout->off || (timeout->repeats && --timeout->repeats))
return false; // 还需继续重复
return true; // repeats 耗尽,结束
}周期触发逻辑在 io_timeout_complete()(timeout.c:71-90):
static void io_timeout_complete(struct io_tw_req tw_req, io_tw_token_t tw)
{
if (!io_timeout_finish(timeout, data)) {
if (io_req_post_cqe(req, -ETIME, IORING_CQE_F_MORE)) {
// 重新挂入链表并重启 hrtimer
raw_spin_lock_irq(&ctx->timeout_lock);
list_add(&timeout->list, ctx->timeout_list.prev);
hrtimer_start(&data->timer, timespec64_to_ktime(data->ts), data->mode);
raw_spin_unlock_irq(&ctx->timeout_lock);
return;
}
}
io_req_task_complete(tw_req, tw); // 最终完成,释放请求
}每次触发都会产生一个带 IORING_CQE_F_MORE 标志的 CQE,通知用户态超时已触发但请求还在活跃。repeats 字段控制重复次数,0 表示无限重复(直到显式取消)。
链式超时是一种特殊机制:在链式 SQE 序列中,IORING_OP_LINK_TIMEOUT 紧跟在另一操作之后,为该操作设置超时保护:
SQE 链:[IORING_OP_RECV] --link--> [IORING_OP_LINK_TIMEOUT(5s)]
| |
v v
等待数据到达 5秒内无数据时取消 RECV
io_link_timeout_fn()(timeout.c:350)是链式超时的 hrtimer 回调:
static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer)
{
raw_spin_lock_irqsave(&ctx->timeout_lock, flags);
prev = timeout->head; // 取出被保护的请求(RECV 操作)
timeout->head = NULL;
if (prev) {
io_remove_next_linked(prev);
if (!req_ref_inc_not_zero(prev))
prev = NULL; // prev 已经完成,不需要取消
}
list_del(&timeout->list);
timeout->prev = prev;
raw_spin_unlock_irqrestore(&ctx->timeout_lock, flags);
req->io_task_work.func = io_req_task_link_timeout;
io_req_task_work_add(req);
return HRTIMER_NORESTART;
}任务工作 io_req_task_link_timeout()(timeout.c:323)随后调用 io_try_cancel() 取消被保护的请求,并将链式超时本身的 CQE 结果设置为 -ETIME。
若被保护的操作在超时前完成,则通过 io_disarm_next()(timeout.c:211)取消链式超时的 hrtimer,并以 -ECANCELED 完成超时 CQE。
链式超时状态机:
[RECV 提交]
|
+---> 启动链式超时 hrtimer(5s)
|
[情况A:RECV 在5s内完成]
|
+---> io_disarm_next() 取消 hrtimer
+---> 链式超时 CQE: res=-ECANCELED
+---> RECV CQE: res=接收字节数
[情况B:5s 超时先到]
|
+---> io_link_timeout_fn() 触发
+---> io_try_cancel() 取消 RECV
+---> RECV CQE: res=-ECANCELED
+---> 链式超时 CQE: res=-ETIME
传统 ioctl 是同步阻塞的,无法与 io_uring 的异步模型集成。IORING_OP_URING_CMD 提供了一个通用扩展点,允许设备驱动通过 f_op->uring_cmd() 接口暴露异步可取消的命令,完全利用 io_uring 的提交/完成基础设施。
普通 SQE 为 64 字节,通过 IORING_SETUP_SQE128 标志可以启用 128 字节 SQE。IORING_OP_URING_CMD 在 128 字节 SQE 模式下,后 64 字节完全由驱动定义(uring_cmd.c:219-225):
static inline size_t uring_sqe_size(struct io_kiocb *req)
{
if (req->ctx->flags & IORING_SETUP_SQE128 ||
req->opcode == IORING_OP_URING_CMD128)
return 2 * sizeof(struct io_uring_sqe); // 128 字节
return sizeof(struct io_uring_sqe); // 64 字节
}NVMe passthrough 正是利用 128 字节 SQE,将整个 NVMe 命令(64字节)嵌入其中,实现零拷贝命令传递。
128 字节 SQE 布局(NVMe passthrough 用例):
+--------------------+--------------------+
| 前 64 字节 | 后 64 字节 |
| io_uring 标准头 | NVMe 命令数据 |
| opcode=URING_CMD | (驱动定义) |
| fd=nvme_fd | |
| cmd_op=... | |
+--------------------+--------------------+
io_uring_cmd_prep()(uring_cmd.c:184)在提交时处理 SQE:
int io_uring_cmd_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
ioucmd->flags = READ_ONCE(sqe->uring_cmd_flags);
if (ioucmd->flags & IORING_URING_CMD_FIXED) {
if (ioucmd->flags & IORING_URING_CMD_MULTISHOT)
return -EINVAL; // FIXED 和 MULTISHOT 不能同时使用
req->buf_index = READ_ONCE(sqe->buf_index);
}
ioucmd->cmd_op = READ_ONCE(sqe->cmd_op);
// 分配异步数据缓存,包含 SQE 副本和 iov 缓存
ac = io_uring_alloc_async_data(&req->ctx->cmd_cache, req);
ioucmd->sqe = sqe; // 先指向 SQ 环中的原始 SQE
return 0;
}当请求需要异步执行(从 io-wq 线程发出)时,io_uring_cmd_sqe_copy()(uring_cmd.c:227)将 SQE 内容复制到 ac->sqes 中,防止 SQ 环中的数据被覆盖。
io_uring_cmd()(uring_cmd.c:239)是核心执行函数:
int io_uring_cmd(struct io_kiocb *req, unsigned int issue_flags)
{
if (!file->f_op->uring_cmd)
return -EOPNOTSUPP;
ret = security_uring_cmd(ioucmd); // LSM 安全检查
if (ret)
return ret;
// 调用驱动实现的 uring_cmd 接口
ret = file->f_op->uring_cmd(ioucmd, issue_flags);
...
}驱动返回值语义:
IOU_COMPLETE:命令已同步完成,直接生成 CQE-EIOCBQUEUED:命令已异步排队,稍后通过__io_uring_cmd_done()完成- 其他负值:命令失败
驱动可以将正在执行的命令标记为"可取消",这样用户通过 IORING_OP_ASYNC_CANCEL 发出取消请求时,io_uring 能找到并取消它。
注册可取消命令(uring_cmd.c:101-122):
void io_uring_cmd_mark_cancelable(struct io_uring_cmd *cmd,
unsigned int issue_flags)
{
// IOPOLL 模式不支持取消(completion 数据与 hash_node 重叠)
if (ctx->flags & IORING_SETUP_IOPOLL)
return;
if (!(cmd->flags & IORING_URING_CMD_CANCELABLE)) {
cmd->flags |= IORING_URING_CMD_CANCELABLE;
hlist_add_head(&req->hash_node, &ctx->cancelable_uring_cmd);
}
}取消时遍历哈希链表(uring_cmd.c:49-75):
bool io_uring_try_cancel_uring_cmd(struct io_ring_ctx *ctx,
struct io_uring_task *tctx, bool cancel_all)
{
hlist_for_each_entry_safe(req, tmp, &ctx->cancelable_uring_cmd, hash_node) {
if (!cancel_all && req->tctx != tctx)
continue;
if (cmd->flags & IORING_URING_CMD_CANCELABLE) {
// 以 IO_URING_F_CANCEL 标志重新调用驱动
file->f_op->uring_cmd(cmd, IO_URING_F_CANCEL |
IO_URING_F_COMPLETE_DEFER);
}
}
io_submit_flush_completions(ctx);
return ret;
}当驱动异步完成时,调用 __io_uring_cmd_done()(uring_cmd.c:150):
void __io_uring_cmd_done(struct io_uring_cmd *ioucmd, s32 ret, u64 res2,
unsigned issue_flags, bool is_cqe32)
{
io_uring_cmd_del_cancelable(ioucmd, issue_flags); // 从哈希表移除
if (ret < 0)
req_set_fail(req);
io_req_set_res(req, ret, 0);
if (is_cqe32) {
// 32字节 CQE 支持(IORING_SETUP_CQE32)
if (req->ctx->flags & IORING_SETUP_CQE_MIXED)
req->cqe.flags |= IORING_CQE_F_32;
io_req_set_cqe32_extra(req, res2, 0);
}
io_req_uring_cleanup(req, issue_flags);
if (req->ctx->flags & IORING_SETUP_IOPOLL) {
// iopoll 模式:设置 iopoll_completed 标志,轮询时收割
smp_store_release(&req->iopoll_completed, 1);
} else if (issue_flags & IO_URING_F_COMPLETE_DEFER) {
io_req_complete_defer(req); // 批量延迟完成
} else {
req->io_task_work.func = io_req_task_complete;
io_req_task_work_add(req); // 调度到任务工作队列
}
}
EXPORT_SYMBOL_GPL(__io_uring_cmd_done);应用层:
struct nvme_passthru_cmd64 cmd = { ... };
sqe.opcode = IORING_OP_URING_CMD128;
sqe.fd = nvme_fd;
sqe.cmd_op = NVME_IOCTL_IO64_CMD;
memcpy(sqe.cmd, &cmd, sizeof(cmd));
io_uring_submit(ring);
驱动层(drivers/nvme/host/ioctl.c):
nvme_uring_cmd() 被调用
解析 ioucmd->sqe,提取 NVMe 命令
提交到 NVMe 队列
完成时调用 io_uring_cmd_done()
内核路径:
io_uring_cmd()
→ security_uring_cmd()
→ file->f_op->uring_cmd(ioucmd, flags) [nvme_uring_cmd]
→ nvme_submit_user_cmd()
→ -EIOCBQUEUED
→ 异步完成
→ __io_uring_cmd_done()
→ CQE 入队
传统 send() 需要将用户数据复制到内核 SKB(Socket Buffer)中,对于大数据包这是显著开销。零拷贝发送允许内核直接引用用户内存的物理页作为 SKB 的 frags,避免数据复制。但这带来了一个新问题:何时可以安全地重用发送缓冲区?
IORING_OP_SEND_ZC 通过双 CQE 解决"缓冲区何时可重用"的问题:
发送请求生命周期:
用户提交 SQE(SEND_ZC)
|
v
第一个 CQE(发送完成通知):
res = 发送的字节数
flags 无 IORING_CQE_F_NOTIF
[此时数据可能还在网络栈缓冲中,用户不能立即修改缓冲区]
|
v
网络栈完成数据传输,SKB frags 释放
|
v
第二个 CQE(缓冲区可释放通知):
flags 包含 IORING_CQE_F_NOTIF
res = 0(如无 REPORT_USAGE)或 IORING_NOTIF_USAGE_ZC_COPIED
[现在用户可以安全修改/释放发送缓冲区]
每个零拷贝发送请求都关联一个 notif(通知)对象(net.c:1342-1347):
notif = zc->notif = io_alloc_notif(ctx);
notif->cqe.user_data = req->cqe.user_data; // 与原请求相同的 user_data
notif->cqe.res = 0;
notif->cqe.flags = IORING_CQE_F_NOTIF; // 标记为通知 CQEnotif 在 SKB frags 的引用计数归零时,通过回调链触发:
skb_unref() / skb_frag_unref()
→ io_tx_ubuf_complete() / io_tx_ubuf_complete_frag()
→ io_notif_flush()
→ 将 notif 的 CQE 入队
固定缓冲区路径下,io_sg_from_iter()(net.c:1401)将 iov_iter(其内部是 bio_vec)中的物理页直接填充为 SKB frags,完全跳过数据复制:
static int io_sg_from_iter(struct sk_buff *skb,
struct iov_iter *from, size_t length)
{
struct skb_shared_info *shinfo = skb_shinfo(skb);
int frag = shinfo->nr_frags;
// 设置 SKBFL_MANAGED_FRAG_REFS 标志,由 io_uring 管理 frag 引用
shinfo->flags |= SKBFL_MANAGED_FRAG_REFS;
bi.bi_size = min(from->count, length);
while (bi.bi_size && frag < MAX_SKB_FRAGS) {
struct bio_vec v = mp_bvec_iter_bvec(from->bvec, bi);
// 直接将物理页填入 SKB frag,不复制数据
__skb_fill_page_desc_noacc(shinfo, frag++, v.bv_page,
v.bv_offset, v.bv_len);
skb->data_len += v.bv_len;
skb->len += v.bv_len;
...
}
}非固定缓冲区路径使用 io_sg_from_iter_iovec()(net.c:1394),它调用 zerocopy_fill_skb_from_iter(),后者通过 skb_zcopy_downgrade_managed() 使用标准的零拷贝机制。
io_send_zc_prep()(net.c:1326)根据是否使用固定缓冲区决定 sg_from_iter 函数指针:
int io_send_zc_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
...
if (!(zc->flags & IORING_RECVSEND_FIXED_BUF)) {
// 普通用户缓冲区:需要预先 account 内存锁定
iomsg->msg.sg_from_iter = io_sg_from_iter_iovec;
return io_notif_account_mem(zc->notif, iomsg->msg.msg_iter.count);
}
// 固定缓冲区:不需要额外 account,直接使用 bvec
iomsg->msg.sg_from_iter = io_sg_from_iter;
return 0;
}固定缓冲区路径的优势在于:io_mapped_ubuf.bvec[] 已经是 bio_vec 格式,直接用于 SKB frags,零拷贝效率最高。
设置 IORING_SEND_ZC_REPORT_USAGE 标志后,通知 CQE 的 res 字段会携带实际是否进行了零拷贝的信息:
0:成功零拷贝,数据直接从用户页发送IORING_NOTIF_USAGE_ZC_COPIED(值为(1 << 31)):网络栈实际进行了数据复制(例如数据量太小、路径不支持),虽然数据已发送,但未真正零拷贝
这让应用可以在零拷贝不可用时自适应降级策略。
IORING_OP_SENDMSG_ZC 在 SEND_ZC 基础上支持 sendmsg() 语义(多个 iov、控制信息、目标地址),适用于 UDP 和需要 ancdata 的场景。io_sendmsg_zc()(net.c:1524)与 io_send_zc() 共享同一套双 CQE 模型和 notif 机制。
发送操作对比:
普通 SEND SEND_ZC(普通buf) SEND_ZC(固定buf)
数据复制 是 是* 否
零拷贝 否 否* 是
双 CQE 否 是 是
缓冲区通知 否 是 是
适用场景 所有 大于 64K 大于 64K + 固定buf
* 网络栈实际可能触发复制,通过 REPORT_USAGE 可知
传统 io_uring 是"一个 SQE 对应一个 CQE"的一对一模型。Multishot 打破这一限制:一个 SQE 可以产生多个 CQE,适用于持续性操作(如服务器持续接受连接、持续接收数据)。
普通模式(One-Shot):
SQE(accept) → CQE(fd=5) [完成,请求销毁]
需要再次提交 SQE(accept)
Multishot 模式:
SQE(accept, MULTISHOT) → CQE(fd=5, flags|=IORING_CQE_F_MORE)
→ CQE(fd=6, flags|=IORING_CQE_F_MORE)
→ CQE(fd=7, flags|=IORING_CQE_F_MORE)
→ ... (请求持续活跃)
当 flags 不含 CQE_F_MORE 时,表示请求已终止
IORING_CQE_F_MORE 在以下场景由内核设置:
- Multishot accept:每次 accept 成功的 CQE
- Multishot recv:每次接收成功的 CQE
- Multishot timeout:每次周期触发的 CQE
- Poll multishot:每次 poll 事件触发的 CQE
用户必须处理不含 IORING_CQE_F_MORE 的 CQE:这表示 multishot 请求已经终止(错误、资源耗尽、主动取消),需要重新提交 SQE 或放弃。
Accept multishot 是最常见的用例。设置 IORING_ACCEPT_MULTISHOT 后,一个 SQE 持续接受连接:
// net.c 中 accept 处理
if (accept->iou_flags & IORING_ACCEPT_MULTISHOT) {
// 设置 CQE_F_MORE,请求保持活跃
cflags |= IORING_CQE_F_MORE;
// 自动分配固定文件槽位(若 file_slot == IORING_FILE_INDEX_ALLOC)
if (accept->file_slot != IORING_FILE_INDEX_ALLOC)
...;
}配合 IORING_FILE_INDEX_ALLOC,每个新连接自动注册为固定文件,连接的 fd 索引通过 CQE res 返回。
Multishot recv 持续从 socket 接收数据,通常与 Buffer Ring 配合使用(每次接收自动选择一个空闲缓冲区):
公平性限制(net.c:109):
#define MULTISHOT_MAX_RETRY 32
每轮最多连续接收 32 次(nr_multishot_loops < MULTISHOT_MAX_RETRY),
超过后通过 IOU_REQUEUE 重入队,让出 CPU 给其他请求。
IOU_REQUEUE 是一个特殊返回码,表示请求需要重新排队而不是立即继续。这对于高吞吐场景至关重要,防止单个活跃 socket 独占所有处理资源。
Bundle 模式允许一个 SQE 在单轮中聚合多个接收操作,减少 CQE 数量。通过 IORING_RECV_MULTISHOT + IORING_RECVSEND_BUNDLE 标志启用:
普通 multishot recv(每包一个 CQE):
接收 pkt1 → CQE(len=1500, F_MORE)
接收 pkt2 → CQE(len=1400, F_MORE)
接收 pkt3 → CQE(len=1200, F_MORE)
[3 个 CQE,3 次用户态处理]
Bundle 模式(聚合多包):
接收 pkt1+pkt2+pkt3 → CQE(total=4100, F_MORE)
[1 个 CQE,用户从 Buffer Ring 中依次取出各包]
multishot 内核流控示意:
io_recv_mshot()
|
+---> 接收数据,投递 CQE(flags|CQE_F_MORE)
|
+---> nr_multishot_loops++
|
+---> if nr_multishot_loops >= MULTISHOT_MAX_RETRY:
| nr_multishot_loops = 0
| return IOU_REQUEUE ← 重入提交队列
|
+---> 继续尝试接收下一个包
|
+---> if -EAGAIN(socket 暂无数据):
注册 poll,等待 socket 可读事件
IOU_REQUEUE 让请求回到提交队列末尾,其他请求得以执行。这个公平性机制确保即使面对 DDoS 洪水流量,单一 socket 也不会阻塞整个 ring。
Multishot recv 需要在每次接收时自动选择一个空闲缓冲区。传统方式是用户态预先通过 IORING_OP_PROVIDE_BUFFERS 提交一组缓冲区,但这需要额外的系统调用。Buffer Ring(Provided Buffer Ring)是一种无锁的共享内存机制,允许用户态高效地补充缓冲区。
io_uring/kbuf.c 开头定义了关键结构:
// kbuf.c:21-24
#define MAX_BIDS_PER_BGID (1 << 16) // 每个组最多 65536 个缓冲区 ID
// 从环形缓冲区头部取第 head 个 buffer
#define io_ring_head_to_buf(br, head, mask) &(br)->bufs[(head) & (mask)]
struct io_buffer_list {
struct list_head buf_list; // 旧版 linked-list 模式(兼容)
struct io_uring_buf_ring *buf_ring; // 新版环形缓冲区指针
__u16 bgid; // Buffer Group ID
__u16 head; // 内核侧消费头(内核移动)
__u16 mask; // ring_entries - 1(用于取模)
__u16 flags; // IOBL_BUF_RING | IOBL_INC
int nr_entries; // 环的容量
struct io_region region; // mmap 区域描述
};
// 用户态写入的每个缓冲区描述符
struct io_uring_buf {
__u64 addr; // 用户内存地址
__u32 len; // 缓冲区长度
__u16 bid; // Buffer ID(完成时放入 CQE flags 高16位)
__u16 resv;
};
// 整个 buffer ring(用于 mmap 共享)
struct io_uring_buf_ring {
union {
struct {
__u64 resv1;
__u32 resv2;
__u16 resv3;
__u16 tail; // 用户态写入的 producer tail
};
struct io_uring_buf bufs[0]; // 缓冲区描述符数组
};
};io_register_pbuf_ring()(kbuf.c:608)注册一个 buffer ring:
int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
{
struct io_uring_buf_reg reg;
// 读取注册参数(bgid, ring_entries, ring_addr, flags)
copy_from_user(®, arg, sizeof(reg));
// 验证参数
if (!is_power_of_2(reg.ring_entries)) return -EINVAL;
if (reg.ring_entries >= 65536) return -EINVAL; // head/tail 是 u16
// 分配 io_buffer_list 结构
bl = kzalloc_obj(*bl, GFP_KERNEL_ACCOUNT);
// 计算 mmap 偏移(bgid 决定偏移,用于用户 mmap)
mmap_offset = (unsigned long)reg.bgid << IORING_OFF_PBUF_SHIFT;
ring_size = flex_array_size(br, bufs, reg.ring_entries);
// 创建共享内存区域(用户提供地址或内核分配后 mmap)
io_create_region(ctx, &bl->region, &rd, mmap_offset);
br = io_region_get_ptr(&bl->region);
bl->nr_entries = reg.ring_entries;
bl->mask = reg.ring_entries - 1;
bl->flags |= IOBL_BUF_RING;
if (reg.flags & IOU_PBUF_RING_INC)
bl->flags |= IOBL_INC; // 增量模式
// 注册到 XArray(按 bgid 索引)
io_buffer_add_list(ctx, bl, reg.bgid);
}内核消费缓冲区时(kbuf.c:188-219):
static struct io_br_sel io_ring_buffer_select(struct io_kiocb *req, size_t *len,
struct io_buffer_list *bl,
unsigned int issue_flags)
{
struct io_uring_buf_ring *br = bl->buf_ring;
__u16 tail, head = bl->head; // head 是内核侧消费游标
// smp_load_acquire 确保读取到最新的 tail(用户写入)
tail = smp_load_acquire(&br->tail);
if (unlikely(tail == head))
return sel; // 环为空,无可用缓冲区
// 取出缓冲区描述符
buf = io_ring_head_to_buf(br, head, bl->mask);
buf_len = READ_ONCE(buf->len);
if (*len == 0 || *len > buf_len)
*len = buf_len;
req->buf_index = READ_ONCE(buf->bid); // 将 bid 记录到请求,放入 CQE
sel.addr = u64_to_user_ptr(READ_ONCE(buf->addr));
// 提交消费(移动 head)
io_kbuf_commit(req, sel.buf_list, *len, 1);
}smp_load_acquire(&br->tail) 的作用:用户态通过 WRITE_ONCE(br->tail, new_tail) + 内存屏障写入 tail,内核侧必须用 smp_load_acquire 读取,确保 acquire-release 语义:看到 tail 更新时,tail 之前写入的缓冲区描述符也对内核可见。
Buffer group 通过 XArray(ctx->io_bl_xa)进行 O(log n) 查找(kbuf.c:74-93):
static inline struct io_buffer_list *io_buffer_get_list(struct io_ring_ctx *ctx,
unsigned int bgid)
{
lockdep_assert_held(&ctx->uring_lock);
return xa_load(&ctx->io_bl_xa, bgid); // XArray 查找
}
static int io_buffer_add_list(struct io_ring_ctx *ctx,
struct io_buffer_list *bl, unsigned int bgid)
{
bl->bgid = bgid;
guard(mutex)(&ctx->mmap_lock);
return xa_err(xa_store(&ctx->io_bl_xa, bgid, bl, GFP_KERNEL));
}XArray 相比哈希表的优点:支持范围查找、自动稀疏存储(不占用未使用的 bgid 空间)、无哈希冲突。
IOU_PBUF_RING_INC 标志启用增量消耗模式。在此模式下,一个缓冲区可以被部分消耗:
// kbuf.c:35-56
static bool io_kbuf_inc_commit(struct io_buffer_list *bl, int len)
{
while (len) {
buf = io_ring_head_to_buf(bl->buf_ring, bl->head, bl->mask);
buf_len = READ_ONCE(buf->len);
this_len = min_t(u32, len, buf_len);
buf_len -= this_len;
if (buf_len || !this_len) {
// 缓冲区未用完:更新地址和长度,停留在同一个 buf
WRITE_ONCE(buf->addr, READ_ONCE(buf->addr) + this_len);
WRITE_ONCE(buf->len, buf_len);
return false;
}
// 缓冲区用完:移动到下一个
WRITE_ONCE(buf->len, 0);
bl->head++;
len -= this_len;
}
return true;
}增量模式适用于流式协议(如 TCP),每次接收可能只消耗缓冲区的一部分,下次接收继续使用同一缓冲区的剩余空间。
用户态(生产者):
br->bufs[tail % mask] = {addr=0x10000, len=4096, bid=5}
smp_store_release(&br->tail, tail + 1)
┌─────────────────────────────────┐
│ io_uring_buf_ring │
│ tail=N ←── 用户写入 │
│ bufs[0] {addr,len,bid} │
│ bufs[1] {addr,len,bid} │
│ ... │
│ bufs[N-1] {addr,len,bid} │
└─────────────────────────────────┘
↑ kernel head=M(消费游标)
内核态(消费者):
tail = smp_load_acquire(&br->tail) // acquire 读
if tail == head: return ENOBUFS
buf = &bufs[head & mask]
head++ (记录在 bl->head,不写回 ring)
→ 将 buf->addr 作为接收缓冲区
→ 完成时,bid 放入 CQE.flags 高16位
基础 SQPOLL 原理(内核轮询线程自动消费 SQE)已在 09_io_uring.md 中介绍。本节聚焦高级场景:多 ring 共享 SQPOLL 线程、CPU 绑定、公平性限制。
通过 IORING_SETUP_ATTACH_WQ 标志,多个 io_uring 实例可以共享同一个 SQPOLL 线程(sqpoll.c:117-137):
static struct io_sq_data *io_attach_sq_data(struct io_uring_params *p)
{
// 打开目标 ring 的 fd(通过 wq_fd 参数指定)
CLASS(fd, f)(p->wq_fd);
ctx_attach = fd_file(f)->private_data;
sqd = ctx_attach->sq_data;
// 安全检查:必须同进程(同 tgid)
if (sqd->task_tgid != current->tgid)
return ERR_PTR(-EPERM);
refcount_inc(&sqd->refs);
return sqd;
}共享后,多个 ring 通过 sqd->ctx_list 链表串联,SQPOLL 线程轮询所有 ring:
// sqpoll.c:344
cap_entries = !list_is_singular(&sqd->ctx_list); // 多 ring 时限制批次
list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
__io_sq_thread(ctx, sqd, cap_entries, &ist);
}使用场景:多线程服务器每个线程独立 ring,但共享 SQPOLL 线程,节省一个内核线程的开销。
当多个 ring 共享 SQPOLL 线程时,cap_entries 为 true,__io_sq_thread() 限制每次提交(sqpoll.c:204-213):
static int __io_sq_thread(struct io_ring_ctx *ctx, struct io_sq_data *sqd,
bool cap_entries, struct io_sq_time *ist)
{
to_submit = io_sqring_entries(ctx);
// 多 ring 时:每轮每个 ring 最多提交 8 个 SQE
if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE)
to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE; // = 8
...
}这确保单个 ring 的 SQE 爆发不会饿死其他 ring。8 这个值(IORING_SQPOLL_CAP_ENTRIES_VALUE)是经验值,在提交延迟和公平性间取得平衡。
SQPOLL 线程的 CPU 绑定在线程创建时(sqpoll.c:317-322):
if (sqd->sq_cpu != -1) {
// 用户指定了 CPU(通过 io_uring_params.sq_thread_cpu)
set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
} else {
// 不指定则允许在所有 CPU 运行
set_cpus_allowed_ptr(current, cpu_online_mask);
sqd->sq_cpu = raw_smp_processor_id(); // 记录当前 CPU
}NUMA 最佳实践:
- 将 SQPOLL 线程绑定在与 NIC 同一 NUMA 节点的 CPU
- 应用线程也绑定在同一 NUMA 节点
- I/O 目标设备(NVMe、NIC)在同一 NUMA 节点
- 这样 SQ/CQ 环内存、SQPOLL 线程、设备中断都在同一节点,避免跨 NUMA 内存访问
io_sq_thread()(sqpoll.c:293)是 SQPOLL 内核线程的主体:
io_sq_thread():
while (1):
1. 检查停止/park 信号
2. 遍历 ctx_list,调用 __io_sq_thread()
- 各 ring 消费 SQE,提交 I/O
3. if sqt_spin(有实际工作):
smp_mb() ← 写屏障,确保 SQE 读取不重排
继续轮询(忙等待)
4. if 空闲超过 sq_thread_idle 毫秒:
prepare_to_wait()
if 无 SQE:
schedule() ← 放弃 CPU,进入睡眠
finish_wait()
5. 收到唤醒(新 SQE 到来):
smp_mb() ← 读屏障,确保看到新 SQE
返回步骤 2
SQPOLL 的睡眠/唤醒涉及关键内存屏障:
SQPOLL 线程一侧(进入睡眠前):
// io_sqring_entries() 读取 SQ ring 的 tail
// 此读取之前需要内存屏障,确保看到最新的 SQE
smp_mb();
if (io_sqring_entries(ctx) == 0)
schedule();用户线程一侧(写入 SQE 后):
// 写入 SQ tail(通知新 SQE 就绪)
io_uring_smp_store_release(ring->sq.ktail, ...);
// 如果 SQPOLL 在睡眠,需要唤醒
if (READ_ONCE(ring->flags) & IORING_SQ_NEED_WAKEUP)
io_uring_enter(fd, ...); // 唤醒 SQPOLL 线程IORING_SQ_NEED_WAKEUP 标志是 SQPOLL 进入睡眠前设置的,用户通过 io_uring_enter() 的 IORING_ENTER_SQ_WAIT 标志唤醒。
Linux 内核引入了 BPF Filter 机制,允许用 BPF 程序对 io_uring SQE 进行安全策略过滤(类似 seccomp-bpf 对系统调用的过滤)。头文件定义在 include/uapi/linux/io_uring/bpf_filter.h:
// 传递给 BPF filter 程序的上下文
struct io_uring_bpf_ctx {
__u64 user_data; // SQE 的 user_data
__u8 opcode; // 操作码(IORING_OP_*)
__u8 sqe_flags; // IOSQE_* 标志
__u8 pdu_size; // 操作码特定数据大小
__u8 pad[5];
union {
struct { __u32 family, type, protocol; } socket; // socket 操作
struct { __u64 flags, mode, resolve; } open; // 文件打开操作
};
};
// 注册 BPF filter 的参数
struct io_uring_bpf_filter {
__u32 opcode; // 要过滤的 io_uring 操作码
__u32 flags; // IO_URING_BPF_FILTER_* 标志
__u32 filter_len; // BPF 指令数量
__u8 pdu_size; // 期望的 pdu 大小
__u8 resv[3];
__u64 filter_ptr; // 指向 BPF 指令数组
__u64 resv2[5];
};过滤标志:
IO_URING_BPF_FILTER_DENY_REST:将所有未注册 filter 的操作码默认拒绝IO_URING_BPF_FILTER_SZ_STRICT:严格检查 pdu_size,大小不匹配则注册失败
通过 io_uring_register() + IORING_REGISTER_BPF 操作码注册(内核正在开发中):
struct io_uring_bpf bpf_reg = {
.cmd_type = IO_URING_BPF_CMD_FILTER,
.filter = {
.opcode = IORING_OP_OPENAT,
.flags = IO_URING_BPF_FILTER_DENY_REST,
.filter_len = num_insns,
.filter_ptr = (uintptr_t)bpf_insns,
},
};
io_uring_register(ring_fd, IORING_REGISTER_BPF, &bpf_reg, 1);BPF filter 在 io_uring_cmd() 的安全检查路径(security_uring_cmd())之前执行,如果 filter 返回非零则拒绝该 SQE。
Linux 6.0+ 引入 BPF_MAP_TYPE_IOURINGFD map 类型,允许 BPF 程序持有对 io_uring ring 的引用,并通过 bpf_io_uring_queue_sqe() 助手函数直接向 ring 提交 SQE:
// BPF 程序视角(伪代码):
struct {
__uint(type, BPF_MAP_TYPE_IOURINGFD);
__uint(max_entries, 1);
} io_uring_map SEC(".maps");
int BPF_PROG(tracepoint_handler, ...)
{
struct io_uring_sqe sqe = {
.opcode = IORING_OP_WRITE,
.fd = log_fd,
.addr = (uintptr_t)buf,
.len = buf_len,
};
// BPF 程序直接提交 SQE
bpf_io_uring_queue_sqe(&io_uring_map, 0, &sqe, sizeof(sqe));
}场景1:网络事件驱动 I/O
XDP/tc BPF 程序
|
| 数据包到达
v
bpf_io_uring_queue_sqe()
|
v
io_uring ring(用户态进程)
|
v
高效异步处理(无需 epoll 唤醒)
场景2:安全审计过滤
io_uring BPF filter
|
| 检查每个 SQE 的 opcode/flags
v
允许:io_uring 正常执行
拒绝:返回 -EPERM 给用户
例:容器只允许 IORING_OP_READ/WRITE,
拒绝 IORING_OP_SOCKET 等危险操作
场景3:内核态 I/O 卸载
kprobe/tracepoint 触发 BPF 程序
|
v
直接通过 io_uring map 提交异步 I/O
(无需进程上下文,无系统调用开销)
| 维度 | io_uring | eBPF |
|---|---|---|
| 执行上下文 | 用户进程 | 内核态(中断/kprobe/网络栈) |
| 提交方式 | SQE 环 | bpf_helper 函数 |
| 完成通知 | CQE 环 | 无(写回 map 或 perf_event) |
| 适用场景 | 大量 I/O 密集型应用 | 事件驱动、策略控制 |
| 联合使用 | BPF 控制 io_uring 的准入和行为 | io_uring 执行 BPF 触发的 I/O |
io_uring_allowed()(io_uring.c:3079)在 io_uring_setup() 前检查权限:
static inline int io_uring_allowed(void)
{
int disabled = READ_ONCE(sysctl_io_uring_disabled); // 0/1/2
if (disabled == 2)
return -EPERM; // 级别2:完全禁用(所有用户)
if (disabled == 0 || capable(CAP_SYS_ADMIN))
goto allowed_lsm; // 级别0:完全允许,或 root 跳过组检查
// 级别1:只允许特定组
io_uring_group = make_kgid(&init_user_ns, sysctl_io_uring_group);
if (!gid_valid(io_uring_group))
return -EPERM;
if (!in_group_p(io_uring_group))
return -EPERM;
allowed_lsm:
return security_uring_allowed(); // LSM 最终裁决
}三个级别的含义(通过 /proc/sys/kernel/io_uring_disabled):
| 值 | 含义 | 典型场景 |
|---|---|---|
| 0 | 完全允许(默认) | 开发环境、信任环境 |
| 1 | 仅特定 GID 可用 | 多租户环境,配合 io_uring_group |
| 2 | 完全禁用 | 高安全生产环境,如使用 seccomp 的容器 |
/proc/sys/kernel/io_uring_group:允许的 GID(-1 表示不限组)。
io_uring 在关键路径上设置了 LSM(Linux Security Modules)钩子:
| 钩子函数 | 触发时机 | 位置 |
|---|---|---|
security_uring_allowed() |
io_uring_setup() 前 |
io_uring.c:3098 |
security_uring_cmd() |
IORING_OP_URING_CMD 执行前 |
uring_cmd.c:249 |
security_uring_sqpoll() |
SQPOLL 线程创建时 | sqpoll.c:465 |
security_uring_override_creds() |
凭据覆盖时(如 SQPOLL) | io_uring.c:1820 |
SELinux 和 AppArmor 均实现了这些钩子,可以细粒度控制:
- 哪些进程可以创建 io_uring 实例
- 哪些进程可以使用 SQPOLL
- 哪些驱动命令可以通过 URING_CMD 执行
IORING_SETUP_SINGLE_ISSUER(io_uring.c:2796)标志限制只有创建 ring 的线程(或其指定的继承线程)可以提交 SQE:
// io_uring.c:3021
if (ctx->flags & IORING_SETUP_SINGLE_ISSUER
&& ctx->submitter_task.task != current)
return -EACCES;用途:
- 消除多线程提交竞争,提高性能(无需 SQ 锁)
- 防止恶意/错误代码从其他线程提交 SQE
在设置了 SINGLE_ISSUER 的情况下,内核可以省去对提交者身份的运行时检查(io_uring.h:66):
// 这些标志下不需要 uring_lock
IORING_SETUP_SINGLE_ISSUER |
IORING_SETUP_SQPOLL |
...SQPOLL 线程代表用户执行 I/O,但运行在内核上下文。为了保持正确的 DAC(自主访问控制)语义,SQPOLL 线程使用创建 ring 时的凭据(sqpoll.c:220-221):
if (ctx->sq_creds != current_cred())
creds = override_creds(ctx->sq_creds);
// ... 执行 I/O
if (creds)
revert_creds(creds);这确保即使在 SQPOLL 内核线程中,文件权限检查使用的是原始用户的 UID/GID/capabilities,而非 root(SQPOLL 线程以 root 运行)。
Landlock(基于 eBPF 的沙箱机制)对 io_uring 有特殊处理注意事项:
已支持的场景:
- Landlock 规则通过 VFS 层执行,
IORING_OP_OPENAT/READ/WRITE等文件操作受 Landlock 约束 security_uring_allowed()由 Landlock 实现,可以在 io_uring 创建阶段拒绝
需要注意的场景:
- 固定文件:固定文件的
struct file *在注册时已通过权限检查。但如果 Landlock 规则在注册后修改(通过landlock_restrict_self()),已注册的固定文件不会被重新检查。这意味着 Landlock 的沙箱时机很重要,应在设置固定文件之前应用。 - SQPOLL:SQPOLL 线程的凭据检查通过
security_uring_sqpoll()执行,Landlock 实现可以在此拒绝 SQPOLL 创建。 - URING_CMD:每次 URING_CMD 执行前调用
security_uring_cmd(),允许 Landlock 对驱动命令进行细粒度控制。
推荐安全配置(容器化场景):
1. 使用 seccomp:
允许 io_uring_setup, io_uring_enter, io_uring_register
可以拒绝 io_uring_setup(禁止容器创建新 ring)
但允许 io_uring_enter/register(操作已有 ring)
2. sysctl 配置:
kernel.io_uring_disabled=1
kernel.io_uring_group=<container_gid>
3. Landlock 配置:
在固定文件注册前应用 Landlock 沙箱
通过 io_uring BPF filter 限制允许的操作码
4. IORING_SETUP_SINGLE_ISSUER:
防止容器内其他线程劫持 io_uring 提交
5. 禁用 SQPOLL(需要 CAP_SYS_NICE 或特权):
非特权容器不应使用 IORING_SETUP_SQPOLL
io_uring 的固定缓冲区和固定文件使用统一的 io_rsrc_node / io_rsrc_data 抽象:
io_ring_ctx
|
+-- buf_table (io_rsrc_data) // 固定缓冲区表
| nodes[0..N-1]: io_rsrc_node * → io_mapped_ubuf
|
+-- file_table (io_file_table) // 固定文件表
| data.nodes[0..M-1]: io_rsrc_node * → struct file *
| bitmap[0..(M/64)-1] // 稀疏位图
|
+-- node_cache (io_alloc_cache) // io_rsrc_node 对象池
+-- imu_cache (io_alloc_cache) // 小型 io_mapped_ubuf 对象池
+-- io_bl_xa (XArray) // Buffer Ring,按 bgid 索引
固定缓冲区状态机:
[未注册]
|
| IORING_REGISTER_BUFFERS
v
[已注册] ←──────────────────────┐
| │
| 请求引用(io_fixed_buf_select)│
v │
[使用中] │
| │ IORING_REGISTER_BUFFERS_UPDATE
| 请求完成(io_buffer_unmap) │ (原子替换)
v │
[引用归零]→ 释放 bvec/unpin ──────┘
|
| IORING_UNREGISTER_BUFFERS
v
[已释放]
IORING_REGISTER_BUFFERS2 使用 io_uring_rsrc_register 结构体,支持稀疏注册(IORING_RSRC_REGISTER_SPARSE):
struct io_uring_rsrc_register {
__u32 nr; // 注册数量
__u32 flags; // IORING_RSRC_REGISTER_SPARSE 等
__u64 resv2;
__u64 data; // 指向 iovec 数组
__u64 tags; // 指向 u64 tag 数组(可选)
};稀疏注册预留 N 个槽位但不实际注册缓冲区,后续通过 IORING_REGISTER_BUFFERS_UPDATE 填充,支持动态增减。
每个 io_rsrc_node 可以带一个用户自定义 tag(u64)。当资源被注销时,如果 tag 非零,io_uring 会通过 CQE 将 tag 值返回给用户(类似于文件关闭通知)。这用于资源泄漏检测和资源管理审计。
注销时(io_reset_rsrc_node()):
if (node->tag)
io_post_aux_cqe(ctx, node->tag, 0, 0); // tag 作为 CQE 的 user_data 返回IORING_REGISTER_BUFFERS_UPDATE(rsrc.c:276)支持在不中断其他操作的情况下更新部分槽位:
static int __io_sqe_buffers_update(struct io_ring_ctx *ctx,
struct io_uring_rsrc_update2 *up, ...)
{
// 可以更新 buf_table 中的任意子范围
for (done = 0; done < nr_args; done++) {
node = io_sqe_buffer_register(ctx, iov, &last_hpage);
i = array_index_nospec(up->offset + done, ctx->buf_table.nr);
io_reset_rsrc_node(ctx, &ctx->buf_table, i); // 释放旧节点
ctx->buf_table.nodes[i] = node; // 安装新节点
}
}原子性通过 uring_lock 互斥锁保证。正在使用中的节点(refs > 1)不会被立即释放,而是等引用归零后自然释放。
Buffer Ring 支持通过 mmap() 获取共享内存地址(IOU_PBUF_RING_MMAP 标志):
// 用户态
char *br_addr = mmap(NULL, ring_size,
PROT_READ | PROT_WRITE, MAP_SHARED,
ring_fd,
IORING_OFF_PBUF_RING | ((ulong)bgid << IORING_OFF_PBUF_SHIFT));
// br_addr 即 struct io_uring_buf_ring *
// 可直接写入 bufs[] 和更新 tailmmap 偏移编码:高位是 IORING_OFF_PBUF_RING(表示这是 buffer ring),低位是 bgid。io_buffer_add_list() 在注册时以相同编码存储区域,mmap 请求到来时内核通过偏移找到对应的 io_buffer_list。
| 资源类型 | 最大数量 | 相关常量 |
|---|---|---|
| 固定缓冲区 | 16,384(16K) | IORING_MAX_REG_BUFFERS = 1<<14 |
| 固定文件 | 1,048,576(1M) | IORING_MAX_FIXED_FILES = 1<<20 |
| Buffer Ring 数量 | 65,536(bgid u16) | MAX_BIDS_PER_BGID = 1<<16 |
| 每个 Buffer Ring 容量 | < 65,536 | head/tail 是 u16,满时无法区分 |
| imu_cache 缓存大小 | 全局 IO_ALLOC_CACHE_MAX |
避免频繁 malloc |
| 锁定内存 | RLIMIT_MEMLOCK |
per-user 限制 |
| 功能 | 源文件 | 关键函数 |
|---|---|---|
| 固定缓冲区/文件资源管理 | io_uring/rsrc.c |
io_sqe_buffer_register, io_check_coalesce_buffer, io_account_mem |
| 文件表管理 | io_uring/filetable.c |
io_file_bitmap_get, __io_fixed_fd_install |
| 超时操作 | io_uring/timeout.c |
io_timeout_fn, io_timeout_complete, io_flush_timeouts |
| 设备命令直通 | io_uring/uring_cmd.c |
io_uring_cmd, __io_uring_cmd_done, io_uring_cmd_mark_cancelable |
| 网络操作 | io_uring/net.c |
io_send_zc, io_sg_from_iter, io_send_zc_prep |
| Buffer Ring | io_uring/kbuf.c |
io_register_pbuf_ring, io_ring_buffer_select, io_kbuf_inc_commit |
| SQPOLL | io_uring/sqpoll.c |
io_sq_thread, io_attach_sq_data, __io_sq_thread |
| 安全控制 | io_uring/io_uring.c |
io_uring_allowed |
| BPF filter 定义 | include/uapi/linux/io_uring/bpf_filter.h |
io_uring_bpf_ctx, io_uring_bpf_filter |
固定缓冲区 ──────────────────────┐
↓ 最优路径 │
SEND_ZC(io_sg_from_iter) │
↓ 零拷贝 │
SKB frags │
│
Buffer Ring ──────────────────────┤
↓ 自动选择 │
RECV_MULTISHOT ──────────────────→ IOU_REQUEUE(公平)
↓ 持续接收 │
CQE(F_MORE) │
│
ACCEPT_MULTISHOT ─────────────────┤
↓ 持续 accept │
FILE_INDEX_ALLOC ─────────────────┘
↓ 自动分配固定文件槽
固定文件(跳过 fget/fput)
链式超时 ─────────────────────────┐
↓ 保护 │
任意操作(RECV/SEND/etc.) │
↓ 超时或完成 │
io_disarm_next / io_link_timeout_fn
高性能应用应按以下顺序初始化 io_uring 资源,避免中途 resize 导致的节点迁移:
1. io_uring_setup() → 创建 ring(确定 SQ/CQ 深度)
2. IORING_REGISTER_BUFFERS → 固定缓冲区(越早越好,避免页面碎片)
3. IORING_REGISTER_FILES → 固定文件(稀疏表可后续动态填充)
4. IORING_REGISTER_PBUF_RING → Buffer Ring(按需按 bgid 注册)
5. IORING_SETUP_SQPOLL → 如需 SQPOLL,在 setup 时就指定
调整 RLIMIT_MEMLOCK 来允许足够的锁定内存(固定缓冲区会消耗此配额):
ulimit -l unlimited # 开发环境
# 或在 /etc/security/limits.conf 中设置
* soft memlock 2097152 # 2 GB(单位 KB)
* hard memlock 2097152CQ ring 的深度默认是 SQ 深度的 2 倍(对于 multishot 操作应再加大)。若 CQ 溢出,内核设置 IORING_SQ_CQ_OVERFLOW 标志,未能入队的 CQE 计入 ctx->cq_overflow。
应用应在每轮 io_uring_enter() 后检查此标志,并通过 IORING_ENTER_GETEVENTS 及时收割 CQE 避免积压。对于高频 multishot 场景,建议 CQ 深度设为 SQ 深度的 4-8 倍。
IORING_SETUP_IOPOLL 启用轮询完成(不依赖中断),适合 NVMe 等低延迟设备。在此模式下:
- 不支持可取消 uring_cmd(hash_node 与 iopoll 数据结构重叠,
uring_cmd.c:113) - 用户需主动调用
io_uring_enter()触发完成收割 - 与 SQPOLL 组合时,SQPOLL 线程会代劳
io_do_iopoll()(sqpoll.c:224)
若怀疑固定缓冲区未正确释放,可检查:
# 查看进程的 locked_vm(单位:页)
cat /proc/<pid>/status | grep VmPin
# 查看 io_uring 上下文信息(需要 kernel >= 5.13)
cat /proc/<pid>/fdinfo/<ring_fd>/proc/<pid>/fdinfo/<ring_fd> 会输出 nr_user_files、nr_user_bufs 等计数器,可用于验证注册状态。
由 Claude Code 分析生成