基于 Linux Kernel 源码精确分析(含 litmus test 逐行解读) 覆盖路径:
kernel/locking/|include/linux/|include/asm-generic/|tools/memory-model/
- 内核并发来源与同步层次总览
- spinlock:自旋锁深度剖析
- mutex:互斥锁的快慢路径
- rwlock 与 rwsem:读写分离语义
- RCU:读-复制-更新机制
- seqlock:写优先的无锁读
- 原子操作与引用计数
- per-CPU 变量
- 内存屏障与内核内存模型(LKMM)
- 等待队列(wait queue)
- 完成量(completion)
- lockdep:死锁检测系统
- 同步原语选择指南
Linux 内核运行在多种并发场景下,理解并发的来源是选择同步手段的前提:
内核并发来源分类
┌─────────────────────────────────────────────────────────────┐
│ 内核并发来源 │
│ │
│ 1. 多核并行(SMP) │
│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │ CPU0 │ │ CPU1 │ │ CPU2 │ │ CPU3 │ │
│ └──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘ │
│ │ │ │ │ │
│ └──────────┴────┬─────┴──────────┘ │
│ │ │
│ ┌──────▼──────┐ │
│ │ 共享内存 │ ← 数据竞争发生处 │
│ └─────────────┘ │
│ │
│ 2. 中断抢占(硬中断/NMI) │
│ 进程在内核态执行 → 硬件中断 → 中断处理函数访问共享数据 │
│ │
│ 3. 内核抢占(CONFIG_PREEMPT) │
│ 进程A在内核态 → 调度点 → 进程B抢占执行 │
│ │
│ 4. 软中断(softirq)/ tasklet │
│ 网络收包、块设备等下半部,可与进程并发执行 │
│ │
│ 5. 工作队列(workqueue) │
│ 内核线程异步执行,完全可调度 │
└─────────────────────────────────────────────────────────────┘
不同的并发场景需要不同粒度的同步原语。以下是内核中各种同步原语的层次与适用范围:
同步原语层次(从最轻量到最重量)
原子操作(atomic_t)
├── 无锁,硬件单指令
├── 适用:简单计数器、标志位
└── 不能保护复合数据结构
per-CPU 变量
├── 无需锁(CPU 私有)
├── 适用:统计计数、per-CPU 缓存
└── 禁止抢占期间使用
seqlock
├── 写优先,读者自旋重试
├── 适用:读多写少、小数据(时钟、路由缓存)
└── 不适用于含指针的数据
spinlock / raw_spinlock
├── 忙等待,持锁期间禁止抢占
├── 适用:临界区极短、不能睡眠
└── 用于中断上下文 / 持锁期间调用其他锁
rwlock(读写自旋锁)
├── 允许多个读者同时持有
├── 适用:读多写少 + 不能睡眠
└── 写者需等所有读者退出
RCU(Read-Copy-Update)
├── 读者无锁无等待,写者异步延迟释放
├── 适用:读极多写极少(路由表、进程列表)
└── 写者需处理宽限期,内存开销较大
mutex(互斥锁)
├── 可睡眠,公平性好
├── 适用:持锁时间较长、复杂数据结构
└── 不能用于中断上下文
rwsem(读写信号量)
├── 可睡眠的读写分离锁
├── 适用:mmap_lock、内核模块加载等
└── 写者独占,读者可并发
spinlock 在内核中有两个层次:raw_spinlock_t 和 spinlock_t,其区别对于实时内核(PREEMPT_RT)尤为重要。
raw_spinlock_t 结构体(include/linux/spinlock_types_raw.h:14):
context_lock_struct(raw_spinlock) {
arch_spinlock_t raw_lock; // 架构相关锁实现(qspinlock 或 ticket lock)
#ifdef CONFIG_DEBUG_SPINLOCK
unsigned int magic, owner_cpu; // 调试:魔数 0xdead4ead
void *owner; // 持锁者指针
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map; // lockdep 依赖图节点
#endif
};
typedef struct raw_spinlock raw_spinlock_t;spinlock_t 结构体(include/linux/spinlock_types.h:17):
// 非 PREEMPT_RT 内核:spinlock 就是 raw_spinlock 的包装
context_lock_struct(spinlock) {
union {
struct raw_spinlock rlock;
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct {
u8 __padding[LOCK_PADSIZE];
struct lockdep_map dep_map;
};
#endif
};
};
typedef struct spinlock spinlock_t;
// PREEMPT_RT 内核:spinlock 映射到 rt_mutex(可抢占!)
context_lock_struct(spinlock) {
struct rt_mutex_base lock; // 实时互斥锁,允许优先级继承
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
};为什么这样设计?
在实时内核(PREEMPT_RT)中,spinlock_t 被替换为 rt_mutex,这意味着:
- 普通
spin_lock()变成可睡眠操作 - 实现了优先级继承,避免优先级反转
- 持有
spinlock_t的代码路径可以被抢占
而 raw_spinlock_t 在所有配置下都是真正的自旋锁,用于那些绝对不能被抢占的临界代码(如调度器内部、中断控制器)。
raw_spinlock_t vs spinlock_t
CONFIG_PREEMPT_RT=n CONFIG_PREEMPT_RT=y
───────────────────── ──────────────────────
raw_spinlock_t → 真正自旋(忙等待) 真正自旋(忙等待)
spinlock_t → raw_spinlock 包装 rt_mutex(可睡眠)
规则:
- 绝对不能睡眠的代码 → raw_spinlock_t
- 一般临界区代码 → spinlock_t
- 中断上下文 → raw_spinlock_t 或 spin_lock_irq*
现代 x86/ARM64 内核使用 qspinlock(队列自旋锁),其核心数据结构定义于 include/asm-generic/qspinlock_types.h:14:
typedef struct qspinlock {
union {
atomic_t val; // 整个锁状态的原子值
#ifdef __LITTLE_ENDIAN
struct {
u8 locked; // 位 0-7:锁定字节(0=未锁,1=锁定)
u8 pending; // 位 8:挂起位(第2个等待者)
};
struct {
u16 locked_pending; // 位 0-15
u16 tail; // 位 16-31:等待队列尾指针
};
#endif
};
} arch_spinlock_t;32位 val 字段的位图布局(include/asm-generic/qspinlock_types.h:52):
val 字段位布局(NR_CPUS < 16K 时):
31 18 17 16 15 9 8 7 0
┌──────────┬─────┬────────┬───────────┐
│ tail_cpu │ idx │ 保留 │ pending │ locked │
│ (14位) │(2位)│ (6位) │ (1位) │ (8位) │
└──────────┴─────┴────────┴───────────┘
^ ^ ^ ^
等待队列CPU MCS节点索引 第2等待者 锁定状态
快路径与慢路径(include/asm-generic/qspinlock.h:107):
static __always_inline void queued_spin_lock(struct qspinlock *lock)
{
int val = 0;
// 快路径:原子 CAS,val==0 时直接获得锁
if (likely(atomic_try_cmpxchg_acquire(&lock->val, &val, _Q_LOCKED_VAL)))
return;
// 慢路径:有竞争,进入 MCS 队列等待
queued_spin_lock_slowpath(lock, val);
}三状态转换图:
qspinlock 状态机
锁空闲 (val=0)
│
spin_lock()
│
┌────────▼─────────┐
│ CAS(0→LOCKED) │
│ 快路径成功 │
└────────┬─────────┘
│ 成功
▼
持锁状态 (locked=1)
│
spin_unlock()
│
┌────────▼─────────┐
│ smp_store_release │
│ locked = 0 │
└────────┬─────────┘
│
▼
锁空闲 (val=0)
竞争发生时(已有持锁者):
│
┌────────▼─────────┐
│ 设置 pending=1 │ 第 2 个等待者使用 pending 位
│ 自旋等待 locked=0 │
└────────┬─────────┘
│ 超过 2 个等待者
┌────────▼─────────┐
│ 加入 MCS 等待队列 │ 使用 tail 字段指向队列尾
│ 自旋等待前驱释放 │
└──────────────────┘
为什么使用 MCS 锁?
传统 ticket spinlock 在大量 CPU 竞争同一锁时,每个 CPU 都在监视同一个内存位置,导致严重的缓存行抖动(cache line bouncing)。MCS 锁让每个等待者在自己私有的节点上自旋,只有前驱唤醒后继时才产生跨 CPU 的缓存通信,极大减少了总线流量。
spin_lock 变体对比
函数 作用 典型场景
────────────────────────────────────────────────────────────────
spin_lock(lock) 禁止内核抢占 普通内核代码
spin_lock_bh(lock) 禁抢占 + 禁软中断 网络协议栈
spin_lock_irq(lock) 禁抢占 + 禁本地中断 通用中断处理
spin_lock_irqsave(l,f) 禁抢占 + 禁中断 + 保存flags 嵌套中断场景
spin_lock_nested(l,sc) 同 spin_lock + lockdep子类 同类锁嵌套
为什么需要 spin_lock_irqsave?
如果不保存 flags,则在 spin_unlock_irq 时会无条件打开中断,但调用者可能本来就处于关中断状态(如另一个中断处理函数)。irqsave 版本保存调用前的 EFLAGS.IF,irqrestore 时原样恢复,正确处理了嵌套关中断的情况。
每个锁变量都绑定一个 lock_class_key,lockdep 通过静态变量确保相同代码位置的锁共享同一个类(include/linux/spinlock_types_raw.h:31):
// RAW_SPIN_DEP_MAP_INIT 宏示例
# define RAW_SPIN_DEP_MAP_INIT(lockname) \
.dep_map = { \
.name = #lockname, \
.wait_type_inner = LD_WAIT_SPIN, \ // 自旋锁不允许在内部睡眠
}
// SPIN_DEP_MAP_INIT 对应普通 spinlock_t
# define SPIN_DEP_MAP_INIT(lockname) \
.dep_map = { \
.name = #lockname, \
.wait_type_inner = LD_WAIT_CONFIG, // PREEMPT_RT 下可配置
}struct mutex 的核心结构定义于 include/linux/mutex_types.h:41:
context_lock_struct(mutex) {
atomic_long_t owner; // 持锁者 task_struct 指针 | 状态标志位
raw_spinlock_t wait_lock; // 保护 wait_list 的自旋锁
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
struct optimistic_spin_queue osq; // 乐观自旋 MCS 队列
#endif
struct list_head wait_list; // 睡眠等待者链表
#ifdef CONFIG_DEBUG_MUTEXES
void *magic; // 调试魔数
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map; // lockdep 节点
#endif
};owner 字段的双重用途(include/linux/mutex.h:80):
#define __MUTEX_INITIALIZER(lockname) \
{ .owner = ATOMIC_LONG_INIT(0) \ // 0 表示未持有
, .wait_lock = __RAW_SPIN_LOCK_UNLOCKED(lockname.wait_lock) \
, .wait_list = LIST_HEAD_INIT(lockname.wait_list) \
... }owner 字段同时编码了:
- 高位:持锁者
task_struct *(地址按 8 字节对齐,低 3 位为 0) - 低 3 位:
MUTEX_FLAG_WAITERS(0x1)、MUTEX_FLAG_HANDOFF(0x2)、MUTEX_FLAG_PICKUP(0x4)
owner 字段位编码:
63 3 2 1 0
┌──────────────────────────────┬──┬──────┬──────┐
│ task_struct 指针(高61位) │ P │ H │ W │
└──────────────────────────────┴──┴──────┴──────┘
^ ^ ^
│ │ └── WAITERS:有等待者
│ └──────── HANDOFF:需要移交
└───────────── PICKUP:等待接管
mutex_lock() 执行路径
调用 mutex_lock()
│
▼
┌───────────────────────────────────┐
│ 快路径(Fast Path) │
│ atomic_long_try_cmpxchg(owner, │
│ 0 → current) │
│ 成功?→ 直接持有,返回 │
└────────────┬──────────────────────┘
│ 失败(已有持锁者)
▼
┌───────────────────────────────────┐
│ 中间路径(MidPath / 乐观自旋) │
│ CONFIG_MUTEX_SPIN_ON_OWNER │
│ │
│ 如果持锁者正在 CPU 上运行: │
│ → 在 OSQ(Optimistic Spin │
│ Queue)上自旋等待 │
│ → 持锁者让出 CPU 时停止自旋 │
│ → 成功获锁则返回 │
└────────────┬──────────────────────┘
│ 持锁者不在 CPU 上 / 超时
▼
┌───────────────────────────────────┐
│ 慢路径(Slow Path) │
│ 1. spin_lock(&mutex->wait_lock) │
│ 2. 将自身加入 wait_list │
│ 3. spin_unlock(&mutex->wait_lock) │
│ 4. set_current_state(TASK_UNINT..)│
│ 5. schedule() ← 真正睡眠 │
│ 6. 被 mutex_unlock() 唤醒 │
│ 7. 重新检查 owner,继续竞争 │
└───────────────────────────────────┘
乐观自旋是 mutex 性能优化的关键。其核心思路:
假设:如果当前锁持有者正在某个 CPU 上运行,那么它很快就会释放锁,此时与其立刻睡眠(消耗两次上下文切换),不如先自旋等一会儿。
实现(include/linux/osq_lock.h):osq(Optimistic Spin Queue)是一个 MCS 风格的锁,让多个乐观自旋者排队,每人只监视前驱的状态,避免缓存抖动。
乐观自旋决策树
mutex_lock_optimistic():
while (true) {
if (mutex_can_spin_on_owner(lock)) {
// 持锁者仍在 CPU 上 → 继续自旋
if (try_fast_lock())
return SUCCESS
} else {
// 持锁者已离开 CPU → 放弃自旋
break → 进入慢路径睡眠
}
if (need_resched() || !osq_is_locked())
break → 有更紧急任务,放弃自旋
}
选择 mutex 还是 spinlock?
临界区中可以睡眠?
┌─────────────────┐
│ │
是 否
│ │
▼ ▼
mutex spinlock
↓ ↓
持锁时间较长? 在中断上下文中?
┌──────────┐ ┌──────────────┐
是 否 是 否
│ │ │ │
mutex mutex spinlock + spinlock
但考虑 irqsave 即可
rwsem
规则总结:
1. 中断上下文(hardirq / softirq)→ 必须用 spinlock
2. 持锁时间超过几微秒 → 考虑 mutex
3. 读多写少 → 考虑 rwsem 或 RCU
4. 统计计数 → per-CPU 变量或 atomic_t
rwlock_t 定义于 include/linux/rwlock_types.h:25:
// 非 PREEMPT_RT
context_lock_struct(rwlock) {
arch_rwlock_t raw_lock; // 架构相关实现(如 x86 的 32位原子计数)
#ifdef CONFIG_DEBUG_SPINLOCK
unsigned int magic, owner_cpu;
void *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
};
typedef struct rwlock rwlock_t;
// PREEMPT_RT:rwlock 映射到 rwbase_rt(基于 rtmutex)
context_lock_struct(rwlock) {
struct rwbase_rt rwbase;
atomic_t readers; // 当前读者计数
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
};x86 架构的 rwlock 实现原理:
x86 rwlock 计数语义(32位整数):
写锁: -1 (0xFFFFFFFF)
未持锁: 0
1个读者: +1
2个读者: +2
...
读者加锁: atomic_add(+1),检查是否 < 0(表示有写者)
写者加锁: atomic_sub(RW_LOCK_BIAS),其中 RW_LOCK_BIAS = 0x01000000
然后自旋等待计数降为 -RW_LOCK_BIAS(即全部读者退出)
struct rw_semaphore 定义于 include/linux/rwsem.h:48:
context_lock_struct(rw_semaphore) {
atomic_long_t count; // 读写者计数,编码状态信息
atomic_long_t owner; // 写者或读者信息(含优化标志)
#ifdef CONFIG_RWSEM_SPIN_ON_OWNER
struct optimistic_spin_queue osq; // 乐观自旋队列(同 mutex)
#endif
raw_spinlock_t wait_lock; // 保护 wait_list 的内部锁
struct list_head wait_list; // 等待者链表
#ifdef CONFIG_DEBUG_RWSEMS
void *magic;
#endif
};count 字段编码(include/linux/rwsem.h:69):
#define RWSEM_UNLOCKED_VALUE 0UL
#define RWSEM_WRITER_LOCKED (1UL << 0) // bit0:写者锁定标志count 字段的完整语义:
rw_semaphore count 字段:
bit 0: WRITER_LOCKED(写者持有锁)
bit 1: WRITER_PARK(写者等待停车)
bit 2-n: 读者数量(每个读者 +RWSEM_READER_BIAS)
状态示例:
0 → 无人持锁
1 → 写者持锁
READER_BIAS → 1个读者
2*READER_BIAS → 2个读者
1|READER_BIAS → 不合法(写者+读者同时,仅过渡态)
down_read() 执行流程:
调用 down_read(sem)
│
▼
┌──────────────────────────────┐
│ 快路径: │
│ atomic_long_try_cmpxchg( │
│ count: 0 → READER_BIAS) │
│ 成功 → 持有读锁,返回 │
└──────────────┬───────────────┘
│ 失败(有写者或写等待者)
▼
┌──────────────────────────────┐
│ 慢路径: │
│ 1. 乐观自旋(若写者在CPU上) │
│ 2. 加入 wait_list(读者队列) │
│ 3. set_state(TASK_UNINT..) │
│ 4. schedule() 睡眠 │
│ 5. 写者 up_write() 后唤醒 │
└──────────────────────────────┘
down_write() 执行流程:
调用 down_write(sem)
│
▼
┌──────────────────────────────┐
│ 快路径: │
│ CAS(count: 0 → WRITER_LOCKED)│
│ 成功 → 持有写锁,返回 │
└──────────────┬───────────────┘
│ 失败(有读者或其他写者)
▼
┌──────────────────────────────┐
│ 慢路径(排他等待): │
│ 1. 加入 wait_list(写者队列) │
│ 2. set_state(TASK_UNINT..) │
│ 3. schedule() 睡眠 │
│ 4. 所有读者 up_read() 后唤醒 │
└──────────────────────────────┘
Linux rwsem 实现的是写者优先(writer-preferred)策略:
- 写等待者到达后,新读者不能插队,必须排队等候
- 避免写者饥饿(starvation)
- 缺点:连续的写请求会阻塞所有读者
相关 API(include/linux/rwsem.h:229):
extern void down_read(struct rw_semaphore *sem) __acquires_shared(sem);
extern void down_write(struct rw_semaphore *sem) __acquires(sem);
extern void up_read(struct rw_semaphore *sem) __releases_shared(sem);
extern void up_write(struct rw_semaphore *sem) __releases(sem);
// 写锁降级为读锁(原子操作,不释放再获取)
extern void downgrade_write(struct rw_semaphore *sem)
__releases(sem) __acquires_shared(sem);RCU(Read-Copy-Update)是 Linux 内核中最具创新性的同步原语,由 Paul McKenney 主导开发。其核心思路:
RCU 核心思想
传统锁的问题:
读者也需要加锁 → 读者之间互相阻塞 → 高并发下性能差
RCU 的解决方案:
读者:完全无锁,仅依赖内存屏障
写者:不锁定读者,而是"发布"新版本
旧版本延迟释放(等所有读者离开后)
关键洞察:
如果写者保证在替换指针时使用 release 语义,
读者使用 acquire 语义读取指针,
那么读者要么看到旧版本(完整),要么看到新版本(完整),
永远不会看到"撕裂"的中间状态。
include/linux/rcupdate.h:845:
static __always_inline void rcu_read_lock(void)
__acquires_shared(RCU)
{
__rcu_read_lock(); // 根据配置展开:
// 非抢占内核 → preempt_disable()
// 抢占内核 → current->rcu_read_lock_nesting++
__acquire_shared(RCU); // lockdep 注解
rcu_lock_acquire(&rcu_lock_map); // lockdep 追踪
RCU_LOCKDEP_WARN(!rcu_is_watching(), ...);
}非抢占内核下的实现(include/linux/rcupdate.h:101):
static inline void __rcu_read_lock(void)
{
preempt_disable(); // 关闭抢占 = 建立 RCU 读临界区
}
static inline void __rcu_read_unlock(void)
{
if (IS_ENABLED(CONFIG_RCU_STRICT_GRACE_PERIOD))
rcu_read_unlock_strict();
preempt_enable(); // 开启抢占 = 退出 RCU 读临界区
}为什么禁止抢占等价于 RCU 读锁?
RCU 宽限期的基础:静止状态(Quiescent State)
对于非抢占 RCU:
每次上下文切换都是一个静止状态
→ 每个 CPU 至少发生一次上下文切换 = 一个宽限期结束
因此:
rcu_read_lock() = preempt_disable()
(禁止当前 CPU 发生上下文切换)
宽限期 = 等待所有 CPU 各发生一次上下文切换
对于抢占 RCU(PREEMPT_RCU):
使用 nesting 计数器追踪嵌套深度
抢占发生时,RCU 临界区可以迁移
引入了 rcu_read_lock_nesting 来追踪
这对操作是 RCU 发布-订阅(publish-subscribe)模式的核心(include/linux/rcupdate.h:570):
// 发布:写者使用
#define rcu_assign_pointer(p, v) \
context_unsafe( \
uintptr_t _r_a_p__v = (uintptr_t)(v); \
rcu_check_sparse(p, __rcu); \
\
if (__builtin_constant_p(v) && (_r_a_p__v) == 0) \
WRITE_ONCE((p), (typeof(p))(_r_a_p__v)); \
else \
smp_store_release(&p, \ // release 语义!
RCU_INITIALIZER((typeof(p))_r_a_p__v)); \
)
// 订阅:读者使用
#define rcu_dereference(p) rcu_dereference_check(p, 0)
#define __rcu_dereference_check(p, local, c, space) \
({ \
typeof(*p) *local = (typeof(*p) *__force)READ_ONCE(p); \ // acquire 语义隐含于此
RCU_LOCKDEP_WARN(!(c), "suspicious rcu_dereference_check()"); \
rcu_check_sparse(p, space); \
((typeof(*p) __force __kernel *)(local)); \
})发布-订阅内存序保证:
发布端(写者): 订阅端(读者):
// 初始化新节点 rcu_read_lock()
new->data = 42;
new->next = old->next; // READ_ONCE 确保不被编译器合并
p = rcu_dereference(global_ptr)
// smp_store_release 确保 // ACQUIRE 屏障:保证看到 data/next
// data/next 初始化对读者可见 if (p) use(p->data) // 安全!
rcu_assign_pointer(global_ptr, new)
rcu_read_unlock()
宽限期(Grace Period)机制
时间轴
──────────────────────────────────────────→
写者操作: │ 替换指针 │ call_rcu(old, free_fn) │
│ │ │
│ │<──── 宽限期 ────────────>│
│ │ │
│ │ 等待所有现有读者退出 │
│ │ │
│ │ free_fn(old) ──────>│ 释放旧对象
两种释放方式:
synchronize_rcu():
同步等待宽限期结束(阻塞当前线程)
适用:不在中断/原子上下文,希望立即释放
call_rcu(head, func):
异步注册回调(不阻塞)
适用:中断上下文、性能敏感路径
回调在宽限期结束后由 RCU 工作线程执行
kfree_rcu(ptr, rhf):
call_rcu 的便捷封装,自动调用 kfree
include/linux/rcupdate.h:1085
include/linux/rcupdate.h:51:
// 导出的核心接口
void call_rcu(struct rcu_head *head, rcu_callback_t func);
void synchronize_rcu(void);
void rcu_barrier(void); // 等待所有已提交的 call_rcu 回调执行完毕对于大型 SMP 系统,"等待所有 CPU 经历一次静止状态"需要高效的追踪机制,Tree RCU 用树形结构分层聚合各 CPU 的静止状态信息:
Tree RCU 层次结构(假设 16 个 CPU)
Level 2 (根): [rcu_node 0]
/ \
Level 1: [rcu_node 0] [rcu_node 1]
/ \ / \
Level 0: [n0] [n1] [n2] [n3] ← rcu_node(叶节点)
/\ /\ /\ /\
Level CPUs: C0 C1 C2 C3 C4 C5 C6 C7 ... ← 各 CPU
宽限期流程:
1. 写者调用 call_rcu() → 注册到当前 CPU 的待处理链表
2. RCU kthread 启动新宽限期
3. 每个 CPU 经历静止状态后,更新叶节点的 qsmask
4. 叶节点全部报告 → 父节点标记 → 逐层上报 → 根节点宽限期结束
5. 触发所有注册的回调函数
为什么用树形结构? 若所有 CPU 都更新同一个全局变量,会产生严重的缓存行竞争。树形结构将聚合操作分散到多个 rcu_node,每个节点只有少量 CPU 竞争,极大降低了同步开销。
普通 RCU 的读临界区不允许睡眠,SRCU(Sleepable RCU)允许读者睡眠:
// SRCU 使用方式
struct srcu_struct my_srcu;
// 读者(可以在持锁期间睡眠)
int idx = srcu_read_lock(&my_srcu);
// ... 可以睡眠 ...
srcu_read_unlock(&my_srcu, idx);
// 写者
synchronize_srcu(&my_srcu); // 等待持有 my_srcu 读锁的所有读者退出SRCU 的代价是:每个 SRCU 域需要维护 per-CPU 计数器,且宽限期实现更复杂。
// 全局 RCU 保护的链表头
static LIST_HEAD(my_list);
// 读者:遍历链表
rcu_read_lock();
list_for_each_entry_rcu(entry, &my_list, node) {
// 安全访问 entry,指针不会消失
process(entry);
}
rcu_read_unlock();
// 写者:删除元素
spin_lock(&list_lock);
list_del_rcu(&entry->node); // 从链表摘除,但不立即释放
spin_unlock(&list_lock);
// 等待所有读者退出后再释放
call_rcu(&entry->rcu_head, free_entry);
// 写者:添加元素
new_entry = kmalloc(...);
new_entry->data = value;
spin_lock(&list_lock);
list_add_rcu(&new_entry->node, &my_list); // smp_wmb + WRITE_ONCE
spin_unlock(&list_lock);seqlock 适用于以下场景:
- 读操作远多于写操作
- 数据结构简单,可以复制(不含指针,避免在读重试期间指针被释放)
- 写者不能被读者阻塞(写优先)
典型应用:jiffies_64(时钟读取)、timekeeper(系统时间)、路由缓存版本号。
seqlock 读写协议
写者: 读者:
write_seqlock(&sl) do {
// 获得内部自旋锁 seq = read_seqbegin(&sl)
// 递增序号(奇数 = 写入中) // 序号为奇数?→ 写者正在写,重试
sl->seqcount.sequence++
// 读取数据(可能被写者修改)
data.x = new_x; local_x = data.x;
data.y = new_y; local_y = data.y;
write_sequnlock(&sl) } while (read_seqretry(&sl, seq))
// 再次递增序号(偶数 = 写完) // 序号变化?→ 数据被修改,重试
// 释放自旋锁
seqlock 状态:
偶数序号:稳定状态(无写者)
奇数序号:写入进行中
为什么读者不阻塞写者? 读者完全不修改任何共享状态,只是读取序号和数据。写者只需要拿到内部 spinlock(排斥其他写者),完全不需要等待读者。
include/linux/seqlock.h:42:
static inline void __seqcount_init(seqcount_t *s, const char *name,
struct lock_class_key *key)
{
lockdep_init_map(&s->dep_map, name, key, 0);
s->sequence = 0; // 初始化为 0(偶数,表示无写者)
}seqcount_t 与 seqlock_t 的区别:
seqcount_t:只有计数器,写者同步需要外部提供(通常是 spinlock 或 mutex)seqlock_t:内置 spinlock,是 seqcount_t + spinlock 的组合
seqlock 适用性矩阵
数据含指针? 写频率高? 需要睡眠? 推荐方案
────────────────────────────────────────────────────────────────
时间戳/计数器 否 否 否 seqlock ✓
路由缓存版本号 否 否 否 seqlock ✓
进程列表 是 否 否 RCU ✓
文件系统元数据 是 是 是 rwsem ✓
网络设备列表 是 否 否 RCU ✓
include/linux/atomic.h:27 说明了四种内存序变体:
// 四种内存序变体(以 atomic_add_return 为例):
atomic_add_return(i, v) // 完全有序(Full Order)
atomic_add_return_acquire(i, v) // Acquire 语义
atomic_add_return_release(i, v) // Release 语义
atomic_add_return_relaxed(i, v) // 无序(Relaxed)构建机制(include/linux/atomic.h:58):
// acquire 变体 = relaxed 变体 + acquire fence
#define __atomic_op_acquire(op, args...) \
({ \
typeof(op##_relaxed(args)) __ret = op##_relaxed(args); \
__atomic_acquire_fence(); // smp_mb__after_atomic \
__ret; \
})
// release 变体 = release fence + relaxed 变体
#define __atomic_op_release(op, args...) \
({ \
__atomic_release_fence(); // smp_mb__before_atomic \
op##_relaxed(args); \
})常用原子操作:
atomic_t 操作总览
基础操作:
atomic_read(v) → 读取当前值(READ_ONCE)
atomic_set(v, i) → 设置值(WRITE_ONCE)
atomic_add(i, v) → 加法,无返回值
atomic_sub(i, v) → 减法,无返回值
atomic_inc(v) → 自增,无返回值
atomic_dec(v) → 自减,无返回值
返回新值:
atomic_add_return(i, v) → 加法并返回新值
atomic_sub_return(i, v) → 减法并返回新值
atomic_inc_return(v) → 自增并返回新值
atomic_dec_return(v) → 自减并返回新值
测试操作:
atomic_dec_and_test(v) → 自减,若结果为0返回true
atomic_inc_and_test(v) → 自增,若结果为0返回true
atomic_add_negative(i, v) → 加法,若结果为负返回true
交换操作:
atomic_xchg(v, new) → 原子交换,返回旧值
atomic_cmpxchg(v, old, new) → CAS,返回旧值
atomic_try_cmpxchg(v, *old, new) → CAS,成功返回true
include/linux/refcount.h 中的 refcount_t 是专为引用计数设计的原子类型,比 atomic_t 更安全(include/linux/refcount.h:22):
refcount_t 与 atomic_t 的关键区别
0 INT_MAX REFCOUNT_SATURATED UINT_MAX
+──────────────────────────+─────────────────+──────────────────+
│ 正常引用计数范围 │ │ "坏值"范围 │
│ │ │ (饱和后锁定) │
└──────────────────────────┘ └──────────────────┘
0xC000_0000
问题:atomic_t 允许溢出/下溢,可能被用于 use-after-free 攻击
refcount_t 的保护:
1. 溢出保护:若计数溢出,饱和到 REFCOUNT_SATURATED(0xC0000000),
此后无法变为 0,防止错误释放
2. 下溢检测:若从 0 减少,触发 WARN_ON
关键操作:
refcount_inc(&ref) // 安全的引用获取
refcount_dec_and_test(&ref) // 安全的引用释放(返回 true 表示可释放)
refcount_inc_not_zero(&ref) // 原子地从非零值增加(RCU 配合使用)
per-CPU 变量的核心思路是:为每个 CPU 分配独立的变量副本,消除多 CPU 之间的缓存一致性开销。
per-CPU 变量内存布局
物理内存:
┌────────────────┬────────────────┬────────────────┐
│ CPU0 区域 │ CPU1 区域 │ CPU2 区域 │
│ var@CPU0 │ var@CPU1 │ var@CPU2 │
└────────────────┴────────────────┴────────────────┘
↑ ↑ ↑
per_cpu_offset[0] per_cpu_offset[1] per_cpu_offset[2]
每个 CPU 只访问自己的副本:
CPU0: this_cpu_read(var) = var@CPU0
CPU1: this_cpu_read(var) = var@CPU1
优势:
- 无锁访问(无缓存行竞争)
- 无 NUMA 节点之间的流量
- 避免原子操作
include/linux/percpu-defs.h:113:
// 声明和定义
#define DECLARE_PER_CPU(type, name) \
DECLARE_PER_CPU_SECTION(type, name, "")
#define DEFINE_PER_CPU(type, name) \
DEFINE_PER_CPU_SECTION(type, name, "")
// 展开为:
// __attribute__((section(".data..percpu"))) type name
// 对齐变体(避免 false sharing):
#define DEFINE_PER_CPU_SHARED_ALIGNED(type, name) \
DEFINE_PER_CPU_SECTION(type, name, PER_CPU_SHARED_ALIGNED_SECTION) \
____cacheline_aligned_in_smp // SMP 下按缓存行对齐
// 热数据变体(常驻 L1 cache):
#define DEFINE_PER_CPU_CACHE_HOT(type, name) \
DEFINE_PER_CPU_SECTION(type, name, "..hot.." #name)include/linux/percpu-defs.h:279:
// get_cpu_var:禁止抢占,返回当前 CPU 变量引用
#define get_cpu_var(var) \
(*({ \
preempt_disable(); \
this_cpu_ptr(&var); \
}))
// put_cpu_var:重新启用抢占
#define put_cpu_var(var) \
do { \
(void)&(var); \
preempt_enable(); \
} while (0)三类访问接口(include/linux/percpu-defs.h:412):
per-CPU 访问接口对比
接口前缀 抢占检查 中断安全 适用场景
──────────────────────────────────────────────────
raw_cpu_* 否 否 已知禁抢占 + 禁中断
__this_cpu_* 有检查 否 已知禁抢占(Debug 版)
this_cpu_* 隐含 否 通用(自动禁抢占)
get/put_cpu_var 显式 否 需要持有引用的场景
per_cpu(v, cpu) 无(任意CPU)否 汇总统计(如读取所有CPU)
// 定义 per-CPU 计数器
DEFINE_PER_CPU(long, my_counter);
// 快速更新(无锁)
void increment_counter(void)
{
this_cpu_inc(my_counter); // 相当于 ++counter[current_cpu]
}
// 读取全局总和(慢速汇总)
long get_total(void)
{
long total = 0;
int cpu;
for_each_possible_cpu(cpu)
total += per_cpu(my_counter, cpu);
return total;
}这种模式的优势:
- 写操作:O(1) 无锁,无缓存竞争
- 读操作:O(NR_CPUS),但读取频率极低
典型应用:vm_stat(内存统计)、网络计数器、调度统计等。
现代 CPU 和编译器为了性能会重排指令执行顺序,这在单 CPU 上等价,但在多 CPU 并发时可能导致不一致:
内存重排的来源
1. 编译器重排
C 代码: 编译后可能:
a = 1; b = 2; // 编译器认为等价,可以重排
b = 2; a = 1;
2. CPU 乱序执行
CPU 内部有多个执行单元,可以并行执行无依赖的指令
3. 存储缓冲区(Store Buffer)
写操作先进入 store buffer,延迟写入 L1 cache
其他 CPU 可能读到旧值
4. 加载队列(Load Queue)
读操作可能从预取缓冲读到旧值
不同架构的内存模型强度:
x86 (TSO):最强,仅允许 store-load 重排
ARM/POWER:最弱,允许几乎所有重排
RISC-V:类似 ARM,弱序模型
include/asm-generic/barrier.h 定义了完整的屏障层次(barrier.h:30):
// 全内存屏障(Full Memory Barrier)
// 保证此前所有读写操作对其他CPU可见,再执行此后操作
#ifdef __mb
#define mb() do { kcsan_mb(); __mb(); } while (0) // barrier.h:30
#endif
// 读屏障(Read Barrier)
// 保证此前所有读操作完成,再执行此后读操作
#ifdef __rmb
#define rmb() do { kcsan_rmb(); __rmb(); } while (0) // barrier.h:34
#endif
// 写屏障(Write Barrier)
// 保证此前所有写操作对其他CPU可见,再执行此后写操作
#ifdef __wmb
#define wmb() do { kcsan_wmb(); __wmb(); } while (0) // barrier.h:38
#endif
// SMP 相关屏障(单CPU时退化为编译器屏障)
// barrier.h:99
#define smp_mb() do { kcsan_mb(); __smp_mb(); } while (0)
#define smp_rmb() do { kcsan_rmb(); __smp_rmb(); } while (0)
#define smp_wmb() do { kcsan_wmb(); __smp_wmb(); } while (0)屏障强度层次:
内核内存屏障强度(从强到弱)
mb() / rmb() / wmb()
└─ 硬件级别屏障,对 DMA / 设备内存也有效
└─ 适用:与硬件设备交互
smp_mb() / smp_rmb() / smp_wmb()
└─ SMP 屏障,单 CPU 退化为编译器屏障
└─ 适用:CPU 间同步
smp_store_release(p, v) / smp_load_acquire(p)
└─ 单向屏障,语义精确,性能最优
└─ 适用:锁的实现、生产者消费者
WRITE_ONCE(p, v) / READ_ONCE(p)
└─ 仅防编译器重排(无 CPU 屏障)
└─ 适用:防止编译器优化、文档标注
barrier()
└─ 纯编译器屏障,无 CPU 效果
└─ 适用:避免编译器优化特定代码路径
include/asm-generic/barrier.h:138:
// smp_store_release:先执行所有此前操作,再写入(发布语义)
#ifndef __smp_store_release
#define __smp_store_release(p, v) \
do { \
compiletime_assert_atomic_type(*p); \
__smp_mb(); // 先全屏障 \
WRITE_ONCE(*p, v); // 再写入 \
} while (0)
#endif
// smp_load_acquire:先读取,再执行所有此后操作(获取语义)
#ifndef __smp_load_acquire
#define __smp_load_acquire(p) \
({ \
__unqual_scalar_typeof(*p) ___p1 = READ_ONCE(*p); // 先读取 \
compiletime_assert_atomic_type(*p); \
__smp_mb(); // 再全屏障 \
(typeof(*p))___p1; \
})
#endifacquire/release 配对语义:
smp_store_release / smp_load_acquire 配对
生产者(CPU A): 消费者(CPU B):
data = 42; val = smp_load_acquire(&ready)
smp_store_release(&ready, 1); // ACQUIRE 屏障
if (val == 1) {
// 保证能看到 data = 42
use(data);
}
语义保证:
release: 此前所有写 happens-before ready=1
acquire: 读到 ready=1 之后,所有此后读都能看到 release 之前的写
关键特性:
- 比全屏障轻量(在强序架构如 x86 上可能是 nop)
- 配对使用才有完整 happens-before 关系
- 是实现 mutex / spinlock 的基础原语
// READ_ONCE:防止编译器多次读取、缓存读取或消除读取
// 等价于 volatile 读,但更精确
#define READ_ONCE(x) (*((volatile typeof(x) *)&(x)))
// WRITE_ONCE:防止编译器拆分写入或消除写入
#define WRITE_ONCE(x, val) (*((volatile typeof(x) *)&(x)) = (val))为什么需要 READ_ONCE/WRITE_ONCE?
// 危险代码:编译器可能优化掉重复读取
while (!flag) { } // 编译器可能优化为无限循环(flag 被缓存到寄存器)
// 正确写法:
while (!READ_ONCE(flag)) { } // 每次都从内存读取
// 另一个危险案例:
// 编译器可能将 long 写入拆分为两个 int 写入
long x = 0xDEADBEEFCAFEBABE;
// 如果 x 被多 CPU 并发读取,可能读到撕裂的值!
// 正确写法:
WRITE_ONCE(x, 0xDEADBEEFCAFEBABE); // 保证原子单次写入LKMM(Linux Kernel Memory Model)是对 Linux 内核允许的内存操作重排进行形式化建模的框架,位于 tools/memory-model/。
LKMM 核心关系
po(Program Order):
同一 CPU 上的程序顺序关系
rf(Reads From):
读操作读取了哪个写操作的值
co(Coherence Order):
对同一内存位置的写操作的全局顺序
fr(From Reads):
若读操作 r 读到了写操作 w 的值,
且后来写操作 w' 覆盖了 w,则 r fr w'
hb(Happens Before):
由 acquire/release/fence 构建的 happens-before 关系
LKMM 的禁止条件(Consistency Axiom):
不允许存在环:
hb ∪ (prop ; hb*) 中不存在环
其中 prop 包含了 fence 传播关系
工作区中的 litmus 测试文件 tools/memory-model/litmus-tests/Z6.0+pooncelock+poonceLock+pombonce.litmus:
C Z6.0+pooncelock+pooncelock+pombonce
(*
* Result: Sometimes
*
* This example demonstrates that a pair of accesses made by different
* processes each while holding a given lock will not necessarily be
* seen as ordered by a third process not holding that lock.
*)
{}
P0(int *x, int *y, spinlock_t *mylock)
{
spin_lock(mylock);
WRITE_ONCE(*x, 1);
WRITE_ONCE(*y, 1);
spin_unlock(mylock);
}
P1(int *y, int *z, spinlock_t *mylock)
{
int r0;
spin_lock(mylock);
r0 = READ_ONCE(*y);
WRITE_ONCE(*z, 1);
spin_unlock(mylock);
}
P2(int *x, int *z)
{
int r1;
WRITE_ONCE(*z, 2);
smp_mb();
r1 = READ_ONCE(*x);
}
exists (1:r0=1 /\ z=2 /\ 2:r1=0)
逐步分析这个测试:
Z6.0 litmus test 场景分析
目标:验证 exists (P1读到y=1) AND (z最终=2) AND (P2读到x=0) 是否可能发生
执行序列(一种导致 Sometimes 的执行路径):
时间顺序 P0 P1 P2
───────────────────────────────────────────────────────
t1 spin_lock()
t2 WRITE(*x, 1)
t3 WRITE(*y, 1)
t4 spin_unlock()
t5 spin_lock()
t6 r0=READ(*y) → r0=1 ✓ (看到P0的写)
t7 WRITE(*z, 1)
t8 spin_unlock()
t9 WRITE(*z, 2)
t10 smp_mb()
t11 r1=READ(*x) → r1=?
问题核心:P2 能看到 z=1(P1写的)还是 z=2(P2自己写的)?
z 的最终值 = 2(P2后写的,co顺序最后)
P2 的 smp_mb() 是否能保证 READ(*x) 看到 P0 的 WRITE(*x, 1)?
关键:P2 的 smp_mb() 只提供 P2 内部的读写顺序:
WRITE(*z, 2) happens-before READ(*x)
但无法跨越锁边界建立与 P0 的 hb 关系!
原因:
P0 持有 mylock 写 x=1
P2 不持有 mylock,与 P0 之间没有 lock/unlock 的 happens-before 链
即使 P2 看到了 z=1(P1写的),
P1 对 y 的读取在锁保护下 happens-after P0 的写,
但这个 happens-before 链不传播到不持该锁的 P2!
结论:Result = Sometimes(r0=1 AND z=2 AND r1=0 可能发生)
对应内核规则:
"锁只对持锁者之间建立顺序,不持该锁的第三方无法观察到该顺序"
这个测试揭示的重要原则:
锁的 happens-before 范围
┌─────────────────────────────────────────────────────┐
│ spin_lock(L) ─ 临界区操作 ─ spin_unlock(L) │
│ │ │ │
│ └──────── 有序 ─────────────┘ │
│ │
│ 下一个 spin_lock(L) 看到之前 spin_unlock(L) 的效果 │
│ │
│ 但:不持有 L 的代码看不到这个顺序! │
└─────────────────────────────────────────────────────┘
如果 P2 想看到 P0 写 x 的效果:
方案1:P2 也持有 mylock(但会降低并发度)
方案2:P0 用 smp_store_release 写 x,P2 用 smp_load_acquire 读 x
方案3:P0/P2 都使用 smp_mb()
内存模型强度对比
操作对 x86 (TSO) ARM64 POWER
──────────────────────────────────────────────────
STORE → STORE 有序 无序 无序
LOAD → LOAD 有序 无序 无序
LOAD → STORE 有序 无序 无序
STORE → LOAD 可重排! 可重排 可重排
x86 TSO 只允许 store-load 重排,因此:
smp_mb() 在 x86 上 = mfence 或 locked 操作
smp_store_release 在 x86 上 = 普通 MOV(TSO 自动提供)
smp_load_acquire 在 x86 上 = 普通 MOV(TSO 自动提供)
ARM64 允许更多重排,因此:
smp_mb() = dsb ish(数据同步屏障)
smp_store_release = stlr(store-release 指令)
smp_load_acquire = ldar(load-acquire 指令)
内核代码应始终使用 LKMM 语义,不依赖特定架构行为!
等待队列是 Linux 内核中进程睡眠-唤醒机制的基础,定义于 include/linux/wait.h。
核心数据结构(include/linux/wait.h:28):
// 单个等待者条目
struct wait_queue_entry {
unsigned int flags; // WQ_FLAG_* 标志位
void *private; // 通常是 task_struct *
wait_queue_func_t func; // 唤醒回调函数
struct list_head entry; // 链表节点
};
// 等待队列头
struct wait_queue_head {
spinlock_t lock; // 保护 head 链表
struct list_head head; // 等待者链表
};
typedef struct wait_queue_head wait_queue_head_t;标志位含义(include/linux/wait.h:19):
#define WQ_FLAG_EXCLUSIVE 0x01 // 独占等待者:wake_up 只唤醒一个
#define WQ_FLAG_WOKEN 0x02 // 已被唤醒
#define WQ_FLAG_CUSTOM 0x04 // 使用自定义唤醒函数
#define WQ_FLAG_DONE 0x08 // 已完成
#define WQ_FLAG_PRIORITY 0x10 // 高优先级,插入队列头部include/linux/wait.h:302 中的 ___wait_event 是所有 wait_event_* 宏的核心实现:
#define ___wait_event(wq_head, condition, state, exclusive, ret, cmd) \
({ \
__label__ __out; \
struct wait_queue_entry __wq_entry; \
long __ret = ret; /* 返回值 */ \
\
init_wait_entry(&__wq_entry, \
exclusive ? WQ_FLAG_EXCLUSIVE : 0); \
for (;;) { \
long __int = prepare_to_wait_event( \
&wq_head, &__wq_entry, state); /* 加入队列,设置状态 */ \
\
if (condition) /* 条件满足? */ \
break; /* → 退出循环 */ \
\
if (___wait_is_interruptible(state) && __int) { \
__ret = __int; /* 被信号中断 */ \
goto __out; \
} \
\
cmd; /* 通常是 schedule() */ /* 睡眠 */ \
\
if (condition) /* 唤醒后再检查条件 */ \
break; \
} \
finish_wait(&wq_head, &__wq_entry); /* 从队列移除 */ \
__out: __ret; \
})wait_event 的完整流程图:
wait_event(wq, condition) 完整流程
调用方
│
▼
might_sleep() ← 编译时检查是否在原子上下文
│
▼
if (condition) ← 快速检查(避免进入等待)
return ← 条件已满足,直接返回
│
▼(条件不满足)
for (;;) {
prepare_to_wait_event()
├─ spin_lock(&wq_head->lock) ← 获取等待队列锁
├─ __add_wait_queue() ← 加入链表
├─ set_current_state(state) ← 设置进程状态
└─ spin_unlock(&wq_head->lock) ← 释放等待队列锁
if (condition) break ← 再次检查(防 missed wakeup)
schedule() ← 让出 CPU,真正睡眠
if (condition) break ← 被唤醒后检查条件
}
finish_wait()
├─ set_current_state(TASK_RUNNING) ← 恢复运行状态
└─ list_del_init(&entry) ← 从队列移除(若还在)
独占等待 vs 非独占等待
非独占(默认): 独占(WQ_FLAG_EXCLUSIVE):
wake_up() 唤醒所有等待者 wake_up() 只唤醒第一个等待者
适用场景: 适用场景:
条件广播 惊群效应(thundering herd)防止
如:数据可读 如:accept 队列中多个服务进程
等待同一个连接
链表结构(独占者在尾部):
头 ←→ [非独占A] ←→ [非独占B] ←→ [独占C] ←→ [独占D] 尾
wake_up_all(): 唤醒所有(A, B, C, D)
wake_up(): 唤醒到第一个独占者为止(A, B, C)
include/linux/wait.h:186:
static inline void
__add_wait_queue_exclusive(struct wait_queue_head *wq_head,
struct wait_queue_entry *wq_entry)
{
wq_entry->flags |= WQ_FLAG_EXCLUSIVE; // 设置独占标志
__add_wait_queue(wq_head, wq_entry); // 插入到队列头(PRIORITY条目之后)
}include/linux/wait.h:104:
static inline bool wq_has_sleeper(struct wait_queue_head *wq_head)
{
/*
* We need to be sure we are in sync with the
* add_wait_queue modifications to the wait queue.
*
* This memory barrier should be paired with one on the
* waiting side.
*/
smp_mb(); // ← 关键:防止 waitqueue_active() 读被提升到 @cond 写之前
return waitqueue_active(wq_head);
}missed wakeup 的防止:
missed wakeup 竞争窗口
错误的实现(无屏障):
等待者 唤醒者
──────────────────────────────────────────
准备加入队列...
cond = true
if (waitqueue_active) ← 看到空队列!
wake_up(...) ← 不发送唤醒
加入队列
check(cond) → cond == true
← 但唤醒已错过!
set_current_state() 中的 smp_mb() 确保:
加入队列 happens-before 检查 cond
唤醒者的 smp_mb() 确保:
写 cond happens-before 读 waitqueue
两个屏障配对,消除竞争窗口。
include/linux/completion.h:26:
struct completion {
unsigned int done; // 完成计数(0=未完成,>0=已完成次数)
struct swait_queue_head wait; // 简化等待队列(swait)
};完成量使用 swait(Simple Wait Queue)而非普通 wait queue,因为:
- 完成量语义更简单(只有一个等待条件)
- swait 不需要处理 exclusive 等待、poll 等复杂场景
- swait 实现更轻量,减少代码大小
completion 工作流程
初始状态:
comp.done = 0
等待方(如模块初始化线程):
wait_for_completion(&comp)
│
▼
if (comp.done) { comp.done--; return; } ← 快路径:已完成
│
▼(未完成)
加入 comp.wait 等待队列
schedule() ← 睡眠
│
▼(被唤醒)
comp.done-- ← 消耗一个完成令牌
return
完成方(如硬件中断处理函数):
complete(&comp)
│
▼
spin_lock(&comp->wait.lock)
if (comp.done != UINT_MAX) ← 防止溢出
comp.done++ ← 增加完成令牌
swake_up_locked(&comp->wait) ← 唤醒一个等待者
spin_unlock(&comp->wait.lock)
complete_all(&comp):
comp.done = UINT_MAX ← 永久完成(唤醒所有等待者)
// 场景1:等待内核线程完成初始化
DECLARE_COMPLETION(thread_ready);
static int my_thread(void *data)
{
// 完成初始化工作...
complete(&thread_ready); // 通知:初始化完成
while (!kthread_should_stop()) { ... }
return 0;
}
void start_module(void)
{
kthread_run(my_thread, NULL, "my-thread");
wait_for_completion(&thread_ready); // 等待线程就绪
}
// 场景2:异步 DMA 完成通知
struct my_dma_work {
struct completion done;
// ...
};
// DMA 完成中断处理函数
void dma_irq_handler(struct my_dma_work *work)
{
complete(&work->done);
}
// 提交 DMA 并等待完成
void do_dma_transfer(struct my_dma_work *work)
{
init_completion(&work->done);
submit_dma(work);
wait_for_completion(&work->done); // 阻塞等待 DMA 完成
}lockdep 是 Linux 内核中的运行时锁依赖检测系统,由 Ingo Molnar 开发,能在第一次出现死锁条件时立即检测到,而不是等到真正死锁发生。
lockdep 能检测的问题
1. 循环死锁(ABBA 死锁):
CPU0: lock(A) → lock(B)
CPU1: lock(B) → lock(A)
→ lockdep 在 lock(B) 时检测到潜在环
2. 中断上下文中使用睡眠锁:
IRQ handler: lock(mutex) ← mutex 是睡眠锁!
→ lockdep 报告上下文错误
3. 中断与普通代码的锁顺序冲突:
进程:lock(A) → lock(B) [with irq enabled]
IRQ: lock(B) [可能与进程的 lock(A→B) 形成环]
4. 同一锁的递归加锁(非递归锁):
lock(A) → lock(A) ← deadlock!
include/linux/lockdep_types.h:70:
struct lock_class_key {
union {
struct hlist_node hash_entry; // 哈希表节点
struct lockdep_subclass_key subkeys[MAX_LOCKDEP_SUBCLASSES]; // 8个子类
};
};锁类的作用:
锁类(Lock Class)概念
问题:内核中有数百万个锁实例,无法逐一追踪
解决:相同代码位置初始化的锁共享同一个锁类
示例:
struct inode {
spinlock_t i_lock; // ← 每个 inode 都有一个
};
inode A 的 i_lock 和 inode B 的 i_lock 属于同一个锁类
只需记录锁类之间的顺序关系,不需要每个实例
内核中锁类的数量:~10000 个
每个锁类追踪:
- 该类锁已持有时,允许获取哪些其他锁类
- 该类锁未持有时,被哪些其他锁类持有时可以获取
- 中断相关的使用历史(irq-safe / irq-unsafe)
lockdep 依赖图
节点:每个锁类 (A, B, C, D, ...)
边:lock(X) 时已持有 Y → 添加边 Y → X (表示 Y 先于 X)
合法图(无环): 非法图(有环,死锁!):
A → B → C A → B
↑ ↓
C ← D
检测算法:
每次 lock_acquire(X):
1. 获取当前 CPU 上的锁持有链 [... Y]
2. 尝试添加边 Y → X 到全局依赖图
3. 运行 DFS 检测是否存在环
4. 若有环 → 打印完整的死锁场景,触发 BUG()
时间复杂度:O(锁类数²) 最坏情况,但实践中极快
include/linux/rcupdate.h:311(以 RCU 为例展示 lockdep hook):
static inline void rcu_lock_acquire(struct lockdep_map *map)
{
lock_acquire(map, // lockdep_map
0, // subclass
0, // try(0=阻塞获取)
2, // read(2=RCU 特殊读取)
0, // check
NULL, // nest_lock
_THIS_IP_); // 返回地址(记录代码位置)
}
static inline void rcu_lock_release(struct lockdep_map *map)
{
lock_release(map, _THIS_IP_);
}lockdep 的等待类型检查(include/linux/lockdep_types.h:17):
enum lockdep_wait_type {
LD_WAIT_INV = 0, // 未检查
LD_WAIT_FREE, // 无等待(如 RCU 读)
LD_WAIT_SPIN, // 自旋(raw_spinlock_t)
LD_WAIT_CONFIG, // 可配置(PREEMPT_RT 下可睡眠的 spinlock_t)
LD_WAIT_SLEEP, // 可睡眠(mutex_t)
LD_WAIT_MAX,
};这套类型系统允许 lockdep 检测:在 LD_WAIT_SPIN 上下文(持有 spinlock)中不能尝试获取 LD_WAIT_SLEEP 类型的锁(mutex)。
实际 lockdep 死锁报告格式
======================================================
WARNING: possible circular locking dependency detected
6.x.x #1 Not tainted
------------------------------------------------------
process/PID is trying to acquire lock:
ffffffff82a5c820 (lockA){+.+.}-{3:3}, at: function_a+0x40/0xa0
but task is already holding lock:
ffffffff82a5c940 (lockB){+.+.}-{3:3}, at: function_b+0x20/0x60
which lock already depends on the new lock.
the existing dependency chain (in reverse order) is:
-> #1 (lockA){+.+.}-{3:3}:
lock_acquire+0xd8/0x2e0
_raw_spin_lock+0x2c/0x40
function_c+0x30/0x80
-> #0 (lockB){+.+.}-{3:3}:
lock_acquire+0xd8/0x2e0
_raw_spin_lock+0x2c/0x40
function_a+0x40/0xa0
other info that might help us debug this:
Possible unsafe locking scenario:
CPU0 CPU1
---- ----
lock(lockB);
lock(lockA);
lock(lockB);
lock(lockA);
*** DEADLOCK ***
同步原语选择决策树
开始
│
▼
是否需要保护共享数据?
├─ 否 → 使用 per-CPU 变量(无共享)
└─ 是 ↓
是否只是简单的计数/标志?
├─ 是 → atomic_t / atomic64_t / refcount_t
└─ 否 ↓
是否在中断上下文中使用?
├─ 是 → spinlock_t (+ irq 变体)
└─ 否 ↓
临界区是否可能睡眠?
├─ 否 → spinlock_t(简单临界区)
└─ 是 ↓
读/写比例如何?
├─ 读极多,写极少(>10:1)
│ └─ 数据是否含指针?
│ ├─ 否 → seqlock
│ └─ 是 → RCU
│
├─ 读多写少(>2:1)
│ └─ rwsem(读写信号量)
│
└─ 读写均衡 → mutex
同步原语性能特征(参考值,实际因架构/竞争程度而异)
原语 无竞争延迟 有竞争行为 内存占用
─────────────────────────────────────────────────────
原子操作 ~1 ns CAS 重试 4-8 字节
per-CPU 变量 ~0.5 ns N/A(无共享) 8*NR_CPUS 字节
spinlock ~2-5 ns 忙等待 4-8 字节
qspinlock ~2-5 ns MCS 队列 4 字节
mutex(快路径) ~5-10 ns 乐观自旋→睡眠 ~40 字节
mutex(慢路径) ~500-1000ns 上下文切换 ~40 字节
rwsem(读快路径) ~10 ns 乐观自旋→睡眠 ~56 字节
RCU(读者) ~1-2 ns N/A(无等待) 宽限期延迟释放
seqlock(读者) ~2-5 ns 重试开销 ~8 字节
常见同步错误
1. 在持 spinlock 时调用可能睡眠的函数
spin_lock(&lock);
kmalloc(size, GFP_KERNEL); ← GFP_KERNEL 可能睡眠!
// 修复:使用 GFP_ATOMIC,或者改用 mutex
2. 忘记 irqsave 导致死锁
// 中断处理函数也持有同一个锁:
spin_lock(&lock); // 进程上下文
local_irq_enable(); // 打开中断
// 中断触发,中断处理函数 spin_lock(&lock) → 死锁!
// 修复:spin_lock_irqsave(&lock, flags)
3. RCU 临界区内修改数据
rcu_read_lock();
entry->value = new_val; ← 错误!RCU 临界区只能读
// 修复:在 rcu_read_unlock() 外持锁修改
4. 忘记 smp_wmb / smp_rmb 导致可见性问题
// 写者:
data = new_value;
flag = 1; ← CPU 可能先写 flag 后写 data!
// 修复:在 data=new_value 后加 smp_wmb()
5. double-free:call_rcu 后访问已释放对象
call_rcu(&entry->head, free_fn);
use(entry->field); ← 可能 free_fn 已执行!
// 修复:在 call_rcu 后不能访问 entry
三种读写保护机制对比
RCU rwsem spinlock/rwlock
───────────────────────────────────────────────────────────
读者开销 极低 中等 低(忙等待)
写者开销 中等 中等 低(忙等待)
读者延迟 无 有(等写者) 有(忙等待)
写者延迟 有(宽限期) 有(等读者) 有(忙等待)
读者可睡眠 否(普通RCU)是 否
写者可睡眠 是(阻塞) 是 否
含指针数据 支持 支持 支持
内存开销 高(延迟释放)低 低
典型场景 路由表/proc mmap_lock 中断处理
关键源文件速查表
数据结构 头文件 关键字段
──────────────────────────────────────────────────────────────────────
raw_spinlock_t spinlock_types_raw.h:14 raw_lock
spinlock_t spinlock_types.h:17 rlock
arch_spinlock_t(qspinlock) qspinlock_types.h:14 val(locked/pending/tail)
struct mutex mutex_types.h:41 owner/wait_lock/wait_list/osq
struct rw_semaphore rwsem.h:48 count/owner/wait_lock
struct wait_queue_head wait.h:35 lock/head
struct wait_queue_entry wait.h:28 flags/private/func/entry
struct completion completion.h:26 done/wait
seqcount_t seqlock_types.h sequence
struct rcu_head (rcu_types.h) next/func
内存屏障速查
屏障 作用 位置
──────────────────────────────────────────────────────────────────
mb() 全屏障(含设备内存) barrier.h:30
rmb() 读屏障(含设备内存) barrier.h:34
wmb() 写屏障(含设备内存) barrier.h:38
smp_mb() SMP 全屏障 barrier.h:99
smp_rmb() SMP 读屏障 barrier.h:103
smp_wmb() SMP 写屏障 barrier.h:107
smp_store_release(p,v) release 写 barrier.h:172
smp_load_acquire(p) acquire 读 barrier.h:176
barrier() 编译器屏障(无 CPU 效果) compiler.h
READ_ONCE(x) 防编译器优化读 rwonce.h
WRITE_ONCE(x,v) 防编译器优化写 rwonce.h
RCU API 速查
函数/宏 用途
──────────────────────────────────────────────────────────
rcu_read_lock() 进入 RCU 读临界区(rcupdate.h:845)
rcu_read_unlock() 退出 RCU 读临界区(rcupdate.h:876)
rcu_dereference(p) 安全读取 RCU 保护指针(rcupdate.h:752)
rcu_assign_pointer(p, v) 安全发布 RCU 指针(rcupdate.h:570)
synchronize_rcu() 同步等待宽限期结束(rcupdate.h:53)
call_rcu(head, func) 异步注册回调(rcupdate.h:51)
kfree_rcu(ptr, rhf) 宽限期后 kfree(rcupdate.h:1085)
rcu_read_lock_held() 调试:是否持有 RCU 读锁
RCU_INIT_POINTER(p, v) 初始化(不需要 release 语义)
rcu_access_pointer(p) 读取但不解引用(用于判断 NULL)
rcu_dereference_protected(p,c) 在持锁时安全读取(写者使用)
由 Claude Code 分析生成