内容简介:自旋锁用于处理器之间的互斥,适合保护很短的临界区,并且不允许在临界区睡眠。申请自旋锁的时候,如果自旋锁被其他处理器占有,本处理器自旋等待(也称为忙等待)。进程、软中断和硬中断都可以使用自旋锁。
作者简介:余华兵,在网络通信行业工作十多年,负责 IPv4 协议栈 、 IPv6 协议栈和 Linux 内核。在工作中看着 2.6 版本的专业书籍维护 3.x 和 4.x 版本的 Linux 内核,感觉不方便,于是自己分析 4.x 版本的 Linux 内核整理出一本书,书名叫《 Linux 内核深度解析》, 2019 年 5 月出版,希望对同行有帮助。
自旋锁用于处理器之间的互斥,适合保护很短的临界区,并且不允许在临界区睡眠。申请自旋锁的时候,如果自旋锁被其他处理器占有,本处理器自旋等待(也称为忙等待)。
进程、软中断和硬中断都可以使用自旋锁。
自旋锁的实现经历了 3 个阶段:
(1) 最早的自旋锁是无序竞争的,不保证先申请的进程先获得锁。
(2) 第 2 个阶段是入场券自旋锁,进程按照申请锁的顺序排队,先申请的进程先获得锁。
(3) 第 3 个阶段是 MCS 自旋锁。入场券自旋锁存在性能问题:所有申请锁的处理器在同一个变量上自旋等待,缓存同步的开销大,不适合处理器很多的系统。 MCS 自旋锁的策略是为每个处理器创建一个变量副本,每个处理器在自己的本地变量上自旋等待,解决了性能问题。
入场券自旋锁和 MCS 自旋锁都属于排队自旋锁( queued spinlock ),进程按照申请锁的顺序排队,先申请的进程先获得锁。
1. 数据结构
自旋锁的定义如下:
include/linux/spinlock _ types.h
typedef struct spinlock {
union {
struct raw _ spinlock rlock;
…
};
} spinlock _ t;
typedef struct raw _ spinlock {
arch _ spinlock _ t raw _ lock;
…
} raw _ spinlock _ t;
可以看到,数据类型 spinlock 对 raw_spinlock 做了封装,然后数据类型 raw_spinlock 对 arch_spinlock_t 做了封装,各种处理器架构需要自定义数据类型 arch_spinlock_t 。
spinlock 和 raw_spinlock (原始自旋锁)有什么关系?
Linux 内核有一个实时内核分支(开启配置宏 CONFIG_PREEMPT_RT )来支持硬实时特性,内核主线只支持软实时。
对于没有打上实时内核补丁的内核, spinlock 只是封装 raw_spinlock ,它们完全一样。如果打上实时内核补丁,那么 spinlock 使用实时互斥锁保护临界区,在临界区内可以被抢占和睡眠,但 raw_spinlock 还是自旋锁。
目前 主线 版本还没有合并实时内核补丁,说不定哪天就会合并进来,为了使代码可以兼容实时内核,最好坚持 3 个原则:
( 1 )尽可能使用 spinlock 。
( 2 )绝对不允许被抢占和睡眠的地方,使用 raw_spinlock ,否则使用 spinlock 。
( 3 )如果临界区足够小,使用 raw_spinlock 。
2. 使用方法
定义并且初始化静态自旋锁的方法是:
DEFINE _ SPINLOCK(x);
在运行时动态初始化自旋锁的方法是:
spin _ lock _ init(x);
申请自旋锁的函数是:
( 1 ) void spin_lock(spinlock_t *lock);
申请自旋锁,如果锁被其他处理器占有,当前处理器自旋等待。
( 2 ) void spin_lock_bh(spinlock_t *lock);
申请自旋锁,并且禁止当前处理器的软中断。
( 3 ) void spin_lock_irq(spinlock_t *lock);
申请自旋锁,并且禁止当前处理器的硬中断。
( 4 ) spin_lock_irqsave(lock, flags);
申请自旋锁,保存当前处理器的硬中断状态,并且禁止当前处理器的硬中断。
( 5 ) int spin_trylock(spinlock_t *lock);
申请自旋锁,如果申请成功,返回 1 ;如果锁被其他处理器占有,当前处理器不等待,立即返回 0 。
释放自旋锁的函数是:
( 1 ) void spin_unlock(spinlock_t *lock);
( 2 ) void spin_unlock_bh(spinlock_t *lock);
释放自旋锁,并且开启当前处理器的软中断。
( 3 ) void spin_unlock_irq(spinlock_t *lock);
释放自旋锁,并且开启当前处理器的硬中断。
( 4 ) void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags);
释放自旋锁,并且恢复当前处理器的硬中断状态。
定义并且初始化静态原始自旋锁的方法是:
DEFINE _ RAW _ SPINLOCK(x);
在运行时动态初始化原始自旋锁的方法是:
raw _ spin _ lock _ init (x);
申请原始自旋锁的函数是:
( 1 ) raw_spin_lock(lock)
申请原始自旋锁,如果锁被其他处理器占有,当前处理器自旋等待。
( 2 ) raw_spin_lock_bh(lock)
申请原始自旋锁,并且禁止当前处理器的软中断。
( 3 ) raw_spin_lock_irq(lock)
申请原始自旋锁,并且禁止当前处理器的硬中断。
( 4 ) raw_spin_lock_irqsave(lock, flags)
申请原始自旋锁,保存当前处理器的硬中断状态,并且禁止当前处理器的硬中断。
( 5 ) raw_spin_trylock(lock)
申请原始自旋锁,如果申请成功,返回 1 ;如果锁被其他处理器占有,当前处理器不等待,立即返回 0 。
释放原始自旋锁的函数是:
( 1 ) raw_spin_unlock(lock)
( 2 ) raw_spin_unlock_bh(lock)
释放原始自旋锁,并且开启当前处理器的软中断。
( 3 ) raw_spin_unlock_irq(lock)
释放原始自旋锁,并且开启当前处理器的硬中断。
( 4 ) raw_spin_unlock_irqrestore(lock, flags)
释放原始自旋锁,并且恢复当前处理器的硬中断状态。
3. 入场券自旋锁
入场券自旋锁( ticket spinlock )的算法类似于银行柜台的排队叫号:
( 1 )锁拥有排队号和服务号,服务号是当前占有锁的进程的排队号。
( 2 )每个进程申请锁的时候,首先申请一个排队号,然后轮询锁的服务号是否等于自己的排队号,如果等于,表示自己占有锁,可以进入临界区,否则继续轮询。
( 3 )当进程释放锁时,把服务号加一,下一个进程看到服务号等于自己的排队号,退出自旋,进入临界区。
ARM64 架构定义的数据类型 arch_spinlock_t 如下所示:
arch/arm64/include/asm/spinlock _ types.h
typedef struct {
#ifdef __ AARCH64EB __ /* 大端字节序(高位存放在低地址) */
u16 next;
u16 owner;
#else /* 小端字节序(低位存放在低地址) */
u16 owner;
u16 next;
#endif
} __ aligned(4) arch _ spinlock _ t;
成员 next 是排队号,成员 owner 是服务号。
在多处理器系统中,函数 spin_lock() 负责申请自旋锁, ARM64 架构的代码如下所示:
spin _ lock() -> raw _ spin _ lock() -> _ raw _ spin _ lock() -> __ raw _ spin _ lock() -> do _ raw _ spin _ lock() -> arch _ spin _ lock()
arch/arm64/include/asm/spinlock.h
1 static inline void arch _ spin _ lock(arch _ spinlock _ t *lock)
2 {
3 unsigned int tmp;
4 arch _ spinlock _ t lockval, newval;
5
6 asm volatile(
7 ARM64 _ LSE _ ATOMIC _ INSN(
8 /* LL/SC */
9 " prfm pstl1strm, %3\n"
10 "1: ldaxr %w0, %3\n"
11 " add %w1, %w0, %w5\n"
12 " stxr %w2, %w1, %3\n"
13 " cbnz %w2, 1b\n",
14 /* 大系统扩展的原子指令 */
15 " mov %w2, %w5\n"
16 " ldadda %w2, %w0, %3\n"
17 __ nops(3)
18 )
19
20 /* 我们得到锁了吗? */
21 " eor %w1, %w0, %w0, ror #16\n"
22 " cbz %w1, 3f\n"
23 " sevl\n"
24 "2: wfe\n"
25 " ldaxrh %w2, %4\n"
26 " eor %w1, %w2, %w0, lsr #16\n"
27 " cbnz %w1, 2b\n"
28 /* 得到锁,临界区从这里开始 */
29 "3:"
30 : "=&r" (lockval), "=&r" (newval), "=&r" (tmp), "+Q" (*lock)
31 : "Q" (lock->owner), "I" (1 << TICKET _ SHIFT)
32 : "memory");
33 }
第 6 ~ 18 行代码,申请排队号,然后把自旋锁的排队号加 1 ,这是一个原子操作,有两种实现方法:
1 )第 9 ~ 13 行代码,使用指令 ldaxr (带有获取语义的独占加载)和 stxr (独占存储)实现,指令 ldaxr 带有获取语义,后面的加载 / 存储指令必须在指令 ldaxr 完成之后开始执行。
2 )第 15 ~ 16 行代码,如果处理器支持大系统扩展,那么使用带有获取语义的原子加法指令 ldadda 实现,指令 ldadda 带有获取语义,后面的加载 / 存储指令必须在指令 ldadda 完成之后开始执行。
第 21 ~ 22 行代码,如果服务号等于当前进程的排队号,进入临界区。
第 24 ~ 27 行代码,如果服务号不等于当前进程的排队号,那么自旋等待。使用指令 ldaxrh (带有获取语义的独占加载, h 表示 halfword ,即 2 字节)读取服务号,指令 ldaxrh 带有获取语义,后面的加载 / 存储指令必须在指令 ldaxrh 完成之后开始执行。
第 23 行代码, sevl ( send event local )指令的功能是发送一个本地事件,避免错过其他处理器释放自旋锁时发送的事件。
第 24 行代码, wfe ( wait for event )指令的功能是使处理器进入低功耗状态,等待事件。
函数 spin_unlock() 负责释放自旋锁, ARM64 架构的代码如下所示:
spin _ unlock() -> raw _ spin _ unlock() -> _ raw _ spin _ unlock() -> __ raw _ spin _ unlock() -> do _ raw _ spin _ unlock() -> arch _ spin _ unlock()
arch/arm64/include/asm/spinlock.h
1 static inline void arch _ spin _ unlock(arch _ spinlock _ t *lock)
2 {
3 unsigned long tmp;
4
5 asm volatile(ARM64 _ LSE _ ATOMIC _ INSN(
6 /* LL/SC */
7 " ldrh %w1, %0\n"
8 " add %w1, %w1, #1\n"
9 " stlrh %w1, %0",
10 /* 大多统扩展的原子指令 */
11 " mov %w1, #1\n"
12 " staddlh %w1, %0\n"
13 __ nops(1))
14 : "=Q" (lock->owner), "=&r" (tmp)
15 :
16 : "memory");
17 }
把自旋锁的服务号加 1 ,有两种实现方法:
( 1 )第 7 ~ 9 行代码,使用指令 ldrh (加载, h 表示 halfword ,即 2 字节)和 stlrh (带有释放语义的存储)实现,指令 stlrh 带有释放语义,前面的加载 / 存储指令必须在指令 stlrh 开始执行之前执行完。因为一次只能有一个进程进入临界区,所以只有一个进程把自旋锁的服务号加 1 ,不需要是原子操作。
( 2 )第 11 ~ 12 行代码,如果处理器支持大系统扩展,那么使用带有释放语义的原子加法指令 staddlh 实现,指令 staddlh 带有释放语义,前面的加载 / 存储指令必须在指令 staddlh 开始执行之前执行完。
在单处理器系统中,自旋锁是空的。
include/linux/spinlock _ types _ up.h
typedef struct { } arch _ spinlock _ t;
函数 spin_lock() 只是禁止内核抢占。
spin _ lock() -> raw _ spin _ lock() -> _ raw _ spin _ lock()
include/linux/spinlock _ api _ up.h
#define _ raw _ spin _ lock(lock) __ LOCK(lock)
#define __ LOCK(lock) \
do { preempt _ disable(); ___ LOCK(lock); } while (0)
#define ___ LOCK(lock) \
do { __ acquire(lock); (void)(lock); } while (0)
4. MCS自旋锁
入场券自旋锁存在性能问题:所有等待同一个自旋锁的处理器在同一个变量上自旋等待,申请或者释放锁的时候会修改锁,导致其他处理器存放自旋锁的缓存行失效,在拥有几百甚至几千个处理器的大型系统中,处理器申请自旋锁时竞争可能很激烈,缓存同步的开销很大,导致系统性能大幅度下降。
MCS ( MCS 是“ Mellor-Crummey ”和“ Scott ”这两个发明人的名字的首字母缩写)自旋锁解决了这个缺点,它的策略是为每个处理器创建一个变量副本,每个处理器在申请自旋锁的时候在自己的本地变量上自旋等待,避免缓存同步的开销。
4.1. 传统的 MCS 自旋锁
传统的 MCS 自旋锁包含:
( 1 )一个指针 tail 指向队列的尾部。
( 2 )每个处理器对应一个队列节点,即 mcs_lock_node 结构体,其中成员 next 指向队列的下一个节点,成员 locked 指示锁是否被其他处理器占有,如果成员 locked 的值为 1 ,表示锁被其他处理器占有。
结构体的定义如下所示:
typedef struct __mcs_lock_node {
struct __mcs_lock_node *next;
int locked;
} ____cacheline_aligned_in_smp mcs_lock_node;
typedef struct {
mcs_lock_node *tail;
mcs_lock_node nodes[NR_CPUS];/* NR_CPUS 是处理器的数量 */
} spinlock_t;
其中“ ____cacheline_aligned_in_smp ”的作用是:在多处理器系统中,结构体的起始地址和长度都是一级缓存行长度的整数倍。
当没有处理器占有或者等待自旋锁的时候,队列是空的, tail 是空指针。
图 4 . 1 处理器 0 申请 MCS 自旋锁
如图 4 . 1 所示,当处理器 0 申请自旋锁的时候,执行原子交换操作,使 tail 指向处理器 0 的 mcs_lock_node 结构体,并且返回 tail 的旧值。 tail 的旧值是空指针,说明自旋锁处于空闲状态,那么处理器 0 获得自旋锁。
图 4 . 2 处理器 1 申请 MCS 自旋锁
如图 4 . 2 所示,当处理器 0 占有自旋锁的时候,处理器 1 申请自旋锁,执行原子交换操作,使 tail 指向处理器 1 的 mcs_lock_node 结构体,并且返回 tail 的旧值。 tail 的旧值是处理器 0 的 mcs_lock_node 结构体的地址,说明自旋锁被其他处理器占有,那么使处理器 0 的 mcs_lock_node 结构体的成员 next 指向处理器 1 的 mcs_lock_node 结构体,把处理器 1 的 mcs_lock_node 结构体的成员 locked 设置为 1 ,然后处理器 1 在自己的 mcs_lock_node 结构体的成员 locked 上面自旋等待,等待成员 locked 的值变成 0 。
图 4 . 3 处理器 0 释放 MCS 自旋锁
如图 4 . 3 所示,处理器 0 释放自旋锁,发现自己的 mcs_lock_node 结构体的成员 next 不是空指针,说明有申请者正在等待锁,于是把下一个节点的成员 locked 设置为 0 ,处理器 1 获得自旋锁。
处理器 1 释放自旋锁,发现自己的 mcs_lock_node 结构体的成员 next 是空指针,说明自己是最后一个申请者,于是执行原子比较交换操作:如果 tail 指向自己的 mcs_lock_node 结构体,那么把 tail 设置为空指针。
4.2. 小巧的 MCS 自旋锁
传统的 MCS 自旋锁存在的缺陷是:结构体的长度太大,因为 mcs_lock_node 结构体的起始地址和长度都必须是一级缓存行长度的整数倍,所以 MCS 自旋锁的长度是(一级缓存行长度 + 处理器数量 * 一级缓存行长度),而入场券自旋锁的长度只有 4 字节。自旋锁被嵌入到内核的很多结构体中,如果自旋锁的长度增加,会导致这些结构体的长度增加。
经过内核社区技术专家的努力,成功地把 MCS 自旋锁放进 4 个字节,实现了小巧的 MCS 自旋锁。自旋锁的定义如下所示:
include/asm-generic/qspinlock_types.h
typedef struct qspinlock {
atomic_t val;
} arch_spinlock_t;
另外,为每个处理器定义 1 个队列节点数组,如下所示:
kernel/locking/qspinlock.c
#ifdef CONFIG_PARAVIRT_SPINLOCKS
#define MAX_NODES 8
#else
#define MAX_NODES 4
#endif
static DEFINE_PER_CPU_ALIGNED(struct mcs_spinlock, mcs_nodes[MAX_NODES]);
配置宏 CONFIG_PARAVIRT_SPINLOCKS 用来启用半虚拟化的自旋锁,给虚拟机使用,本文不考虑这种使用场景。每个处理器需要 4 个队列节点,原因如下:
(1) 申请自旋锁的函数禁止内核抢占,所以进程在等待自旋锁的过程中不会被其他进程抢占。
(2) 进程在等待自旋锁的过程中可能被软中断抢占,然后软中断等待另一个自旋锁。
(3) 软中断在等待自旋锁的过程中可能被硬中断抢占,然后硬中断等待另一个自旋锁。
(4) 硬中断在等待自旋锁的过程中可能被不可屏蔽中断抢占,然后不可屏蔽中断等待另一个自旋锁。
综上所述,一个处理器最多同时等待 4 个自旋锁。
和入场券自旋锁相比, MCS 自旋锁增加的内存开销是数组 mcs_nodes 。
队列节点的定义如下所示:
kernel/locking/mcs_spinlock.h
struct mcs_spinlock {
struct mcs_spinlock *next;
int locked;
int count;
};
其中成员 next 指向队列的下一个节点;成员 locked 指示锁是否被前一个等待者占有,如果值为 1 ,表示锁被前一个等待者占有;成员 count 是嵌套层数,也就是数组 mcs_nodes 已分配的数组项的数量。
自旋锁的 32 个二进制位被划分成 4 个字段:
(1) locked 字段,指示锁已经被占有,长度是一个字节,占用第 0~7 位。
(2) 一个 pending 位,占用第 8 位,第 1 个等待自旋锁的处理器设置 pending 位。
(3) index 字段,是数组索引,指示队列的尾部节点使用数组 mcs_nodes 的哪一项。
(4) cpu 字段,存放队列的尾部节点的处理器编号,实际存储的值是处理器编号加上 1 , cpu 字段减去 1 才是真实的处理器编号。
index 字段和 cpu 字段合起来称为 tail 字段,存放队列的尾部节点的信息,布局分两种情况:
(1) 如果处理器的数量小于,那么第 9~15 位没有使用,第 16~17 位是 index 字段,第 18~31 位是 cpu 字段。
(2) 如果处理器的数量大于或等于,那么第 9~10 位是 index 字段,第 11~31 位是 cpu 字段。
把 MCS 自旋锁放进 4 个字节的关键是:存储处理器编号和数组索引,而不是存储尾部节点的地址。
内核对 MCS 自旋锁做了优化:第 1 个等待自旋锁的处理器直接在锁自身上面自旋等待,不是在自己的 mcs_spinlock 结构体上自旋等待。这个优化带来的好处是:当锁被释放的时候,不需要访问 mcs_spinlock 结构体的缓存行,相当于减少了一次缓存没命中。后续的处理器在自己的 mcs_spinlock 结构体上面自旋等待,直到它们移动到队列的首部为止。
自旋锁的 pending 位进一步扩展这个优化策略。第 1 个等待自旋锁的处理器简单地设置 pending 位,不需要使用自己的 mcs_spinlock 结构体。第 2 个处理器看到 pending 被设置,开始创建等待队列,在自己的 mcs_spinlock 结构体的 locked 字段上自旋等待。这种做法消除了两个等待者之间的缓存同步,而且第 1 个等待者没使用自己的 mcs_spinlock 结构体,减少了一次缓存行没命中。
在多处理器系统中,申请 MCS 自旋锁的代码如下所示:
spin _ lock() -> raw _ spin _ lock() -> _ raw _ spin _ lock() -> __ raw _ spin _ lock() -> do _ raw _ spin _ lock() -> arch _ spin _ lock()
include/asm-generic/qspinlock.h
1 #define arch_spin_lock(l) queued_spin_lock(l)
2
3 static __always_inline void queued_spin_lock(struct qspinlock *lock)
4 {
5 u32 val;
6
7 val = atomic_cmpxchg_acquire(&lock->val, 0, _Q_LOCKED_VAL);
8 if (likely(val == 0))
9 return;
10 queued_spin_lock_slowpath(lock, val);
11 }
第 7 行代码,执行带有获取语义的原子比较交换操作,如果锁的值是 0 ,那么把锁的 locked 字段设置为 1 。获取语义保证后面的加载 / 存储指令必须在函数 atomic_cmpxchg_acquire() 完成之后开始执行。函数 atomic_cmpxchg_acquire() 返回锁的旧值。
第 8~9 行代码,如果锁的旧值是 0 ,说明申请锁的时候锁处于空闲状态,那么成功地获得锁。
第 10 行代码,如果锁的旧值不是 0 ,说明锁不是处于空闲状态,那么执行申请自旋锁的慢速路径。
申请 MCS 自旋锁的慢速路径如下所示:
kernel/locking/qspinlock.c
1 void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
2 {
3 struct mcs_spinlock *prev, *next, *node;
4 u32 new, old, tail;
5 int idx;
6
7 ...
8 if (val == _Q_PENDING_VAL) {
9 while ((val = atomic_read(&lock->val)) == _Q_PENDING_VAL)
10 cpu_relax();
11
12
13 for (;;) {
14 if (val & ~_Q_LOCKED_MASK)
15 goto queue;
16
17 new = _Q_LOCKED_VAL;
18 if (val == new)
19 new |= _Q_PENDING_VAL;
20
21 old = atomic_cmpxchg_acquire(&lock->val, val, new);
22 if (old == val)
23 break;
24
25 val = old;
26
27
28 if (new == _Q_LOCKED_VAL)
29 return;
30
31 smp_cond_load_acquire(&lock->val.counter, !(VAL & _Q_LOCKED_MASK));
32
33 clear_pending_set_locked(lock);
34 return;
35
36 queue:
37 node = this_cpu_ptr(&mcs_nodes[0]);
38 idx = node->count++;
39 tail = encode_tail(smp_processor_id(), idx);
40
41 node += idx;
42 node->locked = 0;
43 node->next = NULL;
44
45
46 if (queued_spin_trylock(lock))
47 goto release;
48
49 old = xchg_tail(lock, tail);
50 next = NULL;
51
52 if (old & _Q_TAIL_MASK) {
53 prev = decode_tail(old);
54 smp_read_barrier_depends();
55
56 WRITE_ONCE(prev->next, node);
57
58 ...
59 arch_mcs_spin_lock_contended(&node->locked);
60
61 next = READ_ONCE(node->next);
62 if (next)
63 prefetchw(next);
64
65
66
67 val = smp_cond_load_acquire(&lock->val.counter, !(VAL & _Q_LOCKED_PENDING_MASK));
68
69 locked:
70 for (;;) {
71 if ((val & _Q_TAIL_MASK) != tail) {
72 set_locked(lock);
73 break;
74 }
75
76 old = atomic_cmpxchg_relaxed(&lock->val, val, _Q_LOCKED_VAL);
77 if (old == val)
78 goto release;
79
80 val = old;
81
82
83 if (!next) {
84 while (!(next = READ_ONCE(node->next)))
85 cpu_relax();
86
87
88 arch_mcs_spin_unlock_contended(&next->locked);
89
90
91 release:
92 __this_cpu_dec(mcs_nodes[0].count);
93 }
第 8~11 行代码,如果锁的状态是 pending ,即 {tail=0 , pending=1 , locked=0} ,那么等待锁的状态变成 locked ,即 {tail=0 , pending=0 , locked=1} 。
第 14~15 行代码,如果锁的 tail 字段不是 0 或者 pending 位是 1 ,说明已经有处理器在等待自旋锁,那么跳转到标号 queue ,本处理器加入等待队列。
第 17~21 行代码,如果锁处于 locked 状态,那么把锁的状态设置为 locked & pending ,即 {tail=0 , pending=1 , locked=1} ;如果锁处于空闲状态(占有锁的处理器刚刚释放自旋锁),那么把锁的状态设置为 locked 。
第 28~29 行代码,如果上一步锁的状态从空闲变成 locked ,那么成功地获得锁。
第 31 行代码,等待占有锁的处理器释放自旋锁,即锁的 locked 字段变成 0 。
第 32 行代码,成功地获得锁,把锁的状态从 pending 改成 locked ,即清除 pending 位,把 locked 字段设置为 1 。
从第 2 个等待自旋锁的处理器开始,需要加入等待队列,处理如下:
(1) 第 37~43 行代码,从本处理器的数组 mcs_nodes 分配一个数组项,然后初始化。
(2) 第 46~47 行代码,如果锁处于空闲状态,那么获得锁。
(3) 第 49 行代码,把自旋锁的 tail 字段设置为本处理器的队列节点的信息,并且返回前一个队列节点的信息。
(4) 第 52 行代码,如果本处理器的队列节点不是队列首部,那么处理如下:
1 )第 56 行代码,把前一个队列节点的 next 字段设置为本处理器的队列节点的地址。
2 )第 59 行代码,本处理器在自己的队列节点的 locked 字段上面自旋等待,等待 locked 字段从 0 变成 1 ,也就是等待本处理器的队列节点移动到队列首部。
(5) 第 67 行代码,本处理器的队列节点移动到队列首部以后,在锁自身上面自旋等待,等待自旋锁的 pending 位和 locked 字段都变成 0 ,也就是等待锁的状态变成空闲。
(6) 锁的状态变成空闲以后,本处理器把锁的状态设置为 locked ,分两种情况:
1 )第 71 行代码,如果队列还有其他节点,即还有其他处理器在等待锁,那么处理如下:
q 第 72 行代码,把锁的 locked 字段设置为 1 。
q 第 83~86 行代码,等待下一个等待者设置本处理器的队列节点的 next 字段。
q 第 88 行代码,把下一个队列节点的 locked 字段设置为 1 。
2 )第 76 行代码,如果队列只有一个节点,即本处理器是唯一的等待者,那么把锁的 tail 字段设置为 0 ,把 locked 字段设置为 1 。
(7) 第 92 行代码,释放本处理器的队列节点。
释放 MCS 自旋锁的代码如下所示:
spin _ unlock() -> raw _ spin _ unlock() -> _ raw _ spin _ unlock() -> __ raw _ spin _ unlock() -> do _ raw _ spin _ unlock() -> arch _ spin _ unlock()
include/asm-generic/qspinlock.h
1 #define arch_spin_unlock(l) queued_spin_unlock(l)
2
3 static __always_inline void queued_spin_unlock(struct qspinlock *lock)
4 {
5 (void)atomic_sub_return_release(_Q_LOCKED_VAL, &lock->val);
6 }
第 5 行代码,执行带释放语义的原子减法操作,把锁的 locked 字段设置为 0 ,释放语义保证前面的加载 / 存储指令在函数 atomic_sub_return_release() 开始执行之前执行完。
MCS 自旋锁的配置宏是 CONFIG_ARCH_USE_QUEUED_SPINLOCKS 和 CONFIG_QUEUED_SPINLOCKS ,目前只有 x86 处理器架构使用 MCS 自旋锁,默认开启 MCS 自旋锁的配置宏,如下所示:
arch/x86/kconfig
config X86
def_bool y
...
select ARCH_USE_QUEUED_SPINLOCKS
...
kernel/kconfig.locks
config ARCH_USE_QUEUED_SPINLOCKS
bool
config QUEUED_SPINLOCKS
def_bool y if ARCH_USE_QUEUED_SPINLOCKS
depends on SMP
以上所述就是小编给大家介绍的《Linux内核的自旋锁》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- golang 自旋锁
- 自旋锁和互斥锁区别 --- 经典
- 轻松搞懂Java中的自旋锁 原 荐
- 面试官:你说说互斥锁、自旋锁、读写锁、悲观锁、乐观锁的应用场景
- 内核必须懂(六): 使用kgdb调试内核
- Linux内核如何替换内核函数并调用原始函数
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Numerical Recipes 3rd Edition
William H. Press、Saul A. Teukolsky、William T. Vetterling、Brian P. Flannery / Cambridge University Press / 2007-9-6 / GBP 64.99
Do you want easy access to the latest methods in scientific computing? This greatly expanded third edition of Numerical Recipes has it, with wider coverage than ever before, many new, expanded and upd......一起来看看 《Numerical Recipes 3rd Edition》 这本书的介绍吧!
XML、JSON 在线转换
在线XML、JSON转换工具
RGB CMYK 转换工具
RGB CMYK 互转工具