ThreadPoolExecutor 线程池源码分析-基于jdk8

栏目: Java · 发布时间: 7年前

内容简介:本文先给出一个测试 demo(ThreadPoolExecutorTest),然后列出从 ThreadPoolExecutor 中抽出来的一些核心方法。

测试demo, ThreadPoolExecutorTest:

/**
 * Demo driver: submits ten tasks to a {@link ThreadPoolExecutor} that has
 * 3 core threads, at most 6 threads, a 1-second keep-alive and a bounded
 * queue of capacity 10, using {@link ThreadPoolExecutor.CallerRunsPolicy}
 * as the rejection handler.
 */
public class ThreadPoolExecutorTest {
	/**
	 * Builds the pool, submits tasks numbered 1..10 and then shuts the pool down.
	 *
	 * @param args unused
	 * @throws InterruptedException declared for compatibility with the original
	 *         signature; nothing in this method currently throws it
	 */
	public static void main(String[] args) throws InterruptedException {
		final boolean isFair = false;
		ArrayBlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<>(10, isFair);

		final int corePoolSize = 3;
		final int maximumPoolSize = 6;
		ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
				corePoolSize, maximumPoolSize,
				1, TimeUnit.SECONDS,
				arrayBlockingQueue,
				new ThreadPoolExecutor.CallerRunsPolicy());

		try {
			for (int index = 1; index <= 10; index++) {
				// execute() only needs a Runnable; the pool supplies the thread,
				// so the task is not held as a Thread reference.
				Runnable task = new MyThreadTask(index);
				threadPool.execute(task);
			}
		} finally {
			// Without this the idle core threads keep the JVM alive after main
			// returns. shutdown() still lets already-submitted tasks finish.
			threadPool.shutdown();
		}
	}
}

从 ThreadPoolExecutor 中抽出来的一些核心方法:

/**
 * Abridged excerpt of a thread pool: only the core methods are kept.
 *
 * NOTE(review): this excerpt references members that are not declared here
 * (for example {@code ctl}, {@code runStateOf}, {@code workerCountOf},
 * {@code corePoolSize}, {@code keepAliveTime}, {@code largestPoolSize},
 * {@code addWorkerFailed}, {@code processWorkerExit}, {@code reject}); they
 * must come from the full class.
 */
public class ThreadPoolExecutor extends AbstractExecutorService {

	/** Queue holding tasks that have been accepted but not yet handed to a worker. */
	private final BlockingQueue<Runnable> workQueue;
	/** Lock guarding the {@link #workers} set and the related bookkeeping in addWorker. */
	private final ReentrantLock mainLock = new ReentrantLock();
	/** All workers currently registered with the pool; mutated only while holding mainLock. */
	private final HashSet<Worker> workers = new HashSet<Worker>();

	/**
	 * The worker that actually executes tasks inside the pool. It is the
	 * Runnable handed to the pool's thread factory, and its run() delegates
	 * to the enclosing pool's runWorker.
	 *
	 * NOTE(review): runWorker calls lock()/unlock() on this class, but those
	 * methods are not shown in this excerpt.
	 */
	private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
		/** Thread this worker is running in.  Null if factory fails. */
		final Thread thread;
		/** Initial task to run.  Possibly null. */
		Runnable firstTask;
		/** Per-thread task counter */
		volatile long completedTasks;

		/**
		 * Creates a worker with an optional first task and a new thread
		 * obtained from the pool's thread factory.
		 *
		 * @param firstTask the first task to run, possibly null
		 */
		Worker(Runnable firstTask) {
			setState(-1); // inhibit interrupts until runWorker
			this.firstTask = firstTask;
			this.thread = getThreadFactory().newThread(this);
		}

		/**
		 * Delegates to the enclosing pool's runWorker method.
		 */
		public void run() {
			runWorker(this);
		}
	}

	/**
	 * Creates a worker for firstTask, registers it in the workers set and
	 * starts its thread.
	 *
	 * The workers set is a plain HashSet and execute() may be called from
	 * several threads at once, so the registration and the largestPoolSize
	 * update are done while holding mainLock.
	 *
	 * @param firstTask the task the new worker runs first, possibly null
	 * @param core whether the worker counts as a core thread
	 *        (NOTE(review): not read anywhere in this excerpt)
	 * @return true if the worker thread was started
	 */
	private boolean addWorker(Runnable firstTask, boolean core) {
		boolean workerStarted = false;
		boolean workerAdded = false;
		Worker w = null;
		try {
			w = new Worker(firstTask);
			final Thread t = w.thread;
			if (t != null) {
				final ReentrantLock mainLock = this.mainLock;
				mainLock.lock();
				try {
					// Read the run state under the lock so the decision and the
					// registration happen as one step.
					int rs = runStateOf(ctl.get());
					if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
						workers.add(w);
						int s = workers.size();
						if (s > largestPoolSize)
							largestPoolSize = s;
						workerAdded = true;
					}
				} finally {
					mainLock.unlock();
				}
				// Start the thread outside the lock.
				if (workerAdded) {
					t.start();
					workerStarted = true;
				}
			}
		} finally {
			if (!workerStarted)
				addWorkerFailed(w);
		}
		return workerStarted;
	}

	/**
	 * Fetches the next task from workQueue for a worker to run, either with
	 * a timed poll of keepAliveTime nanoseconds or with a blocking take.
	 *
	 * NOTE(review): {@code timed} is not declared in this excerpt, and
	 * {@code timedOut} is assigned but never read here, so as written this
	 * loop never returns null. Confirm against the full class.
	 *
	 * @return the next task
	 */
	private Runnable getTask() {
		boolean timedOut = false; // Did the last poll() time out?
		for (;;) {
			try {
				Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
				if (r != null)
					return r;
				timedOut = true;
			} catch (InterruptedException retry) {
				timedOut = false;
			}
		}
	}

	/**
	 * Main run loop of a worker.
	 *
	 * The loop condition {@code task != null || (task = getTask()) != null}
	 * first uses the worker's own firstTask; once that has run, further tasks
	 * are pulled from workQueue through getTask().
	 *
	 * Each task is invoked with a direct {@code task.run()} call, so it
	 * executes on the thread that is running this method.
	 *
	 * @param w the worker whose tasks are run
	 */
	final void runWorker(Worker w) {
		Thread wt = Thread.currentThread();
		Runnable task = w.firstTask;
		w.firstTask = null;
		w.unlock(); // allow interrupts
		boolean completedAbruptly = true;
		try {
			while (task != null || (task = getTask()) != null) {
				w.lock();
				// If pool is stopping, ensure thread is interrupted;
				// if not, ensure thread is not interrupted. This
				// requires a recheck in second case to deal with
				// shutdownNow race while clearing interrupt
				if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
						&& !wt.isInterrupted())
					wt.interrupt();
				try {
					beforeExecute(wt, task);
					// Remember what the task threw so afterExecute can see it.
					Throwable thrown = null;
					try {
						task.run();
					} catch (RuntimeException x) {
						thrown = x;
						throw x;
					} catch (Error x) {
						thrown = x;
						throw x;
					} catch (Throwable x) {
						// Any other Throwable is rethrown wrapped in an Error.
						thrown = x;
						throw new Error(x);
					} finally {
						afterExecute(task, thrown);
					}
				} finally {
					task = null;
					w.completedTasks++;
					w.unlock();
				}
			}
			// Reached only when the loop ended without an exception.
			completedAbruptly = false;
		} finally {
			processWorkerExit(w, completedAbruptly);
		}
	}

	/**
	 * Entry point for submitting a task to the pool.
	 *
	 * Steps, in order: (1) if the worker count is below corePoolSize, try to
	 * start a new worker with the command as its first task; (2) otherwise,
	 * if the pool is running, offer the command to workQueue and re-check the
	 * pool state afterwards; (3) if it could not be queued, try to start a
	 * new non-core worker, and reject the command if that fails too.
	 *
	 * @param command the task to execute
	 */
	public void execute(Runnable command) {
		int c = ctl.get();
		// Run on a new core worker
		if (workerCountOf(c) < corePoolSize) {
			if (addWorker(command, true))
				return;
			c = ctl.get();
		}
		// Core workers are already at capacity: add to workQueue
		if (isRunning(c) && workQueue.offer(command)) {
			int recheck = ctl.get();
			// Pool stopped running after the offer: take the task back out and reject it.
			if (!isRunning(recheck) && remove(command))
				reject(command);
			// No workers at all: start one with no first task.
			else if (workerCountOf(recheck) == 0)
				addWorker(null, false);
		} else if (!addWorker(command, false))
			reject(command);
	}

	/**
	 * One of the rejection policies: runs the rejected task directly in the
	 * thread that invokes rejectedExecution, unless the executor has been
	 * shut down, in which case the task is dropped.
	 */
	public static class CallerRunsPolicy implements RejectedExecutionHandler {
		public CallerRunsPolicy() {
		}

		/**
		 * Runs r with a direct call if e is not shut down.
		 *
		 * @param r the rejected task
		 * @param e the executor that rejected it
		 */
		public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
			if (!e.isShutdown()) {
				r.run();
			}
		}
	}
}

以上所述就是小编给大家介绍的《ThreadPoolExecutor 线程池源码分析-基于jdk8》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Trading and Exchanges

Trading and Exchanges

Larry Harris / Oxford University Press, USA / 2002-10-24 / USD 95.00

This book is about trading, the people who trade securities and contracts, the marketplaces where they trade, and the rules that govern it. Readers will learn about investors, brokers, dealers, arbit......一起来看看 《Trading and Exchanges》 这本书的介绍吧!

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具