内容简介:在上文已经说明,委托是构造线程安全类的一个最有效策略,也就是让现有的线程安全类管理所有的状态即可。以下将介绍这些基础构建模块。同步容器类包括Vector和Hashtable以及由同步容器类都是线程安全的,但是在某些情况下可能需要额外的客户端加锁来保护复合操作。
在上文已经说明,委托是构造线程安全类的一个最有效策略,也就是让现有的线程安全类管理所有的状态即可。以下将介绍这些基础构建模块。
同步容器类
同步容器类包括Vector和Hashtable以及由 Collections.synchronizedXxx
等工厂方法创建的同步封装器类。这些类实现线程安全的方式是:将它们的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能访问容器的状态。同步容器对所有容器状态的访问都串行化,严重降低了并发性;当多个线程竞争锁时,吞吐量严重下降。
同步容器类存在的问题
同步容器类都是线程安全的,但是在某些情况下可能需要额外的客户端加锁来保护复合操作。
比如,在Vecotr中,getLast()和deleteLast()操作,如果是在多线程的环境下运行,如果不加锁,会产生异常情况。一个线程在getLast()后,另一个线程deleteLast(),然后该线程继续执行,进行deleteLast()操作,此时会抛出下标越界的异常。
又比如,在迭代的过程中,使用get(index)的操作,如果有多个线程运行,可能会删除其中元素,同样会造成异常。
对于如上的情况,我们需要通过客户端加锁来解决线程安全的问题。如在迭代时加锁:
synchronized(vector){ for(int i=0;i<vector.size();i++){ vector.get(i); } } 复制代码
迭代器
在迭代或者for-each循环语法时,对容器类进行迭代的标准方式都是使用Iterator。然而,在设计同步容器类的迭代器时并没有考虑到并发修改的问题,并且它们表现出的行为时“及时失败”的,也就是当它们发现容器在迭代过程中被修改时,就会抛出ConcurrentModificationException。
如果在迭代期间,对容器加锁,首先会降低效率,提高线程的等待时间;然后还可能会产生死锁;降低了吞吐量和CPU的利用率。
如果不希望在迭代期间加锁,可以使用克隆容器的方法,并在克隆副本上进行迭代。
加锁可以防止迭代器抛出ConcurrentModificationException,但是要在所有对容器进行迭代的地方都要加锁。如hashCode,equals,containsAll,removeAll,retainAll等方法,在以容器为参数时,都会对容器进行迭代。这些间接的迭代操作可能抛出ConcurrentModificationException。
并发容器
Java 5.0提供了多种并发容器类来改进同步容器的性能。同步容器对所有容器状态的访问都串行化,严重降低了并发性;当多个线程竞争锁时,吞吐量严重下降。
并发容器是针对多个线程并发访问设计的。通过并发容器来替代同步容器,可以极大地提高伸缩性并降低风险。并发容器包括ConcurrentHashMap(替代Map),CopyOnWriteArrayList(替代List),ConcurrentLinkedQueue,BlockingQueue等等。
ConcurrentHashMap
同步容器类在执行每个操作期间都持有一个锁。ConcurrentHashMap采用了不同的加锁策略来提供更高的并发性和伸缩性。它并不是将每个方法都在同一个锁上同步,而是使用一种粒度更细的加锁机制来实现更大程度的共享,这种机制称为分段锁。
分段锁机制使得任意数量的读取线程可以并发访问Map,执行读取操作的线程和执行写入操作的线程可以并发访问Map,并且一定数量的写入线程可以并发地修改Map,因此提高了并发访问的吞吐量。
并发容器增强了同步容器类,它们提供的迭代器不会抛出ConcurrentModificationException,因此不需要在迭代过程中对容器加锁。其迭代器具有弱一致性,可以容忍并发的修改,在创建迭代器时会遍历已有元素,并可以(但是不保证)在迭代器被构造后将修改操作反映给容器。size(),isEmpty()等方法返回的是一个近似值。
由于ConcurrentHashMap与Hashtable和synchronizedMap有更多的优势,因此大多数情况应该使用并发容器类,至于当需要对整个容器加锁进行独占访问时,才应该放弃使用并发容器。
注意,此时不能再通过客户端加锁新建新的原子操作了,客户端只能对并发容器自身加锁,但并发容器内部使用的并不是自身锁。
CopyOnWriteArrayList
写入时复制容器,在每次修改时都会加锁并创建和重新发布一个新的容器副本,直接修改容器引用,从而实现可见性。 写操作在一个复制的数组上进行,读操作还是在原始数组中进行,读写分离,互不影响。写操作需要加锁,防止并发写入时导致写入数据丢失。写操作结束之后需要把原始数组指向新的复制数组。
CopyOnWriteArrayList 在写操作的同时允许读操作,大大提高了读操作的性能,因此很适合读多写少的应用场景。 但是 CopyOnWriteArrayList 有其缺陷:
- 内存占用:在写操作时需要复制一个新的数组,使得内存占用为原来的两倍左右;
- 数据不一致:读操作不能读取实时性的数据,因为部分写操作的数据还未同步到读数组中。
阻塞队列
阻塞队列支持生产者-消费者模式。简化了开发过程,消除了生产者和消费者之间的代码依赖性。阻塞队列简化了生产者-消费者设计的实现过程。一种常见的生产者-消费者 设计模式 就是线程池与工作队列的组合。
阻塞队列提供了四种处理方法:
- 抛出异常,使用add(e)插入,remove()删除,element()查询。当阻塞队列满时,插入元素;当队列空,删除元素都会抛出异常。
- 返回特殊值,使用offer(e)插入,poll()删除,peek()查询。插入时,如果成功返回true,移除时,如果没有对应的元素返回null。
- 阻塞,使用put(e)插入,take()删除。队列满,插入元素时会阻塞;队列空,取元素会阻塞。
- 超时退出:使用offer(e,time,unit)插入,poll(time,unit)删除。当队列满时,会阻塞,超过一定的时间,线程会退出。
阻塞队列有多种实现。
- ArrayBlokcingQueue和LinkedBlockingQueue分别是数组和链表结构组成的有界的FIFO阻塞队列。
- PriorityBlockingQueue是一个支持优先级 排序 的无界阻塞队列。
- SynchronousQueue是一个不存储元素的阻塞队列,它不会为队列中元素维护存储空间。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
双端队列与工作密取
Java 6提供了Dqueue和BlockingDeque,是双端队列,实现了在队列头和队列尾的高效插入和移除。双端队列适用于工作密取模式。在工作密取中,每个消费者都有各自的双端队列。如果一个消费者完成了自己的双端队列的全部工作,可以从其他消费者双端队列末尾秘密的获取工作。因为工作者线程不会再单个共享的任务队列上发生竞争。适用于既是生产者又是消费者问题。
阻塞方法与中断方法
线程会阻塞或暂停执行。被阻塞的线程必须等待某个不受它控制的事件发生后才能继续执行。当在代码中调用一个可以抛出InterruptedException的方法时,自己的方法就编程了阻塞方法,必须处理中断的响应。如果这个方法被中断,那么它将努力提前结束状态。
处理中断的响应有两种基本选择:
- 传递InterruptedException,把该异常抛出给方法的调用者。
- 恢复中断,捕获异常,并调用当前线程的interrupt方法恢复中断,引发更高层的代码中断。
public void run(){ try{ something(); }catch(InterruptedException e){ Thread.currentThread().interrupt(); } } 复制代码
同步 工具 类
同步工具类可以是任何一个对象,只要它根据其自身的状态来协调线程的控制流。包括阻塞队列,信号量,栅栏以及闭锁。
闭锁
闭锁用来确保某些活动直到其他活动都完成了才继续执行。如果有多个线程,其中一个线程需要等到其他 所有 线程活动结束后才继续执行,使用闭锁。
CountDownLatch是一种闭锁的实现,可以使得一个或者多个线程等待一组事情发生。包括一个计数器,表示需要等待的事件数量;countDown方法用来递减计数器,表示有一个事件已经发生了;await方法等待计数器为0,表示所有需要等待的事情已经发生。
// 初始化闭锁,并设置资源个数 CountDownLatch latch = new CountDownLatch(2); Thread t1 = new Thread( new Runnable(){ public void run(){ // 加载资源1 加载资源的代码…… // 本资源加载完后,闭锁-1 latch.countDown(); } } ).start(); Thread t2 = new Thread( new Runnable(){ public void run(){ // 加载资源2 资源加载代码…… // 本资源加载完后,闭锁-1 latch.countDown(); } } ).start(); Thread t3 = new Thread( new Runnable(){ public void run(){ // 本线程必须等待所有资源加载完后才能执行 latch.await(); // 当闭锁数量为0时,await返回,执行接下来的任务 任务代码…… } } ).start(); 复制代码
栅栏(同步屏障)
闭锁是一次性对象,一旦进入终止状态,就不能被重置。栅栏类似于闭锁,能阻塞一组进程直到某个时间发生。栅栏与闭锁的区别在于,所有线程必须同时到达栅栏位置,才能继续执行。
若有多条线程,他们到达屏障时将会被阻塞,只有当所有线程都到达屏障时才能打开屏障,所有线程同时执行,若有这样的需求可以使用同步屏障。此外,当屏障打开的同时还能指定执行的任务。
闭锁只会阻塞一条线程,目的是为了让该条任务线程满足条件后执行; 而同步屏障会阻塞所有线程,目的是为了让所有线程同时执行(实际上并不会同时执行,而是尽量把线程启动的时间间隔降为最少)。
// 创建同步屏障对象,并制定需要等待的线程个数 和 打开屏障时需要执行的任务 CyclicBarrier barrier = new CyclicBarrier(3,new Runnable(){ public void run(){ //当所有线程准备完毕后触发此任务 } }); // 启动三条线程 for( int i=0; i<3; i++ ){ new Thread( new Runnable(){ public void run(){ // 等待,(每执行一次barrier.await,同步屏障数量-1,直到为0时,打开屏障) barrier.await(); // 任务 任务代码…… } } ).start(); } 复制代码
信号量
信号量用于控制同时访问某个特定资源的操作数量,或者执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。
信号量可以用于实现资源池,也可以用于将容器变为有界阻塞容器。信号量管理着一组虚拟的许可,在执行操作时首先获取许可,并在使用以后释放许可。如果没有许可,将阻塞直到有许可或被中断,超时。
信号量的使用场景是,有m个资源,n个线程,且n>m,同一时刻只能允许m条线程访问资源。
// 创建信号量对象,并给予3个资源 Semaphore semaphore = new Semaphore(3); // 开启10条线程 for ( int i=0; i<10; i++ ) { new Thread( new Runnbale(){ public void run(){ // 获取资源,若此时资源被用光,则阻塞,直到有线程归还资源 semaphore.acquire(); // 任务代码 …… // 释放资源 semaphore.release(); } } ).start(); } 复制代码
FutureTask
可以用作闭锁,是一种可以生成结果的Runnable,可以处于以下三种状态:等待运行,正在运行和运行完成。当FutureTask进入完成状态后,它会停止在这个状态上。
FutureTask在Executor框架中表示异步任务,此外还可以用来表示一些时间较长的运算,这些计算可以在使用计算结构之前启动。
实战:构建缓存
首先,使用HashMap和同步机制来初始化缓存。
public interface Computable<A,V> { V compute(A arg) throws InterruptedException; } public class ExpensiveFunc implements Computable<String,BigInteger> { @Override public BigInteger compute(String arg) throws InterruptedException { return new BigInteger(arg); } } public class Memoizer1<A,V> implements Computable<A,V> { private final Map<A,V> cache=new HashMap<>(); private final Computable<A,V> c; public Memoizer1(Computable<A,V> c){ this.c=c; } @Override public synchronized V compute(A arg) throws InterruptedException { V result=cache.get(arg); if(result==null){ result=c.compute(arg); cache.put(arg,result); } return result; } } 复制代码
在这种实现方法中,使用HashMap保存之前计算的结果。首先检查需要的结果是否已经在缓存中,如果存在则返回之前计算,否则将计算结果缓存到HashMap再返回。
为了确保线程安全,将整个compute方法进行同步。但是这样伸缩性差,缓存的性能并没有得到提升。
下面使用ConcurrentHashMap替换HashMap。但是,这种方法存在一些不足,当两个线程同时调用compute时,可能会导致计算得到相同的值。这样是低效的,因为缓存的作用就是避免相同的数据被计算多次。其问题在于,如果某个线程启动了一个计算,而其他线程并不知道这个计算正在进行,很可能会重复这个计算。
针对如上问题,我们考虑可以使用FutureTask来解决。使用该类来表示计算的过程,如果有结果可用,则返回结果,否则一直阻塞。
public class Memo2 <A,V> implements Computable<A,V>{ private final Map<A,Future<V>> cache=new ConcurrentHashMap<>(); private final Computable<A,V>c; public Memo2(Computable<A,V>c){ this.c=c; } @Override public V compute(A arg) throws InterruptedException { Future<V> future=cache.get(arg); if(future==null){ Callable<V> eval=new Callable<V>() { @Override public V call() throws Exception { return c.compute(arg); } }; FutureTask<V> ft=new FutureTask<>(eval); future=cache.putIfAbsent(arg,ft); if(future==null){ future=ft; ft.run(); } } try{ return future.get(); }catch (ExecutionException e){ e.printStackTrace(); } return null; } } 复制代码
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
程序员代码面试指南:IT名企算法与数据结构题目最优解
左程云 / 电子工业出版社 / 2015-9 / 79.00元
这是一本程序员面试宝典!书中对IT名企代码面试各类题目的最优解进行了总结,并提供了相关代码实现。针对当前程序员面试缺乏权威题目汇总这一痛点,本书选取将近200道真实出现过的经典代码面试题,帮助广大程序员的面试准备做到万无一失。“刷”完本书后,你就是“题王”!__eol__本书采用题目+解答的方式组织内容,并把面试题类型相近或者解法相近的题目尽量放在一起,读者在学习本书时很容易看出面试题解法之间的联......一起来看看 《程序员代码面试指南:IT名企算法与数据结构题目最优解》 这本书的介绍吧!