内容简介:相信大家对线程锁和线程阻塞都很了解,无非就是 synchronized, wait/notify 等, 但是你有仔细想过 Java 虚拟机是如何实现锁和阻塞的呢?它们之间又有哪些联系呢?如果感兴趣的话请接着往下看。为保障多线程下处理共享数据的安全性,Java 语言给我们提供了线程锁,保证同一时刻只有一个线程能处理共享数据。当一个锁被某个线程持有的时候,另一个线程尝试去获取这个锁将产生线程阻塞,直到持有锁的线程释放了该锁。除了抢占锁的时候会出现线程阻塞,另外还有一些方法也会产生线程阻塞,比如: Object.
相信大家对线程锁和线程阻塞都很了解,无非就是 synchronized, wait/notify 等, 但是你有仔细想过 Java 虚拟机是如何实现锁和阻塞的呢?它们之间又有哪些联系呢?如果感兴趣的话请接着往下看。
为保障多线程下处理共享数据的安全性,Java 语言给我们提供了线程锁,保证同一时刻只有一个线程能处理共享数据。当一个锁被某个线程持有的时候,另一个线程尝试去获取这个锁将产生线程阻塞,直到持有锁的线程释放了该锁。
除了抢占锁的时候会出现线程阻塞,另外还有一些方法也会产生线程阻塞,比如: Object.wait(), Thread.sleep(), ArrayBlockingQueue.put() 等等,他们都有一个共同特点:不消耗 CPU 时间片。另外值得指出的是 Object.wait() 会释放持有的锁,而 Thread.sleep() 不会,相信这点大家都清楚。 当然 while(true){ } 也能产生阻塞线程的效果,自旋锁就是使用循环,配合 CAS (compareAndSet) 实现的,这个不在我们的讨论之列。
相信大家对线程锁都很熟悉,目前有两种方法,准确来说是三种,synchronized 方法,synchronized 区块,ReentrantLock。先说 synchronized,代码如下:
public class Lock { public static void synchronized print() { System.out.println("method synchronized"); } public static void print2() { synchronized(Lock.class) { System.out.println("synchronized"); } } public static void main(String[] args) { Lock.print(); Lock.print2(); } }
编译后通过如下命令查看其字节码
javap -c -v Lock
其中节选方法一(Lock.print)的字节码如下:
public static synchronized void print(); descriptor: ()V flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED Code: stack=2, locals=0, args_size=0 0: getstatic #2 // Field java/lang/System.out:Ljava/io/PrintStream; 3: ldc #3 // String method synchronized 5: invokevirtual #4 // Method java/io/PrintStream.println:(Ljava/lang/String;)V 8: return }
可以看到方法表的访问标志位 (flags) 中多了个 ACC_SYNCHRONIZED,然后看字节码指令区域 (Code) ,和普通方法没任何差别, 猜测 Java 虚拟机通过检查方法表中是否存在标志位 ACC_SYNCHRONIZED 来决定是否需要获取锁,至于获取锁的原理后文会提到。
然后看第二个使用 synchronized 区块的方法(Lock.print2)字节码:
public static void print2(); descriptor: ()V flags: ACC_PUBLIC, ACC_STATIC Code: stack=2, locals=2, args_size=0 0: ldc #5 // 将锁对象 Lock.class 入栈 2: dup // 复制一份,此时栈中有两个 Lock.class 3: astore_0 // 出栈一个 Lock.class 对象保存到局部变量表 Slot 1 中 4: monitorenter // 以栈顶元素 Lock.class 作为锁,开始同步 5: getstatic #2 // 5-10 调用 System.out.println("synchronized"); 8: ldc #6 10: invokevirtual #4 13: aload_0 // 将局部变量表 Slot 1 中的数据入栈,即 Lock.class 14: monitorexit // 使用栈顶数据退出同步 15: goto 23 // 方法结束,跳转到 23 返回 18: astore_1 // 从这里开始是异常路径,将异常信息保存至局部变量表 Slot 2 中,查看异常表 19: aload_0 // 将局部变量表 Slot 1 中的 Lock.class 入栈 20: monitorexit // 使用栈顶数据退出同步 21: aload_1 // 将局部变量表 Slot 2 中的异常信息入栈 22: athrow // 把异常对象重新抛出给方法的调用者 23: return // 方法正常返回 Exception table: // 异常表 from to target type 5 15 18 any // 5-15 出现任何(any)异常跳转到 18 18 21 18 any // 18-21 出现任何(any)异常跳转到 18
synchronized 区块的字节码相比较 synchronized 方法复杂了许多。每一行字节码的含义我都作了详细注释,可以看到此时是通过字节码指令 monitorenter,monitorexit 来进入和退出同步的。特别值得注意的是,我们并没有写 try.catch 捕获异常,但是字节码指令中存在异常处理的代码,其实为了保证在方法异常完成时 monitorenter 和 monitorexit 指令依然可以正确配对执行,编译器会自动产生一个异常处理器,这个异常处理器声明可处理所有的异常,它的目的就是用来执行 monitorexit 指令。这个机制确保在 synchronized 区块中产生任何异常都可以正常退出同步,释放锁资源。
不管是检查标志位中的 ACC_SYNCHRONIZED,还是字节码指令 monitorenter,monitorexit,锁机制的实现最终肯定存在于 JVM 中,后面我们会再提到这点。
接下来继续看 ReentrantLock 的实现,鉴于篇幅有限,ReentrantLock 的原理不会讲的很详细,感兴趣的可以自行研究。ReentrantLock 是基于并发基础组件 AbstractQueuedSynchronizer 实现的,内部有一个 int 类型的 state 变量来控制同步状态,为 0 时表示无线程占用锁资源,等于 1 时表示则说明有线程占用,由于 ReentrantLock 是可重入锁,state 也可能大于 1 表示该线程有多次获取锁。AQS 内部还有一个由内部类 Node 构成的队列用来完成线程获取锁的排队。本文只是简单的介绍一下 lock 和 unLock 方法。
下面先看 ReentrantLock.lock 方法:
// ReentrantLock.java public void lock() { this.sync.lock(); } // ReentrantLock.NonfairSync.class final void lock() { // 使用 cas 设置 state,如果设置成功表示当前无其他线程竞争锁,优先获取锁资源 if (this.compareAndSetState(0, 1)) { // 保存当前线程由于后续重入锁的判断 this.setExclusiveOwnerThread(Thread.currentThread()); } else { this.acquire(1); } } // AbstractQueuedSynchronizer.java public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); // 如果阻塞被中断,重新设置中断通知调用者 } // 判断是否是重入 protected final boolean tryAcquire(int var1) { return this.nonfairTryAcquire(var1); } // 处理等待队列 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // 阻塞线程 return Thread.interrupted(); }
对于锁竞争的情况,最终会调用 LockSupport.park(this) 阻塞当前线程,同样的 ReentrantLock.unlock 方法会调用 LockSupport.unpark(thread) 来恢复阻塞的线程。继续看 LockSupport 的实现:
public static void unpark(Thread thread) { if (var0 != null) { UNSAFE.unpark(thread); } } public static void park(Object obj) { Thread thread = Thread.currentThread(); setBlocker(thread, obj); UNSAFE.park(false, 0L); setBlocker(thread, (Object)null); }
LockSupport 内部调用了 UnSafe 类的 park 和 unpark, 是 native 代码,该类由虚拟机实现,以 Hotspot 虚拟机为例,查看 park 方法:
// unsafe.cpp UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time)) UnsafeWrapper("Unsafe_Park"); #ifndef USDT2 HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time); #else /* USDT2 */ HOTSPOT_THREAD_PARK_BEGIN( (uintptr_t) thread->parker(), (int) isAbsolute, time); #endif /* USDT2 */ JavaThreadParkedState jtps(thread, time != 0); thread->parker()->park(isAbsolute != 0, time); #ifndef USDT2 HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker()); #else /* USDT2 */ HOTSPOT_THREAD_PARK_END( (uintptr_t) thread->parker()); #endif /* USDT2 */ UNSAFE_END
调用了: thread->parker()->park(isAbsolute != 0, time); 我们可以猜测是这句代码阻塞了当前线程。HotSpot 虚拟机里的 Thread 类对应着一个 OS 的 Thread, JavaThread 类继承于 Thread, JavaThread 实例对应着一个 Java 层的 Thread.
简而言之,Java 层的 Thread 对应着一个 OS 的 Thread。使用如下代码创建线程:
//linux_os.cpp pthread_t tid; int ret = pthread_create(&tid, &attr, (void* (*)(void*)) thread_native_entry, thread);
回到 Thread 类中的 Park,我们查看 HotSpot 的 thread.hpp, 找到了如下三个 Park:
public: ParkEvent * _ParkEvent ; // for synchronized() ParkEvent * _SleepEvent ; // for Thread.sleep // JSR166 per-thread parker private: Parker* _parker;
从注释上可以看出分别是用于 synchronized 的阻塞,Thread.sleep 的阻塞还有用于 UnSafe 的线程阻塞,继续查看 park.hpp 节选:
// A word of caution: The JVM uses 2 very similar constructs: // 1. ParkEvent are used for Java-level "monitor" synchronization. // 2. Parkers are used by JSR166-JUC park-unpark. class Parker : public os::PlatformParker { // 略 } class ParkEvent : public os::PlatformEvent { // 略 }
注释上更近一步解释了两种 Parker 的区别,他们的实现非常相似,那为什么会存在两个呢?网络上有解释说是只是没重构而已。下面只看 Parker 的实现,发现 park.cpp 中并没有实现 park 方法,猜测应该是父类中实现了,因为这是和系统相关的操作,以 Linux 系统为例,查看 linux_os.cpp 找到了 park 的实现,截取了主要部分:
void Parker::park(bool isAbsolute, jlong time) { // 省略了前置判断 // 获取锁 if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) { return; } if (time == 0) { _cur_index = REL_INDEX; // arbitrary choice when not timed // 调用 pthread_cond_wait 阻塞线程 status = pthread_cond_wait (&_cond[_cur_index], _mutex) ; } else { _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX; status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ; if (status != 0 && WorkAroundNPTLTimedWaitHang) { pthread_cond_destroy (&_cond[_cur_index]) ; pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr()); } } _cur_index = -1; // 已从 block 中恢复,释放锁 _counter = 0 ; status = pthread_mutex_unlock(_mutex) ; // 略 }
总共分三步走,先获取锁,再调用 pthread_cond_wait 阻塞线程,最后阻塞恢复了之后释放锁,是不是和我们使用 Object.wait 十分类似,事实上 Object.wait 底层也是这种方式实现的。为了更清楚的了解底层的实现,写了一段 c 代码看一下线程的创建和锁的使用:
int counter = 0; // 互斥锁对象 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; void* add() { for(int i = 0;i < 2;++i) { // 获取锁 pthread_mutex_lock( &mutex ); ++counter; sleep(1); // 释放锁 pthread_mutex_unlock( &mutex ); printf("counter = %d\n", counter); } pthread_exit(NULL); } int main() { pthread_t thread_1, thread_2; // 创建线程 pthread_create(&thread_1, NULL, add, NULL); pthread_create(&thread_2, NULL, add, NULL); pthread_join(thread_1, NULL); pthread_join(thread_2, NULL); return 0; }
使用 pthread_create 创建线程,使用 pthread_mutex_lock 获取锁,使用 pthread_mutex_unlock 释放锁。那既然 pthread_mutex_lock 和 pthread_mutex_unlock 就能实现锁了,那为什么锁实现的时候还要使用 pthread_cond_wait 来阻塞线程呢?回过头看 PlatformParker :
//os_linux.hpp class PlatformParker { pthread_mutex_t _mutex[1]; //一个是给park用, 另一个是给parkUntil用 pthread_cond_t _cond[2]; // one for relative times and one for abs. //略... };
每个 JavaThread 实例都有自己的 mutex,在上述自己写的例子中是多个线程竞争同一个 mutex,阻塞线程队列管理的逻辑直接由 mutex 实现,而此处的 mutex 线程私有,不存在直接竞争关系,事实上,JVM 为了提升平台通用性(?),只提供了线程阻塞和恢复操作,阻塞线程队列的管理工作交给了 Java 层,也就是前面提到的 AQS。对于 Java 层来说 JVM 只需要提供 「阻塞」 和 「唤醒」 的操作即可。
在 Java 中讲解 Object.wait, Object.notify 的时候通常会用生产者-消费者作为例子,这里我也简单的写了一个 c 的例子,让大家了解底层线程阻塞的原理:
#define TRUE 1 #define FALSE 0 #define BUFFER_SIZE 10 pthread_cond_t msg_cond = PTHREAD_COND_INITIALIZER; pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; char* msgBuffer[BUFFER_SIZE] = {0}; int bufferIndex = -1; int counter = 0; void* readMsg() { while (TRUE) { // 获取锁 pthread_mutex_lock( &mutex ); if (bufferIndex < 0) { printf("wait for message\n"); // 消息队列如果为空则阻塞等待 pthread_cond_wait( &msg_cond, &mutex); } for(; bufferIndex >= 0; --bufferIndex){ char* msg = msgBuffer[bufferIndex]; msgBuffer[bufferIndex] = 0; printf("read message = %s, %d\n", msg, counter++); // 通知生产者线程 pthread_cond_signal(&msg_cond); } sleep(1); // 释放锁 pthread_mutex_unlock( &mutex ); } return 0; } void* writeMsg() { // 获取锁 pthread_mutex_lock( &mutex ); if (bufferIndex < BUFFER_SIZE - 1) { char* msg = "haha!"; msgBuffer[++bufferIndex] = msg; // 通知消费者线程 pthread_cond_signal(&msg_cond); // notify(); // pthread_cond_broadcast(&msg_cond); // notifyAll(); } else { printf("message buffer is full!\n"); // 缓冲队列已满阻塞等待 pthread_cond_wait( &msg_cond, &mutex); } // 释放锁 pthread_mutex_unlock( &mutex ); return 0; } int main(int argc, char const *argv[]) { pthread_t thread_r; // 创建后台消费者线程 pthread_create(&thread_r, NULL, readMsg, NULL); for(int i = 0; i < 50; i++){ printf("send message %d \n", i); // 生产消息 writeMsg(); } pthread_join(thread_r, NULL); return 0; }
其中消费者线程是一个循环,在循环中先获取锁,然后判断队列是否为空,如果为空则调用 pthread_cond_wait 阻塞线程,这个阻塞操作会自动释放持有的锁并出让 cpu 时间片,恢复的时候自动获取锁,消费完队列之后会调用 pthread_cond_signal 通知生产者线程,另外还有一个通知所有线程恢复的 pthread_cond_broadcast,与 notifyAll 类似。
最后再简单谈一下阻塞中断,Java 层 Thread 中有个 interrupt 方法,它的作用是在线程收到阻塞的时候抛出一个中断信号,这样线程就会退出阻塞状态,但是并不是我们遇到的所有阻塞都会中断,要看是否会响应中断信号,Object.wait, Thread.join,Thread.sleep,ReentrantLock.lockInterruptibly 这些会抛出受检异常 InterruptedException 的都会被中断。synchronized,ReentrantLock.lock 的锁竞争阻塞是不会被中断的,interrupt 并不会强制终止线程,而是会将线程设置成 interrupted 状态,我们可以通过判断 isInterrupted 或 interrupted 来获取中断状态,区别在于后者会重置中断状态为 false。看一下底层线程中断的代码:
// os_linux.cpp void os::interrupt(Thread* thread) { OSThread* osthread = thread->osthread(); if (!osthread->interrupted()) { osthread->set_interrupted(true); OrderAccess::fence(); ParkEvent * const slp = thread->_SleepEvent ; if (slp != NULL) slp->unpark() ; } // For JSR166. Unpark even if interrupt status already was set if (thread->is_Java_thread()) ((JavaThread*)thread)->parker()->unpark(); ParkEvent * ev = thread->_ParkEvent ; if (ev != NULL) ev->unpark() ; }
可以看到,线程中断也是由 unpark 实现的, 即恢复了阻塞的线程。并且对之前提到的三个 Parker (_ParkEvent,_SleepEvent,_parker) 都进行了 unpark。
说到这里相信大家对 Java 线程锁与线程阻塞有个大体的了解了吧,由于本人水平实在有限,有些地方讲的不好或者有错误的地方请多包涵,如果发现任何问题,请提出讨论,我会及时修改。
>> 转载请注明来源: 深入理解 Java 锁与线程阻塞●非常感谢您的阅读,欢迎订阅 微信公众号 (右边扫一扫)以表达对我的认可与支持,我会在第一时间同步文章到公众号上。当然也可点击下方打赏按钮为我打赏。
免费分享,随意打赏
感谢打赏!
微信
支付宝
以上所述就是小编给大家介绍的《深入理解 Java 锁与线程阻塞》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Callable,阻塞队列,线程池问题
- Class.forName 造成的线程阻塞
- 聊聊线程阻塞与恢复 LockSupport类
- 15分钟读懂进程线程、同步异步、阻塞非阻塞、并发并行,太实用了!
- 多线程应用--Http请求阻塞回调处理
- Java阻塞问题:为什么JVM会在许多不同的类/方法中阻塞线程?
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Music Recommendation and Discovery
Òscar Celma / Springer / 2010-9-7 / USD 49.95
With so much more music available these days, traditional ways of finding music have diminished. Today radio shows are often programmed by large corporations that create playlists drawn from a limited......一起来看看 《Music Recommendation and Discovery》 这本书的介绍吧!