内容简介:自旋锁用于处理器之间的互斥,适合保护很短的临界区,并且不允许在临界区睡眠。申请自旋锁的时候,如果自旋锁被其他处理器占有,本处理器自旋等待(也称为忙等待)。进程、软中断和硬中断都可以使用自旋锁。
作者简介:余华兵,在网络通信行业工作十多年,负责 IPv4 协议栈 、 IPv6 协议栈和 Linux 内核。在工作中看着 2.6 版本的专业书籍维护 3.x 和 4.x 版本的 Linux 内核,感觉不方便,于是自己分析 4.x 版本的 Linux 内核整理出一本书,书名叫《 Linux 内核深度解析》, 2019 年 5 月出版,希望对同行有帮助。
自旋锁用于处理器之间的互斥,适合保护很短的临界区,并且不允许在临界区睡眠。申请自旋锁的时候,如果自旋锁被其他处理器占有,本处理器自旋等待(也称为忙等待)。
进程、软中断和硬中断都可以使用自旋锁。
自旋锁的实现经历了 3 个阶段:
(1) 最早的自旋锁是无序竞争的,不保证先申请的进程先获得锁。
(2) 第 2 个阶段是入场券自旋锁,进程按照申请锁的顺序排队,先申请的进程先获得锁。
(3) 第 3 个阶段是 MCS 自旋锁。入场券自旋锁存在性能问题:所有申请锁的处理器在同一个变量上自旋等待,缓存同步的开销大,不适合处理器很多的系统。 MCS 自旋锁的策略是为每个处理器创建一个变量副本,每个处理器在自己的本地变量上自旋等待,解决了性能问题。
入场券自旋锁和 MCS 自旋锁都属于排队自旋锁( queued spinlock ),进程按照申请锁的顺序排队,先申请的进程先获得锁。
1. 数据结构
自旋锁的定义如下:
include/linux/spinlock _ types.h
typedef struct spinlock {
union {
struct raw _ spinlock rlock;
…
};
} spinlock _ t;
typedef struct raw _ spinlock {
arch _ spinlock _ t raw _ lock;
…
} raw _ spinlock _ t;
可以看到,数据类型 spinlock 对 raw_spinlock 做了封装,然后数据类型 raw_spinlock 对 arch_spinlock_t 做了封装,各种处理器架构需要自定义数据类型 arch_spinlock_t 。
spinlock 和 raw_spinlock (原始自旋锁)有什么关系?
Linux 内核有一个实时内核分支(开启配置宏 CONFIG_PREEMPT_RT )来支持硬实时特性,内核主线只支持软实时。
对于没有打上实时内核补丁的内核, spinlock 只是封装 raw_spinlock ,它们完全一样。如果打上实时内核补丁,那么 spinlock 使用实时互斥锁保护临界区,在临界区内可以被抢占和睡眠,但 raw_spinlock 还是自旋锁。
目前 主线 版本还没有合并实时内核补丁,说不定哪天就会合并进来,为了使代码可以兼容实时内核,最好坚持 3 个原则:
( 1 )尽可能使用 spinlock 。
( 2 )绝对不允许被抢占和睡眠的地方,使用 raw_spinlock ,否则使用 spinlock 。
( 3 )如果临界区足够小,使用 raw_spinlock 。
2. 使用方法
定义并且初始化静态自旋锁的方法是:
DEFINE _ SPINLOCK(x);
在运行时动态初始化自旋锁的方法是:
spin _ lock _ init(x);
申请自旋锁的函数是:
( 1 ) void spin_lock(spinlock_t *lock);
申请自旋锁,如果锁被其他处理器占有,当前处理器自旋等待。
( 2 ) void spin_lock_bh(spinlock_t *lock);
申请自旋锁,并且禁止当前处理器的软中断。
( 3 ) void spin_lock_irq(spinlock_t *lock);
申请自旋锁,并且禁止当前处理器的硬中断。
( 4 ) spin_lock_irqsave(lock, flags);
申请自旋锁,保存当前处理器的硬中断状态,并且禁止当前处理器的硬中断。
( 5 ) int spin_trylock(spinlock_t *lock);
申请自旋锁,如果申请成功,返回 1 ;如果锁被其他处理器占有,当前处理器不等待,立即返回 0 。
释放自旋锁的函数是:
( 1 ) void spin_unlock(spinlock_t *lock);
( 2 ) void spin_unlock_bh(spinlock_t *lock);
释放自旋锁,并且开启当前处理器的软中断。
( 3 ) void spin_unlock_irq(spinlock_t *lock);
释放自旋锁,并且开启当前处理器的硬中断。
( 4 ) void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags);
释放自旋锁,并且恢复当前处理器的硬中断状态。
定义并且初始化静态原始自旋锁的方法是:
DEFINE _ RAW _ SPINLOCK(x);
在运行时动态初始化原始自旋锁的方法是:
raw _ spin _ lock _ init (x);
申请原始自旋锁的函数是:
( 1 ) raw_spin_lock(lock)
申请原始自旋锁,如果锁被其他处理器占有,当前处理器自旋等待。
( 2 ) raw_spin_lock_bh(lock)
申请原始自旋锁,并且禁止当前处理器的软中断。
( 3 ) raw_spin_lock_irq(lock)
申请原始自旋锁,并且禁止当前处理器的硬中断。
( 4 ) raw_spin_lock_irqsave(lock, flags)
申请原始自旋锁,保存当前处理器的硬中断状态,并且禁止当前处理器的硬中断。
( 5 ) raw_spin_trylock(lock)
申请原始自旋锁,如果申请成功,返回 1 ;如果锁被其他处理器占有,当前处理器不等待,立即返回 0 。
释放原始自旋锁的函数是:
( 1 ) raw_spin_unlock(lock)
( 2 ) raw_spin_unlock_bh(lock)
释放原始自旋锁,并且开启当前处理器的软中断。
( 3 ) raw_spin_unlock_irq(lock)
释放原始自旋锁,并且开启当前处理器的硬中断。
( 4 ) raw_spin_unlock_irqrestore(lock, flags)
释放原始自旋锁,并且恢复当前处理器的硬中断状态。
3. 入场券自旋锁
入场券自旋锁( ticket spinlock )的算法类似于银行柜台的排队叫号:
( 1 )锁拥有排队号和服务号,服务号是当前占有锁的进程的排队号。
( 2 )每个进程申请锁的时候,首先申请一个排队号,然后轮询锁的服务号是否等于自己的排队号,如果等于,表示自己占有锁,可以进入临界区,否则继续轮询。
( 3 )当进程释放锁时,把服务号加一,下一个进程看到服务号等于自己的排队号,退出自旋,进入临界区。
ARM64 架构定义的数据类型 arch_spinlock_t 如下所示:
arch/arm64/include/asm/spinlock _ types.h
typedef struct {
#ifdef __ AARCH64EB __ /* 大端字节序(高位存放在低地址) */
u16 next;
u16 owner;
#else /* 小端字节序(低位存放在低地址) */
u16 owner;
u16 next;
#endif
} __ aligned(4) arch _ spinlock _ t;
成员 next 是排队号,成员 owner 是服务号。
在多处理器系统中,函数 spin_lock() 负责申请自旋锁, ARM64 架构的代码如下所示:
spin _ lock() -> raw _ spin _ lock() -> _ raw _ spin _ lock() -> __ raw _ spin _ lock() -> do _ raw _ spin _ lock() -> arch _ spin _ lock()
arch/arm64/include/asm/spinlock.h
1 static inline void arch _ spin _ lock(arch _ spinlock _ t *lock)
2 {
3 unsigned int tmp;
4 arch _ spinlock _ t lockval, newval;
5
6 asm volatile(
7 ARM64 _ LSE _ ATOMIC _ INSN(
8 /* LL/SC */
9 " prfm pstl1strm, %3\n"
10 "1: ldaxr %w0, %3\n"
11 " add %w1, %w0, %w5\n"
12 " stxr %w2, %w1, %3\n"
13 " cbnz %w2, 1b\n",
14 /* 大系统扩展的原子指令 */
15 " mov %w2, %w5\n"
16 " ldadda %w2, %w0, %3\n"
17 __ nops(3)
18 )
19
20 /* 我们得到锁了吗? */
21 " eor %w1, %w0, %w0, ror #16\n"
22 " cbz %w1, 3f\n"
23 " sevl\n"
24 "2: wfe\n"
25 " ldaxrh %w2, %4\n"
26 " eor %w1, %w2, %w0, lsr #16\n"
27 " cbnz %w1, 2b\n"
28 /* 得到锁,临界区从这里开始 */
29 "3:"
30 : "=&r" (lockval), "=&r" (newval), "=&r" (tmp), "+Q" (*lock)
31 : "Q" (lock->owner), "I" (1 << TICKET _ SHIFT)
32 : "memory");
33 }
第 6 ~ 18 行代码,申请排队号,然后把自旋锁的排队号加 1 ,这是一个原子操作,有两种实现方法:
1 )第 9 ~ 13 行代码,使用指令 ldaxr (带有获取语义的独占加载)和 stxr (独占存储)实现,指令 ldaxr 带有获取语义,后面的加载 / 存储指令必须在指令 ldaxr 完成之后开始执行。
2 )第 15 ~ 16 行代码,如果处理器支持大系统扩展,那么使用带有获取语义的原子加法指令 ldadda 实现,指令 ldadda 带有获取语义,后面的加载 / 存储指令必须在指令 ldadda 完成之后开始执行。
第 21 ~ 22 行代码,如果服务号等于当前进程的排队号,进入临界区。
第 24 ~ 27 行代码,如果服务号不等于当前进程的排队号,那么自旋等待。使用指令 ldaxrh (带有获取语义的独占加载, h 表示 halfword ,即 2 字节)读取服务号,指令 ldaxrh 带有获取语义,后面的加载 / 存储指令必须在指令 ldaxrh 完成之后开始执行。
第 23 行代码, sevl ( send event local )指令的功能是发送一个本地事件,避免错过其他处理器释放自旋锁时发送的事件。
第 24 行代码, wfe ( wait for event )指令的功能是使处理器进入低功耗状态,等待事件。
函数 spin_unlock() 负责释放自旋锁, ARM64 架构的代码如下所示:
spin _ unlock() -> raw _ spin _ unlock() -> _ raw _ spin _ unlock() -> __ raw _ spin _ unlock() -> do _ raw _ spin _ unlock() -> arch _ spin _ unlock()
arch/arm64/include/asm/spinlock.h
1 static inline void arch _ spin _ unlock(arch _ spinlock _ t *lock)
2 {
3 unsigned long tmp;
4
5 asm volatile(ARM64 _ LSE _ ATOMIC _ INSN(
6 /* LL/SC */
7 " ldrh %w1, %0\n"
8 " add %w1, %w1, #1\n"
9 " stlrh %w1, %0",
10 /* 大多统扩展的原子指令 */
11 " mov %w1, #1\n"
12 " staddlh %w1, %0\n"
13 __ nops(1))
14 : "=Q" (lock->owner), "=&r" (tmp)
15 :
16 : "memory");
17 }
把自旋锁的服务号加 1 ,有两种实现方法:
( 1 )第 7 ~ 9 行代码,使用指令 ldrh (加载, h 表示 halfword ,即 2 字节)和 stlrh (带有释放语义的存储)实现,指令 stlrh 带有释放语义,前面的加载 / 存储指令必须在指令 stlrh 开始执行之前执行完。因为一次只能有一个进程进入临界区,所以只有一个进程把自旋锁的服务号加 1 ,不需要是原子操作。
( 2 )第 11 ~ 12 行代码,如果处理器支持大系统扩展,那么使用带有释放语义的原子加法指令 staddlh 实现,指令 staddlh 带有释放语义,前面的加载 / 存储指令必须在指令 staddlh 开始执行之前执行完。
在单处理器系统中,自旋锁是空的。
include/linux/spinlock _ types _ up.h
typedef struct { } arch _ spinlock _ t;
函数 spin_lock() 只是禁止内核抢占。
spin _ lock() -> raw _ spin _ lock() -> _ raw _ spin _ lock()
include/linux/spinlock _ api _ up.h
#define _ raw _ spin _ lock(lock) __ LOCK(lock)
#define __ LOCK(lock) \
do { preempt _ disable(); ___ LOCK(lock); } while (0)
#define ___ LOCK(lock) \
do { __ acquire(lock); (void)(lock); } while (0)
4. MCS自旋锁
入场券自旋锁存在性能问题:所有等待同一个自旋锁的处理器在同一个变量上自旋等待,申请或者释放锁的时候会修改锁,导致其他处理器存放自旋锁的缓存行失效,在拥有几百甚至几千个处理器的大型系统中,处理器申请自旋锁时竞争可能很激烈,缓存同步的开销很大,导致系统性能大幅度下降。
MCS ( MCS 是“ Mellor-Crummey ”和“ Scott ”这两个发明人的名字的首字母缩写)自旋锁解决了这个缺点,它的策略是为每个处理器创建一个变量副本,每个处理器在申请自旋锁的时候在自己的本地变量上自旋等待,避免缓存同步的开销。
4.1. 传统的 MCS 自旋锁
传统的 MCS 自旋锁包含:
( 1 )一个指针 tail 指向队列的尾部。
( 2 )每个处理器对应一个队列节点,即 mcs_lock_node 结构体,其中成员 next 指向队列的下一个节点,成员 locked 指示锁是否被其他处理器占有,如果成员 locked 的值为 1 ,表示锁被其他处理器占有。
结构体的定义如下所示:
typedef struct __mcs_lock_node {
struct __mcs_lock_node *next;
int locked;
} ____cacheline_aligned_in_smp mcs_lock_node;
typedef struct {
mcs_lock_node *tail;
mcs_lock_node nodes[NR_CPUS];/* NR_CPUS 是处理器的数量 */
} spinlock_t;
其中“ ____cacheline_aligned_in_smp ”的作用是:在多处理器系统中,结构体的起始地址和长度都是一级缓存行长度的整数倍。
当没有处理器占有或者等待自旋锁的时候,队列是空的, tail 是空指针。
图 4 . 1 处理器 0 申请 MCS 自旋锁
如图 4 . 1 所示,当处理器 0 申请自旋锁的时候,执行原子交换操作,使 tail 指向处理器 0 的 mcs_lock_node 结构体,并且返回 tail 的旧值。 tail 的旧值是空指针,说明自旋锁处于空闲状态,那么处理器 0 获得自旋锁。
图 4 . 2 处理器 1 申请 MCS 自旋锁
如图 4 . 2 所示,当处理器 0 占有自旋锁的时候,处理器 1 申请自旋锁,执行原子交换操作,使 tail 指向处理器 1 的 mcs_lock_node 结构体,并且返回 tail 的旧值。 tail 的旧值是处理器 0 的 mcs_lock_node 结构体的地址,说明自旋锁被其他处理器占有,那么使处理器 0 的 mcs_lock_node 结构体的成员 next 指向处理器 1 的 mcs_lock_node 结构体,把处理器 1 的 mcs_lock_node 结构体的成员 locked 设置为 1 ,然后处理器 1 在自己的 mcs_lock_node 结构体的成员 locked 上面自旋等待,等待成员 locked 的值变成 0 。
图 4 . 3 处理器 0 释放 MCS 自旋锁
如图 4 . 3 所示,处理器 0 释放自旋锁,发现自己的 mcs_lock_node 结构体的成员 next 不是空指针,说明有申请者正在等待锁,于是把下一个节点的成员 locked 设置为 0 ,处理器 1 获得自旋锁。
处理器 1 释放自旋锁,发现自己的 mcs_lock_node 结构体的成员 next 是空指针,说明自己是最后一个申请者,于是执行原子比较交换操作:如果 tail 指向自己的 mcs_lock_node 结构体,那么把 tail 设置为空指针。
4.2. 小巧的 MCS 自旋锁
传统的 MCS 自旋锁存在的缺陷是:结构体的长度太大,因为 mcs_lock_node 结构体的起始地址和长度都必须是一级缓存行长度的整数倍,所以 MCS 自旋锁的长度是(一级缓存行长度 + 处理器数量 * 一级缓存行长度),而入场券自旋锁的长度只有 4 字节。自旋锁被嵌入到内核的很多结构体中,如果自旋锁的长度增加,会导致这些结构体的长度增加。
经过内核社区技术专家的努力,成功地把 MCS 自旋锁放进 4 个字节,实现了小巧的 MCS 自旋锁。自旋锁的定义如下所示:
include/asm-generic/qspinlock_types.h
typedef struct qspinlock {
atomic_t val;
} arch_spinlock_t;
另外,为每个处理器定义 1 个队列节点数组,如下所示:
kernel/locking/qspinlock.c
#ifdef CONFIG_PARAVIRT_SPINLOCKS
#define MAX_NODES 8
#else
#define MAX_NODES 4
#endif
static DEFINE_PER_CPU_ALIGNED(struct mcs_spinlock, mcs_nodes[MAX_NODES]);
配置宏 CONFIG_PARAVIRT_SPINLOCKS 用来启用半虚拟化的自旋锁,给虚拟机使用,本文不考虑这种使用场景。每个处理器需要 4 个队列节点,原因如下:
(1) 申请自旋锁的函数禁止内核抢占,所以进程在等待自旋锁的过程中不会被其他进程抢占。
(2) 进程在等待自旋锁的过程中可能被软中断抢占,然后软中断等待另一个自旋锁。
(3) 软中断在等待自旋锁的过程中可能被硬中断抢占,然后硬中断等待另一个自旋锁。
(4) 硬中断在等待自旋锁的过程中可能被不可屏蔽中断抢占,然后不可屏蔽中断等待另一个自旋锁。
综上所述,一个处理器最多同时等待 4 个自旋锁。
和入场券自旋锁相比, MCS 自旋锁增加的内存开销是数组 mcs_nodes 。
队列节点的定义如下所示:
kernel/locking/mcs_spinlock.h
struct mcs_spinlock {
struct mcs_spinlock *next;
int locked;
int count;
};
其中成员 next 指向队列的下一个节点;成员 locked 指示锁是否被前一个等待者占有,如果值为 1 ,表示锁被前一个等待者占有;成员 count 是嵌套层数,也就是数组 mcs_nodes 已分配的数组项的数量。
自旋锁的 32 个二进制位被划分成 4 个字段:
(1) locked 字段,指示锁已经被占有,长度是一个字节,占用第 0~7 位。
(2) 一个 pending 位,占用第 8 位,第 1 个等待自旋锁的处理器设置 pending 位。
(3) index 字段,是数组索引,指示队列的尾部节点使用数组 mcs_nodes 的哪一项。
(4) cpu 字段,存放队列的尾部节点的处理器编号,实际存储的值是处理器编号加上 1 , cpu 字段减去 1 才是真实的处理器编号。
index 字段和 cpu 字段合起来称为 tail 字段,存放队列的尾部节点的信息,布局分两种情况:
(1) 如果处理器的数量小于,那么第 9~15 位没有使用,第 16~17 位是 index 字段,第 18~31 位是 cpu 字段。
(2) 如果处理器的数量大于或等于,那么第 9~10 位是 index 字段,第 11~31 位是 cpu 字段。
把 MCS 自旋锁放进 4 个字节的关键是:存储处理器编号和数组索引,而不是存储尾部节点的地址。
内核对 MCS 自旋锁做了优化:第 1 个等待自旋锁的处理器直接在锁自身上面自旋等待,不是在自己的 mcs_spinlock 结构体上自旋等待。这个优化带来的好处是:当锁被释放的时候,不需要访问 mcs_spinlock 结构体的缓存行,相当于减少了一次缓存没命中。后续的处理器在自己的 mcs_spinlock 结构体上面自旋等待,直到它们移动到队列的首部为止。
自旋锁的 pending 位进一步扩展这个优化策略。第 1 个等待自旋锁的处理器简单地设置 pending 位,不需要使用自己的 mcs_spinlock 结构体。第 2 个处理器看到 pending 被设置,开始创建等待队列,在自己的 mcs_spinlock 结构体的 locked 字段上自旋等待。这种做法消除了两个等待者之间的缓存同步,而且第 1 个等待者没使用自己的 mcs_spinlock 结构体,减少了一次缓存行没命中。
在多处理器系统中,申请 MCS 自旋锁的代码如下所示:
spin _ lock() -> raw _ spin _ lock() -> _ raw _ spin _ lock() -> __ raw _ spin _ lock() -> do _ raw _ spin _ lock() -> arch _ spin _ lock()
include/asm-generic/qspinlock.h
1 #define arch_spin_lock(l) queued_spin_lock(l)
2
3 static __always_inline void queued_spin_lock(struct qspinlock *lock)
4 {
5 u32 val;
6
7 val = atomic_cmpxchg_acquire(&lock->val, 0, _Q_LOCKED_VAL);
8 if (likely(val == 0))
9 return;
10 queued_spin_lock_slowpath(lock, val);
11 }
第 7 行代码,执行带有获取语义的原子比较交换操作,如果锁的值是 0 ,那么把锁的 locked 字段设置为 1 。获取语义保证后面的加载 / 存储指令必须在函数 atomic_cmpxchg_acquire() 完成之后开始执行。函数 atomic_cmpxchg_acquire() 返回锁的旧值。
第 8~9 行代码,如果锁的旧值是 0 ,说明申请锁的时候锁处于空闲状态,那么成功地获得锁。
第 10 行代码,如果锁的旧值不是 0 ,说明锁不是处于空闲状态,那么执行申请自旋锁的慢速路径。
申请 MCS 自旋锁的慢速路径如下所示:
kernel/locking/qspinlock.c
1 void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
2 {
3 struct mcs_spinlock *prev, *next, *node;
4 u32 new, old, tail;
5 int idx;
6
7 ...
8 if (val == _Q_PENDING_VAL) {
9 while ((val = atomic_read(&lock->val)) == _Q_PENDING_VAL)
10 cpu_relax();
11
12
13 for (;;) {
14 if (val & ~_Q_LOCKED_MASK)
15 goto queue;
16
17 new = _Q_LOCKED_VAL;
18 if (val == new)
19 new |= _Q_PENDING_VAL;
20
21 old = atomic_cmpxchg_acquire(&lock->val, val, new);
22 if (old == val)
23 break;
24
25 val = old;
26
27
28 if (new == _Q_LOCKED_VAL)
29 return;
30
31 smp_cond_load_acquire(&lock->val.counter, !(VAL & _Q_LOCKED_MASK));
32
33 clear_pending_set_locked(lock);
34 return;
35
36 queue:
37 node = this_cpu_ptr(&mcs_nodes[0]);
38 idx = node->count++;
39 tail = encode_tail(smp_processor_id(), idx);
40
41 node += idx;
42 node->locked = 0;
43 node->next = NULL;
44
45
46 if (queued_spin_trylock(lock))
47 goto release;
48
49 old = xchg_tail(lock, tail);
50 next = NULL;
51
52 if (old & _Q_TAIL_MASK) {
53 prev = decode_tail(old);
54 smp_read_barrier_depends();
55
56 WRITE_ONCE(prev->next, node);
57
58 ...
59 arch_mcs_spin_lock_contended(&node->locked);
60
61 next = READ_ONCE(node->next);
62 if (next)
63 prefetchw(next);
64
65
66
67 val = smp_cond_load_acquire(&lock->val.counter, !(VAL & _Q_LOCKED_PENDING_MASK));
68
69 locked:
70 for (;;) {
71 if ((val & _Q_TAIL_MASK) != tail) {
72 set_locked(lock);
73 break;
74 }
75
76 old = atomic_cmpxchg_relaxed(&lock->val, val, _Q_LOCKED_VAL);
77 if (old == val)
78 goto release;
79
80 val = old;
81
82
83 if (!next) {
84 while (!(next = READ_ONCE(node->next)))
85 cpu_relax();
86
87
88 arch_mcs_spin_unlock_contended(&next->locked);
89
90
91 release:
92 __this_cpu_dec(mcs_nodes[0].count);
93 }
第 8~11 行代码,如果锁的状态是 pending ,即 {tail=0 , pending=1 , locked=0} ,那么等待锁的状态变成 locked ,即 {tail=0 , pending=0 , locked=1} 。
第 14~15 行代码,如果锁的 tail 字段不是 0 或者 pending 位是 1 ,说明已经有处理器在等待自旋锁,那么跳转到标号 queue ,本处理器加入等待队列。
第 17~21 行代码,如果锁处于 locked 状态,那么把锁的状态设置为 locked & pending ,即 {tail=0 , pending=1 , locked=1} ;如果锁处于空闲状态(占有锁的处理器刚刚释放自旋锁),那么把锁的状态设置为 locked 。
第 28~29 行代码,如果上一步锁的状态从空闲变成 locked ,那么成功地获得锁。
第 31 行代码,等待占有锁的处理器释放自旋锁,即锁的 locked 字段变成 0 。
第 32 行代码,成功地获得锁,把锁的状态从 pending 改成 locked ,即清除 pending 位,把 locked 字段设置为 1 。
从第 2 个等待自旋锁的处理器开始,需要加入等待队列,处理如下:
(1) 第 37~43 行代码,从本处理器的数组 mcs_nodes 分配一个数组项,然后初始化。
(2) 第 46~47 行代码,如果锁处于空闲状态,那么获得锁。
(3) 第 49 行代码,把自旋锁的 tail 字段设置为本处理器的队列节点的信息,并且返回前一个队列节点的信息。
(4) 第 52 行代码,如果本处理器的队列节点不是队列首部,那么处理如下:
1 )第 56 行代码,把前一个队列节点的 next 字段设置为本处理器的队列节点的地址。
2 )第 59 行代码,本处理器在自己的队列节点的 locked 字段上面自旋等待,等待 locked 字段从 0 变成 1 ,也就是等待本处理器的队列节点移动到队列首部。
(5) 第 67 行代码,本处理器的队列节点移动到队列首部以后,在锁自身上面自旋等待,等待自旋锁的 pending 位和 locked 字段都变成 0 ,也就是等待锁的状态变成空闲。
(6) 锁的状态变成空闲以后,本处理器把锁的状态设置为 locked ,分两种情况:
1 )第 71 行代码,如果队列还有其他节点,即还有其他处理器在等待锁,那么处理如下:
q 第 72 行代码,把锁的 locked 字段设置为 1 。
q 第 83~86 行代码,等待下一个等待者设置本处理器的队列节点的 next 字段。
q 第 88 行代码,把下一个队列节点的 locked 字段设置为 1 。
2 )第 76 行代码,如果队列只有一个节点,即本处理器是唯一的等待者,那么把锁的 tail 字段设置为 0 ,把 locked 字段设置为 1 。
(7) 第 92 行代码,释放本处理器的队列节点。
释放 MCS 自旋锁的代码如下所示:
spin _ unlock() -> raw _ spin _ unlock() -> _ raw _ spin _ unlock() -> __ raw _ spin _ unlock() -> do _ raw _ spin _ unlock() -> arch _ spin _ unlock()
include/asm-generic/qspinlock.h
1 #define arch_spin_unlock(l) queued_spin_unlock(l)
2
3 static __always_inline void queued_spin_unlock(struct qspinlock *lock)
4 {
5 (void)atomic_sub_return_release(_Q_LOCKED_VAL, &lock->val);
6 }
第 5 行代码,执行带释放语义的原子减法操作,把锁的 locked 字段设置为 0 ,释放语义保证前面的加载 / 存储指令在函数 atomic_sub_return_release() 开始执行之前执行完。
MCS 自旋锁的配置宏是 CONFIG_ARCH_USE_QUEUED_SPINLOCKS 和 CONFIG_QUEUED_SPINLOCKS ,目前只有 x86 处理器架构使用 MCS 自旋锁,默认开启 MCS 自旋锁的配置宏,如下所示:
arch/x86/kconfig
config X86
def_bool y
...
select ARCH_USE_QUEUED_SPINLOCKS
...
kernel/kconfig.locks
config ARCH_USE_QUEUED_SPINLOCKS
bool
config QUEUED_SPINLOCKS
def_bool y if ARCH_USE_QUEUED_SPINLOCKS
depends on SMP
以上所述就是小编给大家介绍的《Linux内核的自旋锁》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- golang 自旋锁
- 自旋锁和互斥锁区别 --- 经典
- 轻松搞懂Java中的自旋锁 原 荐
- 面试官:你说说互斥锁、自旋锁、读写锁、悲观锁、乐观锁的应用场景
- 内核必须懂(六): 使用kgdb调试内核
- Linux内核如何替换内核函数并调用原始函数
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。