内容简介:java线程详解及高并发编程庖丁解牛
java线程详解及高并发编程庖丁解牛
线程概述:
祖宗:
说起 java 高并发编程,就不得不提起一位老先生Doug Lea,这位老先生可不得了,看看百度百科对他的评价,一点也不为过:
如果IT的历史,是以人为主体串接起来的话,那么肯定少不了Doug Lea。这个鼻梁挂着眼镜,留着德王威廉二世的胡子,脸上永远挂着谦逊腼腆笑容,服务于 纽约州立大学 Oswego分校计算机科学系的老大爷。
说他是这个世界上对Java影响力最大的个人,一点也不为过。因为两次Java历史上的大变革,他都间接或直接的扮演了举足轻重的角色。一次是由JDK 1.1到JDK 1.2,JDK1.2很重要的一项新创举就是Collections,其Collections的概念可以说承袭自Doug Lea于1995年发布的第一个被广泛应用的collections;一次是2004年所推出的Tiger。Tiger广纳了15项JSRs(Java Specification Requests)的语法及标准,其中一项便是JSR-166。JSR-166是来自于Doug编写的util.concurrent包。
值得一提的是: Doug Lea也是JCP ( Java 社区项目)中的一员。
Doug是一个无私的人,他深知分享知识和分享苹果是不一样的,苹果会越分越少,而自己的知识并不会因为给了别人就减少了,知识的分享更能激荡出不一样的火花。《Effective JAVA》这本Java经典之作的作者Joshua Bloch便在书中特别感谢Doug Lea是此书中许多构想的共鸣板,感谢Doug Lea大方分享丰富而又宝贵的知识。
我记住了两句话:他是这个世界上对Java影响力最大的个人和几乎所有的java高并发编程核心包都是他写的。
线程和进程:
进程:每个进程都有独立的代码和数据空间(进程上下文),进程间的切换会有较大的开销,一个进程包含1--n个线程。
线程:同一类线程共享代码和数据空间,每个线程有独立的运行栈和程序计数器(PC),线程切换开销小。
线程和进程一样分为五个阶段:创建、就绪、运行、阻塞、终止。
多进程是指操作系统能同时运行多个任务(程序)。
多线程是指在同一程序中有多个顺序流在执行。
多线程的优势:
进程之间不能共享内存,但线程可以。
系统创建进程需要为该进程重新分配系统资源,开销大,但线程则小得多,所以使用多线程实现并发比用多进程实现并发的性能要高得多。
线程的创建和启动:
继承Thread类创建线程:
public class FirstThread extends Thread{ private int i; @Override public void run() { for(;i<100;i++){ System.out.println(getName()+" "+i); } } public static void main(String[] args) { for(int i=0;i<100;i++){ System.out.println(Thread.currentThread().getName()+" "+i); if(i==20){ new FirstThread().start(); new FirstThread().start(); } } } }
如果希望调用子线程start()后子线程立马执行,则可以在当前运行的线程休眠1ms;
实现Runnable接口创建线程:
public class SecondRunnable implements Runnable{ private int i; @Override public void run() { for(;i<100;i++){ System.out.println(Thread.currentThread().getName()+" "+i); } } public static void main(String[] args) { for(int i=0;i<100;i++){ System.out.println(Thread.currentThread().getName()+" "+i); if(i==20){ SecondRunnable sr=new SecondRunnable(); new Thread(sr,"thread-1").start(); new Thread(sr,"thread-2").start(); } } } }
使用Callable和Future穿创建线程:
Java5开始,Java提供了Callable接口,该接口提供了一个call方法作为线程执行体,但是call方法比run方法更强大。call方法可以有返回值,可以抛异常。
Java5还提供了Future接口来代表Callable接口里的call方法的返回值,并为Future接口提供了一个FutureTask实现类,这个类还实现了Runnable接口,可以作为Thread类的target,在Future接口里定义了如下几个公共方法来控制它关联的Callable任务。
boolean cancle(boolean mayInterruptIfRunning):试图取消Future里关联的Callable任务。
V get()返回Callable任务里call方法的返回值,调用该方法会导致程序阻塞,必须等到子线程结束才会得到返回值,如图所示可以看出。
V get(long timeout,TimeUnit unit)返回Callable任务里call方法的返回值,该方法让程序最多阻塞timeout和unit指定的时间,如果超时依然没有返回值,则会抛出TimeOutExecption。
boolean isCancelled()如果在Callable任务正常完成之前被取消,返回true。
boolean isDone()如果Callable任务已完成,则返回true。
public class ThirdFutureTask { public static void main(String[] args) { FutureTask<Integer>task=new FutureTask<Integer>(new Callable<Integer>() { @Override public Integer call() throws Exception { int i=0; for(;i<100;i++){ System.out.println(Thread.currentThread().getName()+" "+i); } return i; } }); for(int i=0;i<100;i++){ System.out.println(Thread.currentThread().getName()+" "+i); if(i==20){ new Thread(task,"return").start(); } } try { System.out.println("子线程的返回值"+task.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
创建线程的三种方式对比:
采用继承Thread的方式创建线程优缺点:
优势:编写简单,访问当前线程无需Thread.currentThread()
劣势:出于java单继承的原因,只能继承一个类
采用实现Runnable,Callable接口创建线程优缺点:
优势:可以继承其他类,多个线程可以共享同一个target,非常适合多个线程来处理同一份资源,体现了面向对象思想。
劣势:编写更复杂。要访问当前线程需要Thread.currentThread()
线程的生命周期:
新建和就绪状态:
当程序使用new关键字创建了一个线程后,该线程就处于新建状态,此时和其它java对象一样,仅仅由虚拟机为其分配内存,初始化成员变量。
当程序对象调用了start后,该线程就处于就绪状态,虚拟机会为其创建方法调用栈和程序计数器,处于这个状态的线程并没有开始运行,只是说可以运行了,至于何时运行,取决于jvm线程调度器里的调度。
运行和阻塞状态:
如果处于就绪状态的线程获取了cpu,开始执行run()的线程执行体,则该线程处于运行状态
如果发生以下情况,则进入阻塞状态:
线程调用sleep(),阻塞;
线程调用了一个阻塞式的io方法,在方法返回之前,阻塞;
线程试图获得一个同步监视器,当该同步监视器正在被其它线程所持有,阻塞;
线程在等待通知notify(),阻塞;
程序调用suspend()将线程挂起,阻塞;但是该方法容易导致死锁,尽量不用!
针对上面的几种情况,如发生下面特定的情况可以解除阻塞,重新进入就绪状态:
调用sleep方法的线程过了指定的时间
io方法返回
获得同步监视器
收到通知
被挂起后resume()
线程死亡:
run()或call()执行完毕,死亡
线程抛出一个未捕获的异常或error,死亡
调用stop()容易死锁,不建议使用。死亡
控制线程死亡可以用valotile的状态标记量。
想要知道线程的生死,调用isAlive();就绪,运行,阻塞返回true,其他返回false
控制线程:
join线程:
当在某个程序执行流中调用其它线程的join方法,调用线程将阻塞,直到被join的线程执行完毕为止
join有三种重载方式:
join()等待被join的线程执行完毕
join(long millis)等待被join的线程最多millis毫秒,否则不再等待。
join(long millis,int nanos)等待被join的线程最多millis毫秒+nanos毫微秒,否则不再等待。
public class TestJoin implements Runnable{ @Override public void run() { for(int i=0;i<100;i++){ System.out.println(Thread.currentThread().getName()+" "+i); } } public static void main(String[] args) throws InterruptedException { for(int i=0;i<100;i++){ System.out.println(Thread.currentThread().getName()+" "+i); if(i==20){ TestJoin tj=new TestJoin(); Thread thread=new Thread(tj); thread.start(); thread.join(1);//放在start之后,不然有问题 } } } }
后台/守护线程:
调用Thread对象的setMaemon(true)设置为后台线程。如果前台线程都死亡,则后台线程不管执没执行完,都会死亡。
public class TestDaemonThread implements Runnable{ @Override public void run() { for(int i=0;i<1000;i++){ System.out.println(Thread.currentThread().getName()+" "+i); } } public static void main(String[] args) { for(int i=0;i<100;i++){ System.out.println(Thread.currentThread().getName()+" "+i); if(i==20){ TestDaemonThread tdt=new TestDaemonThread(); Thread thread=new Thread(tdt); thread.setDaemon(true);//设置为后台线程,则前台线程都死亡,这个线程会自动死亡,必须放在start之前 thread.start(); } } } }
线程睡眠sleep:
让当前线程暂停一段时间,进入阻塞状态。当调用该方法暂停后,就绪状态的线程获得执行的机会。不理会优先级
sleep(long millis)
sleep(long millis,int nanos)
线程让步yield:
让当前线程暂停一段时间,进入就绪状态,让系统的线程调度器重新调度一次,当调用该方法暂停后,只有优先级大于等于当前线程的就绪状态的线程才会获得执行的机会。
并且该方法无需抛异常,不太建议用yield()。
改变线程优先级:
Thread类提供了setPriority(int priority)来设置优先级。参数范围是1-10,也可以用Thread的三个静态常量
->MAX_PRIORITY 10
->NORM_PRIORITY 5
->MIN_PRIORITY 1
线程同步:
线程安全问题:
经典例子就是取款问题,如果取款方法不是线程安全的,那么当两个线程同时进来取款时很有可能会发生明明余额不足,却把钱给取出来了的情况。
同步代码块:
synchronized(account){ if(account.getBalance>=drawAmmount){ 取钱成功 }else{ 取钱失败 } }
同步方法:
public synchronized void draw(double drawAmmout){ if(balance>=drawAmmount){ 取钱成功 }else{ 取钱失败 } }
释放同步监视器的锁定:
释放同步监视器的情况:
当前线程的同步方法,同步代码块执行完毕
当前线程的同步方法,同步代码块出现了未处理的error,exception,导致了改代码块,方法异常结束
当前线程的同步方法,同步代码块遇到了break,return终止了代码块,方法的执行
执行了wait()
不会释放同步监视器的情况:
sleep(),yield(),suspend()
同步锁Lock:
Java5开始,java提供了功能更加强大的线程同步机制,通过显示定义同步锁对象来实现同步,由Lock对象充当同步锁。
在实现线程安全的控制中,使用较多的是ReentrantLock(可重入锁)。
package com.lry.java扎实基础; import java.util.concurrent.locks.ReentrantLock; public class Account { static Account account=new Account("123456", 1000); private ReentrantLock lock=new ReentrantLock(); private String accountNo; private double money; public Account(String accountNo, double money) { super(); this.accountNo = accountNo; this.money = money; } /** * 取钱 * @param drawAmount 要取多少钱 */ public void drawMoney(double drawAmount){ lock.lock(); try{ if(money>=drawAmount){ System.out.println(Thread.currentThread().getName()+"取钱成功,吐出钞票:"+drawAmount); try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } money-=drawAmount; System.out.println("账户余额为"+money); }else{ System.out.println(Thread.currentThread().getName()+"取钱失败,钞票不足!"); } }finally{ lock.unlock(); } } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + ((accountNo == null) ? 0 : accountNo.hashCode()); return result; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; Account other = (Account) obj; if (accountNo == null) { if (other.accountNo != null) return false; } else if (!accountNo.equals(other.accountNo)) return false; return true; } static class DrawThread implements Runnable{ @Override public void run() { account.drawMoney(600); } } public static void main(String[] args) { DrawThread dt=new DrawThread(); new Thread(dt).start(); new Thread(dt).start(); new Thread(dt).start(); new Thread(dt).start(); } }
死锁:
死锁原因:
当两个线程相互等待对方释放同步监视器是发生死锁,虚拟机没有监测,没有采取措施处理死锁,一旦出现死锁,不会报异常,不会有提示,只会阻塞,因此自己要防止这种情况的发生。
死锁必要条件:
从以上分析可见,如果在计算机系统中同时具备下面四个必要条件时,那麽会发生死锁。换句话说,只要下面四个条件有一个不具备,系统就不会出现死锁。
〈1〉互斥条件。即某个资源在一段时间内只能由一个进程占有,不能同时被两个或两个以上的进程占有。这种独占资源如CD-ROM驱动器,打印机等等,必须在占有该资源的进程主动释放它之后,其它进程才能占有该资源。这是由资源本身的属性所决定的。如独木桥就是一种独占资源,两方的人不能同时过桥。
〈2〉不可抢占条件。进程所获得的资源在未使用完毕之前,资源申请者不能强行地从资源占有者手中夺取资源,而只能由该资源的占有者进程自行释放。如过独木桥的人不能强迫对方后退,也不能非法地将对方推下桥,必须是桥上的人自己过桥后空出桥面(即主动释放占有资源),对方的人才能过桥。
〈3〉占有且申请条件。进程至少已经占有一个资源,但又申请新的资源;由于该资源已被另外进程占有,此时该进程阻塞;但是,它在等待新资源之时,仍继续占用已占有的资源。还以过独木桥为例,甲乙两人在桥上相遇。甲走过一段桥面(即占有了一些资源),还需要走其余的桥面(申请新的资源),但那部分桥面被乙占有(乙走过一段桥面)。甲过不去,前进不能,又不后退;乙也处于同样的状况。
〈4〉循环等待条件。存在一个进程等待序列{P1,P2,...,Pn},其中P1等待P2所占有的某一资源,P2等待P3所占有的某一源,......,而Pn等待P1所占有的的某一资源,形成一个进程循环等待环。就像前面的过独木桥问题,甲等待乙占有的桥面,而乙又等待甲占有的桥面,从而彼此循环等待。
死锁预防:
死锁发生时的四个必要条件,只要破坏这四个必要条件中的任意一个条件,死锁就不会发生。这就为我们解决死锁问题提供了可能。一般地,解决死锁的方法分为死锁的预防,避免,检测与恢复三种(注意:死锁的检测与恢复是一个方法)。我们将在下面分别加以介绍。
死锁的预防是保证系统不进入死锁状态的一种策略。它的基本思想是要求进程申请资源时遵循某种协议,从而打破产生死锁的四个必要条件中的一个或几个,保证系统不会进入死锁状态。
1打破互斥条件。即允许进程同时访问某些资源。但是,有的资源是不允许被同时访问的,像打印机等等,这是由资源本身的属性所决定的。所以,这种办法并无实用价值。
2打破不可抢占条件。即允许进程强行从占有者那里夺取某些资源。就是说,当一个进程已占有了某些资源,它又申请新的资源,但不能立即被满足时,它必须释放所占有的全部资源,以后再重新申请。它所释放的资源可以分配给其它进程。这就相当于该进程占有的资源被隐蔽地强占了。这种预防死锁的方法实现起来困难,会降低系统性能。
3打破占有且申请条件。可以实行资源预先分配策略。即进程在运行前一次性地向系统申请它所需要的全部资源。如果某个进程所需的全部资源得不到满足,则不分配任何资源,此进程暂不运行。只有当系统能够满足当前进程的全部资源需求时,才一次性地将所申请的资源全部分配给该进程。由于运行的进程已占有了它所需的全部资源,所以不会发生占有资源又申请资源的现象,因此不会发生死锁。但是,这种策略也有如下缺点:
(1)在许多情况下,一个进程在执行之前不可能知道它所需要的全部资源。这是由于进程在执行时是动态的,不可预测的;
(2)资源利用率低。无论所分资源何时用到,一个进程只有在占有所需的全部资源后才能执行。即使有些资源最后才被该进程用到一次,但该进程在生存期间却一直占有它们,造成长期占着不用的状况。这显然是一种极大的资源浪费;
(3)降低了进程的并发性。因为资源有限,又加上存在浪费,能分配到所需全部资源的进程个数就必然少了。
4 打破循环等待条件 ,实行资源有序分配策略。采用这种策略,即把资源事先分类编号,按号分配,使进程在申请,占用资源时不会形成环路。所有进程对资源的请求必须严格按资源序号递增的顺序提出。进程占用了小号资源,才能申请大号资源,就不会产生环路,从而预防了死锁。这种策略与前面的策略相比,资源的利用率和系统吞吐量都有很大提高,但是也存在以下缺点:
(1)限制了进程对资源的请求,同时给系统中所有资源合理编号也是件困难事,并增加了系统开销;
(2)为了遵循按编号申请的次序,暂不使用的资源也需要提前申请,从而增加了进程对资源的占用时间。
死锁的避免:
上面我们讲到的死锁预防是排除死锁的静态策略,它使产生死锁的四个必要条件不能同时具备,从而对进程申请资源的活动加以限制,以保证死锁不会发生。下面我们介绍排除死锁的动态策略--死锁的避免,它不限制进程有关申请资源的命令,而是对进程所发出的每一个申请资源命令加以动态地检查,并根据检查结果决定是否进行资源分配。就是说,在资源分配过程中若预测有发生死锁的可能性,则加以避免。这种方法的关键是确定资源分配的安全性。
死锁的例子:
public class DeadLock implements Runnable { public int flag = 1; // 静态对象是类的所有对象共享的 private static Object o1 = new Object(), o2 = new Object(); @Override public void run() { System.out.println("flag=" + flag); if (flag == 1) { synchronized (o1) { try { Thread.sleep(500); } catch (Exception e) { e.printStackTrace(); } synchronized (o2) { System.out.println("1"); } } } if (flag == 0) { synchronized (o2) { try { Thread.sleep(500); } catch (Exception e) { e.printStackTrace(); } synchronized (o1) { System.out.println("0"); } } } } public static void main(String[] args) { DeadLock td1 = new DeadLock(); DeadLock td2 = new DeadLock(); td1.flag = 1; td2.flag = 0; // td1,td2都处于可执行状态,但JVM线程调度先执行哪个线程是不确定的。 // td2的run()可能在td1的run()之前运行 new Thread(td1).start(); new Thread(td2).start(); } }
一个简单的死锁类 当DeadLock类的对象flag==1时(td1),先锁定o1,睡眠500毫秒
* 而td1在睡眠的时候另一个flag==0的对象(td2)线程启动,先锁定o2,睡眠500毫秒
* td1睡眠结束后需要锁定o2才能继续执行,而此时o2已被td2锁定; td2睡眠结束后需要锁定o1才能继续执行,而此时o1已被td1锁定;
* td1、td2相互等待,都需要得到对方锁定的资源才能继续执行,从而死锁。
线程通信:
传统的线程通信:
首先介绍三个方法:
Object类下面的wait(),notify(),notifyAll()
这三个方法必须由同步监视器对象来调用,可以分为两种情况:
synchronized修饰的同步方法,该类的默认实例(this)就是同步监视器,所以可以在同步方法直接调用这三个方法
synchronized修饰的同步代码块,同步监视器synchronized后括号里的对象,所以必须使用该对象调用这三个方法
关于这三个方法的解释:
wait():导致当前线程等待,直到其它线程调用该同步监视器的notify或notifyAll()来唤醒该线程
notify():唤醒在此同步监视器等待的单个线程,如果多个线程在此同步监视器等待,则会任意选择一个唤醒
notify()唤醒在此同步监视器等待的所有线程。
银行取钱案例:
账户类:取钱方法,存钱方法
public class Account { private String accountNo; private double balance; private boolean flag=false;//已有存款标志 public Account(String accountNo, double balance) { super(); this.accountNo = accountNo; this.balance = balance; } public synchronized void draw(double drawAmount){ try{ if(!flag){//为假,所以没有人存钱进去,取钱阻塞 wait(); }else{//可以取钱 if(balance>=drawAmount){ System.out.println(Thread.currentThread().getName()+"取钱:"+drawAmount); balance-=drawAmount; System.out.println("取钱成功,账户余额:"+balance); flag=false; notifyAll();//唤醒存钱线程 }else{//余额不足 System.out.println("想取"+drawAmount+"账户余额不足:"+balance); flag=false; notifyAll();//唤醒存钱线程 } } }catch(InterruptedException ex){ ex.printStackTrace(); } } public synchronized void deposit(double depositAmount){ try{ if(flag){ wait();//没人取钱,则存钱阻塞 }else{ System.out.println(Thread.currentThread().getName()+"存钱:"+depositAmount); balance+=depositAmount; System.out.println("存钱成功,账户余额:"+balance); flag=true; notifyAll();//唤醒取钱线程 } }catch(InterruptedException ex){ ex.printStackTrace(); } } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + ((accountNo == null) ? 0 : accountNo.hashCode()); return result; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; Account other = (Account) obj; if (accountNo == null) { if (other.accountNo != null) return false; } else if (!accountNo.equals(other.accountNo)) return false; return true; } }
取钱线程(消费者)
public class DrawThread extends Thread{ private Account account; private double drawAmount;//取钱数 public DrawThread(String name,Account account,double drawAmount){ super(name); this.account=account; this.drawAmount=drawAmount; } public void run() { for(int i=0;i<10;i++){ account.draw(drawAmount); } } }
存钱线程(生产者)
public class DepositThread extends Thread{ private Account account; private double depositAmount;//取钱数 public DepositThread(String name,Account account,double depositAmount){ super(name); this.account=account; this.depositAmount=depositAmount; } public void run() { for(int i=0;i<10;i++){ account.deposit(depositAmount); } } }
测试类:
public class AccountTest { public static void main(String[] args) { Account account=new Account("123456", 0); new DrawThread("取款者lry", account, 1200).start(); new DepositThread("存款者甲", account, 1000).start(); new DepositThread("存款者乙", account, 1000).start(); new DepositThread("存款者丙", account, 1000).start(); } }
测试结果:
测试结果分析:
不难发现取钱线程和存钱线程交替执行,只有当取钱者取钱后,存款者才可以存款,同理,只有当存款者存款后,取钱者才可以取钱,程序最终显示被阻塞无法继续运行,这是因为有三个存款线程,但是取钱只有一个线程。这并不是死锁,这种情况只是取钱线程已经执行完毕,而存款线程并没有,她在等待其它线程来取钱而已,并不是等待其它线程释放同步监视器。
使用Lock+Condition控制线程通信:
如果程序中不使用synchronized来保证同步,而是直接使用Lock来保证同步,则系统中不存在隐式的同步监视器,也就不能用wait,notify,notifyAll来进行线程通信了。
当使用lock时,java提供了一个Conditin类保持协调,使用lock对象可以让那些已经得到lock对象却无法继续执行的线程释放lock对象,conditin对象也可以唤醒其他处于等待的线程。condition提供了三个方法用法分别对应Object类的wait,notify,notifyAll,分别是await,signal,signalAll。用法相似,不再赘述。
还是引用上个取款案例:
只是修改了Account账户类:
public class Account1 { private String accountNo; private double balance; private boolean flag=false;//已有存款标志 private final Lock lock=new ReentrantLock(); private final Condition cond=lock.newCondition(); public Account1(String accountNo, double balance) { super(); this.accountNo = accountNo; this.balance = balance; } public void draw(double drawAmount){ lock.lock(); try{ if(!flag){//为假,所以没有人存钱进去,取钱阻塞 cond.await(); }else{//可以取钱 if(balance>=drawAmount){ System.out.println(Thread.currentThread().getName()+"取钱:"+drawAmount); balance-=drawAmount; System.out.println("取钱成功,账户余额:"+balance); flag=false; cond.signalAll();//唤醒存钱线程 }else{//余额不足 System.out.println("想取"+drawAmount+"账户余额不足:"+balance); flag=false; cond.signalAll();//唤醒存钱线程 } } }catch(InterruptedException ex){ ex.printStackTrace(); }finally { lock.unlock(); } } public void deposit(double depositAmount){ lock.lock(); try{ if(flag){ cond.await();//没人取钱,则存钱阻塞 }else{ System.out.println(Thread.currentThread().getName()+"存钱:"+depositAmount); balance+=depositAmount; System.out.println("存钱成功,账户余额:"+balance); flag=true; cond.signalAll();//唤醒取钱线程 } }catch(InterruptedException ex){ ex.printStackTrace(); }finally { lock.unlock(); } } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + ((accountNo == null) ? 0 : accountNo.hashCode()); return result; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; Account1 other = (Account1) obj; if (accountNo == null) { if (other.accountNo != null) return false; } else if (!accountNo.equals(other.accountNo)) return false; return true; } }
测试结果是一样的。
使用阻塞队列(BlockingQueue)控制线程通信:
java5提供了一个BlockingQueue接口,虽然这个接口是queue的子接口,但它的主要用途不是作为容器,而是作为线程同步的工具。BolckingQueue具有一个特征,当生产者试图向BolckingQueue里put元素,如果队列已满,则该线程会被阻塞,直到消费者消费了一个。当消费者试图从blockingQueue里take元素时,如果队列为空,则会阻塞,直到生产者生产了一个。
在队列尾部插入元素,包括add,offer,put,当队列已满时,这三个方法分别抛出异常,返回false,阻塞
在队列头部删除并返回删除的元素,包括remove,poll,take,当队列已空时,这三个方法分别会抛出异常,返回false,阻塞
在队列头部取出元素,不删除。包括element,peek,当队列已空时,分别抛出异常,返回false
经典生产者-消费者案例:
生产者:
public class Producer extends Thread{ private BlockingQueue<String> bq; public Producer(BlockingQueue<String> bq){ this.bq=bq; } @Override public void run() { String[] strArr=new String[]{"java","structs","spring"}; for(int i=0;i<1000;i++){ System.out.println(getName()+"生产者准备生产集合元素"); try { Thread.sleep(200); //尝试put元素,如果队列已满,则阻塞 bq.put(strArr[i%3]); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(getName()+"生产者生产完成"+bq); } } }
消费者:
public class Consumer extends Thread{ private BlockingQueue<String> bq; public Consumer(BlockingQueue<String> bq){ this.bq=bq; } @Override public void run() { for(int i=0;i<1000;i++){ System.out.println(getName()+"消费者准备消费集合元素"); try { Thread.sleep(200); //尝试put元素,如果队列已满,则阻塞 bq.take(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(getName()+"消费者消费完成"+bq); } } }
测试类:
public class BlockingQueueTest { public static void main(String[] args) { BlockingQueue<String>bq=new ArrayBlockingQueue<String>(); new Producer(bq).start(); new Producer(bq).start(); new Producer(bq).start(); new Consumer(bq).start(); } }
测试结果:
测试结果分析:
可以看出,3个生产者线程都想向队列中put元素,但只要其中任意线程put元素后,其它生产者必须等待(因为阻塞),等待消费者消费完。
线程组和未处理的异常:
线程组:
java使用ThreadGroup来表示线程组,他可以对一批线程进行分类管理,java允许程序直接对线程组进行控制,对线程组的控制相当于同时控制这批线程。
如果程序没有显示指定线程属于哪个线程组,则该线程属于默认线程组。在默认情况下,子线程和创建它的父线程处于同一个线程组内。
一旦但线程加入了指定线程组后,该线程一直属于该线程组,直到死亡,线程运行中途不能改变他所属的线程组。
Thread类提供了如下几个构造器来设置新创建的线程属于哪个线程组。
Thread(ThreadGroup group,Runnable tartget):
Thread(ThreadGroup group,Rannable target,String name);
Thread(ThreadGrop group,String name)
Thread没有提供setThreadGroup(),但是提供getThreadGroup返回ThreadGroup对象。
ThreadGroup提供了如下两个简单的构造器来创建实例。
ThreadGroup(String name)以指定的线程组名字来创建新的线程组。
ThreadGroup(ThreadGroup parent,String name)以指定的名字,指定的父线程组创建一个线程组
ThreadGrop还提供了几个常用的方法来操作整个线程组的所有线程。
int activeCount()返回此线程组中活动线程的数目
interrupt()中断此线程组的所有线程
isDaemon()判断该线程组是否是后台线程组
setDaemon(boolean daemon)把该线程组设为后台线程组
setMaxPriority(int pri)设置线程组的最高优先级
案例:
public class TestThreadGroup extends Thread{ public TestThreadGroup(String name){ super(name); } public TestThreadGroup(ThreadGroup group,String name){ super(group,name); } @Override public void run() { for(int i=0;i<20;i++){ System.out.println(getName()+"-线程的i变量"+i); } } public static void main(String[] args) { ThreadGroup mainGroup=Thread.currentThread().getThreadGroup(); System.out.println("主线程组的名字:"+mainGroup.getName()); new TestThreadGroup("主线程组的线程").start(); ThreadGroup tg=new ThreadGroup("新线程组"); tg.setDaemon(true); System.out.println("新线程组是否是后台线程组"+tg.isDaemon()); new TestThreadGroup(tg,"新线程组").start(); } }
结果:
未处理的异常:
Thread里还定义一个很有用的方法:void uncaughtExecptin(Thread t,Throwable e),该方法可以处理该线程组内任意线程所抛出未处理异常。
从java5开始,java加强了线程的异常处理,如果线程执行过程中抛出了一个未处理异常,jvm在结束该线程之前会自动查找是否有对应的Thread.UncaughtExecptionHandler对象,如果找到该处理器的对象,则会调用该对象的uncaughtExecption(Thread t,Throwable e)方法来处理该异常。
如果该线程组有父线程组,则调用父线程组的uncaughtException来处理异常
如果该线程实例所属的线程类有默认的异常处理器(由setDefaultUncaughtExecptionHandler()设置),那么就调用该异常处理器来处理该异常
如果该异常对象是ThreadDeath的对象,则不做任何处理,否则将异常跟踪栈的信息打印到system.err错误输出流,并结束该线程。
案例:
public class TestThreadExHandler implements UncaughtExceptionHandler{ @Override public void uncaughtException(Thread t, Throwable e) { System.out.println(t+"线程出现异常:"+e); } public static void main(String[] args) { Thread.currentThread().setUncaughtExceptionHandler(new TestThreadExHandler()); int i=1/0; System.out.println("程序正常结束"); } }
结果:
分析:
不难看出异常处理器捕获到异常后,程序仍然不会正常退出。
这说明异常处理器与通过catch捕获异常是不同的,当使用catch捕获异常时,异常不会向上传递给上一级调用者,但是异常处理器会。
线程池:
系统启动一个新线程的成本是比较高的,因为他涉及与操作系统交互,在这种情况下,就诞生了线程池。
Java8改进的线程池:
java5之前开发者需要自己实现自己的线程池,java5后,新增了一个Executors工厂类来产生线程池,该工厂类包含如下几个静态方法来创建线程池。
newCacheThreadPool()创建一个具有缓存功能的线程池,系统根据需要创建线程,这些线程将会被缓存在线程池中。
newFixedThreadPool(int nThreads)创建一个可重用的,具有固定线程数的线程池
newSingleThreadExecutor()创建一个只有单线程的线程池,它相当于调用newFixedThreadPool(1)
newScheduledThreadPool(int corePoolSize)创建具有指定线程数的线程池
newSingleThreadScheduledExecutor()创建一个只有一个线程的线程池
ExecutorService newWorkStealingPool(int parallelism)创建持有足够的线程的线程池来支持给定的并行级别,该方法会使用多个队列来减少竞争。(后台线程池)
ExecutorService newWorkStealingPool()该方法是前一个方法的简化版,如果当前机器有4个cpu,则目标并行级别被设置为4。(后台线程池)
案例1实现自己的线程池:
public class ThreadPool { LinkedBlockingQueue<Runnable>workQueue=new LinkedBlockingQueue<>(100); public static void main(String[] args) { new ThreadPool().threadPool(); } public void threadPool(){ ThreadFactory threadFactory=new ThreadFactory() { AtomicInteger atomic=new AtomicInteger(0); @Override public Thread newThread(Runnable r) { Thread thread=new Thread(r); thread.setName("MyThread"+atomic.getAndIncrement()); return thread; } }; /**corePoolSize核心池最大数量 * maximumPoolSize最大线程池上限个数 * keepAliveTime任务执行完,销毁线程的延时 * unit时间单位 TimeUnit.SECONDS; * workQueue 用于储存任务的工作队列 * threadFactory */ ThreadPoolExecutor pool=new ThreadPoolExecutor(5, 10, 1, TimeUnit.SECONDS, workQueue, threadFactory); for(int i=0;i<100;i++){ pool.execute(new Runnable() { public void run() { method(); } }); } } private void method(){ System.out.println("ThreadName:"+Thread.currentThread().getName()+"进来了"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("ThreadName:"+Thread.currentThread().getName()+"出去了"); } }
结果:
案例二用java自带的线程池:
public class TestJavaThreadPool { public static void main(String[] args) { ExecutorService pool=Executors.newFixedThreadPool(6); Runnable target=new Runnable() { @Override public void run() { for(int i=0;i<100;i++){ System.out.println(Thread.currentThread().getName()+"的i的值:"+i); } } }; pool.submit(target); pool.submit(target); pool.shutdown(); } }
Java8增强的ForkJoinPool:
java7提供了ForkJoinPool来支持将一个任务拆分成多个小任务并行计算,ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。它提供了两种常用的构造器:
ForkJoinPool(int parallelism)创建一个包含parallelism个并行线程的ForkJoinPool.
ForkJoinPool()以Runtime.availableProcessors()方法的返回值作为parallelism参数来创建ForkJoinPool.(取决于机器的cpu核数)
java8进一步扩展了ForkJoinPool的功能,增加了通用池功能,ForkJoinPool类通过如下两个静态方法提供通用池功能:
ForkJoinPool commonPool()该方法返回一个通用池,通用池的运行状态不会受shutdown()或showdownNow()的影响。
int getCommonPoolParalelism()该方法返回通用池的并行级别
创建了ForkJoinPool实例之后,就可以调用ForkJoinPool的submit或invoke方法来执行指定任务了。ForkJoinPool是一个抽象类,他还有两个抽象子类:
RecursiveAction和RecursiveTask,其中RecursiveAction代表无返回值的任务,RecursiveTask代表有返回值的任务。
无返回值的案例(打印0-300的数字,将一个大任务分解成多个小任务并行运行):
public class 无返回值的ForkJoinPool extends RecursiveAction{ private static final int smallTask=50; private int start; private int end; public 无返回值的ForkJoinPool(int start,int end){ this.start=start; this.end=end; } @Override protected void compute() { if(end-start<smallTask){//任务足够小,可以运行 for(int i=start;i<end;i++){ System.out.println(Thread.currentThread().getName()+"的i的值"+i); } }else{ //当任务不够小的时候,分解任务 int middle=(start+end)/2; 无返回值的ForkJoinPool left=new 无返回值的ForkJoinPool(start, middle); 无返回值的ForkJoinPool right=new 无返回值的ForkJoinPool(middle, end); left.fork(); right.fork(); } } public static void main(String[] args) throws InterruptedException { ForkJoinPool pool=new ForkJoinPool();//cpu核数 pool.submit(new 无返回值的ForkJoinPool(0, 300));//提交要分解的任务 pool.awaitTermination(2, TimeUnit.SECONDS); pool.shutdown(); } }
结果:
结果分析:可以看出启动了四个线程,这是因为我的cpu是四核的
有返回值的案例对一个长度为100的数组进行求和:
public class 有返回值的ForkJoinPool extends RecursiveTask<Integer>{ private static final int smallTask=20; private int arr[]; private int start; private int end; public 有返回值的ForkJoinPool(int[] arr, int start, int end) { super(); this.arr = arr; this.start = start; this.end = end; } @Override protected Integer compute() { int sum=0; if(end-start<smallTask){ for(int i=start;i<end;i++){ sum+=arr[i]; } return sum; } else{ int middle =(start+end)/2; 有返回值的ForkJoinPool left=new 有返回值的ForkJoinPool(arr, start, middle); 有返回值的ForkJoinPool right=new 有返回值的ForkJoinPool(arr, middle, end); left.fork(); right.fork(); return left.join()+right.join(); } } public static void main(String[] args) throws InterruptedException, ExecutionException { int []arr=new int[100]; Random random=new Random(); int total=0; for(int i=0,len=arr.length;i<len;i++){ int temp=random.nextInt(20); total+=(arr[i]=temp); } System.out.println("普通计算总量:"+total); ForkJoinPool pool=ForkJoinPool.commonPool(); Future<Integer>future=pool.submit(new 有返回值的ForkJoinPool(arr, 0, arr.length)); System.out.println("任务分解返回值"+future.get()); pool.shutdown(); } }
结果:
普通计算总量:887 任务分解返回值887
结果分析:
可以看出两种计算结果一样。
线程相关类:
ThreadLocal类:
ThreadLocal,是Thread Local Variable(线程局部变量)的意思,他的功用是为每一个使用该变量的线程都提供一个变量值的副本,是每一个线程可以独立的改变自己的个副本,而不会和其他线程的副本冲突
ThreadLocal用法很简单:
T get()返回线程副本的值
void remove()删除此线程局部变量中当前线程的值
void set(T value)修改此线程局部变量中当前线程副本的值
包装线程不安全的集合:
ArrayList,LinkedList,HashSet,TreeSet,HashMap,TreeMap等
如果程序中有多个线程可能访问这些集合,则可以使用Collections提供的类方法把这些集合包装成线程安全的集合
<T>Collection<T>synchronizedCollection(Collection<T>c)返回指定collection对应的线程安全的collection
线程安全的集合类:
以Concurrent开头的集合类(Doug Lea)
ConcurrenthashMap, ConcurrentSkipListMap, ConcurrentSkipListSet,ConcurrentLinkedQueue, ConcurrentLinkedDeque
以CopyOnWrite开头的集合类
CopyOnWriteArrayLsit CoprOnWriteArraySet
其中以Concurrent开头的代表支持并发访问的集合,他们支持多个线程并发写入访问,这些操作是线程安全的。有较好的性能。不允许存null
CopyOnWrite开头的集合类会在底层复制一份新的数组,接下来对新的数组执行写入操作,是线程安全的。需要频繁的复制数组,性能较差。读操作都很安全
总结:
写了大概有6到7个小时,真的累了,但是如果你能受益,我就是值得的,喜欢就赞我吧!
以上所述就是小编给大家介绍的《java线程详解及高并发编程庖丁解牛》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Redis为什么是单线程、及高并发快的大原因详解
- Prometheus 联邦及高可用详解
- 秒杀抢购思路以及高并发下数据安全
- Kubernetes Ingress Controller的使用介绍及高可用落地
- java中线程安全,线程死锁,线程通信快速入门
- ObjC 多线程简析(一)-多线程简述和线程锁的基本应用
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Programming Amazon Web Services
James Murty / O'Reilly Media / 2008-3-25 / USD 49.99
Building on the success of its storefront and fulfillment services, Amazon now allows businesses to "rent" computing power, data storage and bandwidth on its vast network platform. This book demonstra......一起来看看 《Programming Amazon Web Services》 这本书的介绍吧!