内容简介:在更多文章见个人博客:ReentrantLock是jdk中常用的锁实现,其实现逻辑主语基于AQS(juc包中的大多数同步类实现都是基于AQS);接下来会简单介绍AQS的大致原理,关于其实现细节以及各种应用,之后会写一篇文章具体分析。
在 <关于同步的一点思考-上> 中介绍了几种实现锁的方式以及 linux 底层futex的实现原理 ReentrantLock的实现网上有很多文章了,本篇文章会简单介绍下其 java 层实现,重点放在分析竞争锁失败后如何阻塞线程。 因篇幅有限,synchronized的内容将会放到下篇文章。
更多文章见个人博客: github.com/farmerjohng…
Java Lock的实现
ReentrantLock是jdk中常用的锁实现,其实现逻辑主语基于AQS(juc包中的大多数同步类实现都是基于AQS);接下来会简单介绍AQS的大致原理,关于其实现细节以及各种应用,之后会写一篇文章具体分析。
AQS
AQS是类AbstractQueuedSynchronizer.java的简称,JUC包下的ReentrantLock、CyclicBarrier、CountdownLatch都使用到了AQS。
其大致原理如下:
- AQS维护一个叫做state的int型变量和一个双向链表,state用来表示同步状态,双向链表存储的是等待锁的线程
- 加锁时首先调用tryAcquire尝试获得锁,如果获得锁失败,则将线程插入到双向链表中,并调用LockSupport.park()方法阻塞当前线程。
- 释放锁时调用LockSupport.unpark()唤起链表中的第一个节点的线程。被唤起的线程会重新走一遍竞争锁的流程。
其中tryAcquire方法是抽象方法,具体实现取决于实现类,我们常说的公平锁和非公平锁的区别就在于该方法的实现。
ReentrantLock
ReentrantLock分为公平锁和非公平锁,我们只看公平锁。 ReentrantLock.lock会调用到ReentrantLock#FairSync.lock中:
FairSync.java
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
复制代码
AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
复制代码
可以看到FairSync.lock调用了AQS的 acquire
方法,而在 acquire
中首先调用 tryAcquire
尝试获得锁,以下两种情况返回true:
重入
如果 tryAcquire
失败则调用 acquireQueued
阻塞当前线程。 acquireQueued
最终会调用到 LockSupport.park()
阻塞线程。
LockSupport.park
个人认为,要深入理解锁机制,一个很重要的点是理解系统是如何阻塞线程的。
LockSupport.java
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
复制代码
park
方法的参数blocker是用于负责这次阻塞的同步对象,在AQS的调用中,这个对象就是AQS本身。我们知道synchronized关键字是需要指定一个对象的(如果作用于方法上则是当前对象或当前类),与之类似blocker就是LockSupport指定的对象。
park
方法调用了native方法 UNSAFE.park
,第一个参数代表第二个参数是否是绝对时间,第二个参数代表最长阻塞时间。
其实现如下,只保留核心代码,完整代码看查看unsafe.cpp
Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time){
...
thread->parker()->park(isAbsolute != 0, time);
...
}
复制代码
park方法在os_linux.cpp中(其他操作系统的实现在os_xxx中)
void Parker::park(bool isAbsolute, jlong time) {
...
//获得当前线程
Thread* thread = Thread::current();
assert(thread->is_Java_thread(), "Must be JavaThread");
JavaThread *jt = (JavaThread *)thread;
//如果当前线程被设置了interrupted标记,则直接返回
if (Thread::is_interrupted(thread, false)) {
return;
}
if (time > 0) {
//unpacktime中根据isAbsolute的值来填充absTime结构体,isAbsolute为true时,time代表绝对时间且单位是毫秒,否则time是相对时间且单位是纳秒
//absTime.tvsec代表了对于时间的秒
//absTime.tv_nsec代表对应时间的纳秒
unpackTime(&absTime, isAbsolute, time);
}
//调用mutex trylock方法
if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
return;
}
//_counter是一个许可的数量,跟ReentrantLock里定义的许可变量基本都是一个原理。 unpack方法调用时会将_counter赋值为1。
//_counter>0代表已经有人调用了unpark,所以不用阻塞
int status ;
if (_counter > 0) { // no wait needed
_counter = 0;
//释放mutex锁
status = pthread_mutex_unlock(_mutex);
return;
}
//设置线程状态为CONDVAR_WAIT
OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
...
//等待
_cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
pthread_cond_timedwait(&_cond[_cur_index], _mutex, &absTime);
...
//释放mutex锁
status = pthread_mutex_unlock(_mutex) ;
}
复制代码
park
方法用POSIX的 pthread_cond_timedwait
方法阻塞线程,调用 pthread_cond_timedwait
前需要先获得锁,因此 park
主要流程为:
pthread_mutex_trylock pthread_cond_timedwait pthread_mutex_unlock
另外,在阻塞当前线程前,会调用 OSThreadWaitState
的构造方法将线程状态设置为 CONDVAR_WAIT
,在Jvm中Thread状态枚举如下
enum ThreadState {
ALLOCATED, // Memory has been allocated but not initialized
INITIALIZED, // The thread has been initialized but yet started
RUNNABLE, // Has been started and is runnable, but not necessarily running
MONITOR_WAIT, // Waiting on a contended monitor lock
CONDVAR_WAIT, // Waiting on a condition variable
OBJECT_WAIT, // Waiting on an Object.wait() call
BREAKPOINTED, // Suspended at breakpoint
SLEEPING, // Thread.sleep()
ZOMBIE // All done, but not reclaimed yet
};
复制代码
Linux的timedwait
由上文我们可以知道LockSupport.park方法最终是由POSIX的 pthread_cond_timedwait
的方法实现的。
我们现在就进一步看看 pthread_mutex_trylock
, pthread_cond_timedwait
, pthread_mutex_unlock
这几个方法是如何实现的。
Linux系统中相关代码在glibc库中。
pthread_mutex_trylock
先看trylock的实现,
代码在glibc的 pthread_mutex_trylock.c
文件中,该方法代码很多,我们只看主要代码
//pthread_mutex_t是posix中的互斥锁结构体
int
__pthread_mutex_trylock (mutex)
pthread_mutex_t *mutex;
{
int oldval;
pid_t id = THREAD_GETMEM (THREAD_SELF, tid);
switch (__builtin_expect (PTHREAD_MUTEX_TYPE (mutex),
PTHREAD_MUTEX_TIMED_NP))
{
case PTHREAD_MUTEX_ERRORCHECK_NP:
case PTHREAD_MUTEX_TIMED_NP:
case PTHREAD_MUTEX_ADAPTIVE_NP:
/* Normal mutex. */
if (lll_trylock (mutex->__data.__lock) != 0)
break;
/* Record the ownership. */
mutex->__data.__owner = id;
++mutex->__data.__nusers;
return 0;
}
}
//以下代码在lowlevellock.h中
#define __lll_trylock(futex) \
(atomic_compare_and_exchange_val_acq (futex, 1, 0) != 0)
#define lll_trylock(futex) __lll_trylock (&(futex))
复制代码
mutex默认用的是 PTHREAD_MUTEX_NORMAL
类型(与 PTHREAD_MUTEX_TIMED_NP
相同);
因此会先调用 lll_trylock
方法, lll_trylock
实际上是一个cas操作,如果mutex->__data.__lock==0则将其修改为1并返回0,否则返回1。
如果成功,则更改mutex中的owner为当前线程。
pthread_mutex_unlock
pthread_mutex_unlock.c
int
internal_function attribute_hidden
__pthread_mutex_unlock_usercnt (mutex, decr)
pthread_mutex_t *mutex;
int decr;
{
if (__builtin_expect (type, PTHREAD_MUTEX_TIMED_NP)
== PTHREAD_MUTEX_TIMED_NP)
{
/* Always reset the owner field. */
normal:
mutex->__data.__owner = 0;
if (decr)
/* One less user. */
--mutex->__data.__nusers;
/* Unlock. */
lll_unlock (mutex->__data.__lock, PTHREAD_MUTEX_PSHARED (mutex));
return 0;
}
}
复制代码
pthread_mutex_unlock
将mutex中的owner清空,并调用了 lll_unlock
方法
lowlevellock.h
#define __lll_unlock(futex, private) \
((void) ({ \
int *__futex = (futex); \
int __val = atomic_exchange_rel (__futex, 0); \
\
if (__builtin_expect (__val > 1, 0)) \
lll_futex_wake (__futex, 1, private); \
}))
#define lll_unlock(futex, private) __lll_unlock(&(futex), private)
#define lll_futex_wake(ftx, nr, private) \
({ \
DO_INLINE_SYSCALL(futex, 3, (long) (ftx), \
__lll_private_flag (FUTEX_WAKE, private), \
(int) (nr)); \
_r10 == -1 ? -_retval : _retval; \
})
复制代码
lll_unlock
分为两个步骤:
- 将futex设置为0并拿到设置之前的值(用户态操作)
-
如果futex之前的值>1,代表存在锁冲突,也就是说有线程调用了
FUTEX_WAIT在休眠,所以通过调用系统函数FUTEX_WAKE唤醒休眠线程
FUTEX_WAKE
在上一篇文章有分析,futex机制的核心是当获得锁时,尝试cas更改一个int型变量(用户态操作),如果integer原始值是0,则修改成功,该线程获得锁,否则就将当期线程放入到 wait queue中,wait queue中的线程不会被系统调度(内核态操作)。
futex变量的值有3种:0代表当前锁空闲,1代表有线程持有当前锁,2代表存在锁冲突。futex的值初始化时是0;当调用try_lock的时候会利用cas操作改为1(见上面的trylock函数);当调用 lll_lock
时,如果不存在锁冲突,则将其改为1,否则改为2。
#define __lll_lock(futex, private) \
((void) ({ \
int *__futex = (futex); \
if (__builtin_expect (atomic_compare_and_exchange_bool_acq (__futex, \
1, 0), 0)) \
{ \
if (__builtin_constant_p (private) && (private) == LLL_PRIVATE) \
__lll_lock_wait_private (__futex); \
else \
__lll_lock_wait (__futex, private); \
} \
}))
#define lll_lock(futex, private) __lll_lock (&(futex), private)
void
__lll_lock_wait_private (int *futex)
{
//第一次进来的时候futex==1,所以不会走这个if
if (*futex == 2)
lll_futex_wait (futex, 2, LLL_PRIVATE);
//在这里会把futex设置成2,并调用futex_wait让当前线程等待
while (atomic_exchange_acq (futex, 2) != 0)
lll_futex_wait (futex, 2, LLL_PRIVATE);
}
复制代码
pthread_cond_timedwait
pthread_cond_timedwait
用于阻塞线程,实现线程等待,
代码在glibc的 pthread_cond_timedwait.c
文件中,代码较长,你可以先简单过一遍,看完下面的分析再重新读一遍代码
int
int
__pthread_cond_timedwait (cond, mutex, abstime)
pthread_cond_t *cond;
pthread_mutex_t *mutex;
const struct timespec *abstime;
{
struct _pthread_cleanup_buffer buffer;
struct _condvar_cleanup_buffer cbuffer;
int result = 0;
/* Catch invalid parameters. */
if (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000)
return EINVAL;
int pshared = (cond->__data.__mutex == (void *) ~0l)
? LLL_SHARED : LLL_PRIVATE;
//1.获得cond锁
lll_lock (cond->__data.__lock, pshared);
//2.释放mutex锁
int err = __pthread_mutex_unlock_usercnt (mutex, 0);
if (err)
{
lll_unlock (cond->__data.__lock, pshared);
return err;
}
/* We have one new user of the condvar. */
//每执行一次wait(pthread_cond_timedwait/pthread_cond_wait),__total_seq就会+1
++cond->__data.__total_seq;
//用来执行futex_wait的变量
++cond->__data.__futex;
//标识该cond还有多少线程在使用,pthread_cond_destroy需要等待所有的操作完成
cond->__data.__nwaiters += 1 << COND_NWAITERS_SHIFT;
/* Remember the mutex we are using here. If there is already a
different address store this is a bad user bug. Do not store
anything for pshared condvars. */
//保存mutex锁
if (cond->__data.__mutex != (void *) ~0l)
cond->__data.__mutex = mutex;
/* Prepare structure passed to cancellation handler. */
cbuffer.cond = cond;
cbuffer.mutex = mutex;
/* Before we block we enable cancellation. Therefore we have to
install a cancellation handler. */
__pthread_cleanup_push (&buffer, __condvar_cleanup, &cbuffer);
/* The current values of the wakeup counter. The "woken" counter
must exceed this value. */
//记录futex_wait前的__wakeup_seq(为该cond上执行了多少次sign操作+timeout次数)和__broadcast_seq(代表在该cond上执行了多少次broadcast)
unsigned long long int val;
unsigned long long int seq;
val = seq = cond->__data.__wakeup_seq;
/* Remember the broadcast counter. */
cbuffer.bc_seq = cond->__data.__broadcast_seq;
while (1)
{
//3.计算要wait的相对时间
struct timespec rt;
{
#ifdef __NR_clock_gettime
INTERNAL_SYSCALL_DECL (err);
int ret;
ret = INTERNAL_VSYSCALL (clock_gettime, err, 2,
(cond->__data.__nwaiters
& ((1 << COND_NWAITERS_SHIFT) - 1)),
&rt);
# ifndef __ASSUME_POSIX_TIMERS
if (__builtin_expect (INTERNAL_SYSCALL_ERROR_P (ret, err), 0))
{
struct timeval tv;
(void) gettimeofday (&tv, NULL);
/* Convert the absolute timeout value to a relative timeout. */
rt.tv_sec = abstime->tv_sec - tv.tv_sec;
rt.tv_nsec = abstime->tv_nsec - tv.tv_usec * 1000;
}
else
# endif
{
/* Convert the absolute timeout value to a relative timeout. */
rt.tv_sec = abstime->tv_sec - rt.tv_sec;
rt.tv_nsec = abstime->tv_nsec - rt.tv_nsec;
}
#else
/* Get the current time. So far we support only one clock. */
struct timeval tv;
(void) gettimeofday (&tv, NULL);
/* Convert the absolute timeout value to a relative timeout. */
rt.tv_sec = abstime->tv_sec - tv.tv_sec;
rt.tv_nsec = abstime->tv_nsec - tv.tv_usec * 1000;
#endif
}
if (rt.tv_nsec < 0)
{
rt.tv_nsec += 1000000000;
--rt.tv_sec;
}
/*---计算要wait的相对时间 end---- */
//是否超时
/* Did we already time out? */
if (__builtin_expect (rt.tv_sec < 0, 0))
{
//被broadcast唤醒,这里疑问的是,为什么不需要判断__wakeup_seq?
if (cbuffer.bc_seq != cond->__data.__broadcast_seq)
goto bc_out;
goto timeout;
}
unsigned int futex_val = cond->__data.__futex;
//4.释放cond锁,准备wait
lll_unlock (cond->__data.__lock, pshared);
/* Enable asynchronous cancellation. Required by the standard. */
cbuffer.oldtype = __pthread_enable_asynccancel ();
//5.调用futex_wait
/* Wait until woken by signal or broadcast. */
err = lll_futex_timed_wait (&cond->__data.__futex,
futex_val, &rt, pshared);
/* Disable asynchronous cancellation. */
__pthread_disable_asynccancel (cbuffer.oldtype);
//6.重新获得cond锁,因为又要访问&修改cond的数据了
lll_lock (cond->__data.__lock, pshared);
//__broadcast_seq值发生改变,代表发生了有线程调用了广播
if (cbuffer.bc_seq != cond->__data.__broadcast_seq)
goto bc_out;
//判断是否是被sign唤醒的,sign会增加__wakeup_seq
//第二个条件cond->__data.__woken_seq != val的意义在于
//可能两个线程A、B在wait,一个线程调用了sign导致A被唤醒,这时B因为超时被唤醒
//对于B线程来说,执行到这里时第一个条件也是满足的,从而导致上层拿到的result不是超时
//所以这里需要判断下__woken_seq(即该cond已经被唤醒的线程数)是否等于__wakeup_seq(sign执行次数+timeout次数)
val = cond->__data.__wakeup_seq;
if (val != seq && cond->__data.__woken_seq != val)
break;
/* Not woken yet. Maybe the time expired? */
if (__builtin_expect (err == -ETIMEDOUT, 0))
{
timeout:
/* Yep. Adjust the counters. */
++cond->__data.__wakeup_seq;
++cond->__data.__futex;
/* The error value. */
result = ETIMEDOUT;
break;
}
}
//一个线程已经醒了所以这里__woken_seq +1
++cond->__data.__woken_seq;
bc_out:
//
cond->__data.__nwaiters -= 1 << COND_NWAITERS_SHIFT;
/* If pthread_cond_destroy was called on this variable already,
notify the pthread_cond_destroy caller all waiters have left
and it can be successfully destroyed. */
if (cond->__data.__total_seq == -1ULL
&& cond->__data.__nwaiters < (1 << COND_NWAITERS_SHIFT))
lll_futex_wake (&cond->__data.__nwaiters, 1, pshared);
//9.cond数据修改完毕,释放锁
lll_unlock (cond->__data.__lock, pshared);
/* The cancellation handling is back to normal, remove the handler. */
__pthread_cleanup_pop (&buffer, 0);
//10.重新获得mutex锁
err = __pthread_mutex_cond_lock (mutex);
return err ?: result;
}
复制代码
上面的代码虽然加了注释,但相信大多数人第一次看都看不懂。
我们来简单梳理下,上面代码有两把锁,一把是mutex锁,一把cond锁。另外,在调用 pthread_cond_timedwait
前后必须调用 pthread_mutex_lock(&mutex);
和 pthread_mutex_unlock(&mutex);
加/解mutex锁。
因此 pthread_cond_timedwait
的使用大致分为几个流程:
pthread_cond_timedwait pthread_cond_timedwait
看到这里,你可能有几点疑问:为什么需要两把锁?mutex锁和cond锁的作用是什么?
mutex锁
说mutex锁的作用之前,我们回顾一下java的Object.wait的使用。Object.wait必须是在synchronized同步块中使用。试想下如果不加synchronized也能运行Object.wait的话会存在什么问题?
Object condObj=new Object();
voilate int flag = 0;
public void waitTest(){
if(flag == 0){
condObj.wait();
}
}
public void notifyTest(){
flag=1;
condObj.notify();
}
复制代码
如上代码,A线程调用waitTest,这时flag==0,所以准备调用wait方法进行休眠,这时B线程开始执行,调用notifyTest将flag置为1,并调用notify方法,注意:此时A线程还没调用wait,所以notfiy没有唤醒任何线程。然后A线程继续执行,调用wait方法进行休眠,而之后不会有人来唤醒A线程,A线程将永久wait下去!
Object condObj=new Object();
voilate int flag = 0;
public void waitTest(){
synchronized(condObj){
if(flag == 0){
condObj.wait();
}
}
}
public void notifyTest(){
synchronized(condObj){
flag=1;
condObj.notify();
}
}
复制代码
在有锁保护下的情况下, 当调用condObj.wait时,flag一定是等于0的,不会存在一直wait的问题。
回到 pthread_cond_timedwait
,其需要加mutex锁的原因就呼之欲出了: 保证wait和其wait条件的原子性
不管是glibc的 pthread_cond_timedwait
/ pthread_cond_signal
还是java层的 Object.wait
/ Object.notify
,Jdk AQS的 Condition.await
/ Condition.signal
,所有的Condition机制都需要在加锁环境下才能使用,其根本原因就是要保证进行线程休眠时,条件变量是没有被篡改的。
注意下mutex锁释放的时机,回顾上文中 pthread_cond_timedwait
的流程,在第2步时就释放了mutex锁,之后调用 futex_wait
进行休眠,为什么要在休眠前就释放mutex锁呢?原因也很简单:如果不释放mutex锁就开始休眠,那其他线程就永远无法调用signal方法将休眠线程唤醒(因为调用signal方法前需要获得mutex锁)。
在线程被唤醒之后还要在第10步中重新获得mutex锁是为了保证锁的语义(思考下如果不重新获得mutex锁会发生什么)。
cond锁
cond锁的作用其实很简单: 保证对象 cond->data
的线程安全。
在 pthread_cond_timedwait
时需要修改 cond->data
的数据,如增加__total_seq(在这个cond上一共执行过多少次wait)增加__nwaiters(现在还有多少个线程在wait这个cond),所有在修改及访问 cond->data
时需要加cond锁。
这里我没想明白的一点是,用mutex锁也能保证 cond->data
修改的线程安全,只要晚一点释放mutex锁就行了。为什么要先释放mutex,重新获得cond来保证线程安全? 是为了避免mutex锁住的范围太大吗?
如何唤醒休眠线程
唤醒休眠线程的代码比较简单,主要就是调用lll_futex_wake。
int
__pthread_cond_signal (cond)
pthread_cond_t *cond;
{
int pshared = (cond->__data.__mutex == (void *) ~0l)
? LLL_SHARED : LLL_PRIVATE;
//因为要操作cond的数据,所以要加锁
lll_lock (cond->__data.__lock, pshared);
/* Are there any waiters to be woken? */
if (cond->__data.__total_seq > cond->__data.__wakeup_seq)
{
//__wakeup_seq为执行sign与timeout次数的和
++cond->__data.__wakeup_seq;
++cond->__data.__futex;
...
//唤醒wait的线程
lll_futex_wake (&cond->__data.__futex, 1, pshared);
}
/* We are done. */
lll_unlock (cond->__data.__lock, pshared);
return 0;
}
复制代码
End
本文对Java简单介绍了ReentrantLock实现原理,对LockSupport.park底层实现 pthread_cond_timedwait
机制做了详细分析。
看完这篇文章,你可能还会有疑问:Synchronized锁的实现和ReentrantLock是一样的吗?Thread.sleep/Object.wait休眠线程的原理和LockSupport.park有什么区别?linux内核层的futex的具体是如何实现的?
这些问题,之后的文章会一一解答,尽请期待~
以上所述就是小编给大家介绍的《关于同步的一点思考-下》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Inside Larry's and Sergey's Brain
Richard Brandt / Portfolio / 17 Sep 2009 / USD 24.95
You’ve used their products. You’ve heard about their skyrocketing wealth and “don’t be evil” business motto. But how much do you really know about Google’s founders, Larry Page and Sergey Brin? Inside......一起来看看 《Inside Larry's and Sergey's Brain》 这本书的介绍吧!