OkHttp【四】任务调度Dispatcher

栏目: Java · 发布时间: 8年前

内容简介:OkHttp【四】任务调度Dispatcher

OkHttp【四】任务调度Dispatcher

前言

在发起HTTP请求后, OkHttpRealCall 封装的相关逻辑内执行了请求发起动作,而负责记录和调度 Call 的则是 Dispatcher 。 本文一起分析 OkHttpClient#Dispatcher 的相关实现。

/**
 * Policy on when async requests are executed.
 *
 * <p>Each dispatcher uses an {@link ExecutorService} to run calls internally. If you supply your
 * own executor, it should be able to run {@linkplain #getMaxRequests the configured maximum} number
 * of calls concurrently.
 */
 public final class Dispatcher {
     // ...
 }

任务调度

在构造 OkHttpClient 实例的时候,通过构造函数,创建了一个请求调度类 Dispatcher 。该类会在 RealCall 的异步请求接口 enqueue 和同步请求接口 execute 中被执行

client.dispatcher().executed(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));

先看下 Dispatcher 的内部主要接口和相关关系

OkHttp【四】任务调度Dispatcher

可以看到, Dispatcher 主要用于处理异步请求,同步请求只是简单加入了 Deque

并发限制

执行网络请求用的是 Java 并发包提供的API, 这里实例化一个 ThreadPoolExecutor 来处理多线程任务:

public synchronized ExecutorService executorService() {
    if (executorService == null) {
    executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
}

从参数的选用上来看,上面的线程池声明也可以用并发包中 newCachedThreadPool 方法,配置上自身的 ThreadFactory

/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available, and uses the provided
* ThreadFactory to create new threads when needed.
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                    60L, TimeUnit.SECONDS,
                                    new SynchronousQueue<Runnable>(),
                                    threadFactory);
}

根据 ThreadPoolExecutor 的定义,我们知道这里实例化了一个没有限制的无限队列来承载请求任务,按需创建/复用线程。无限制队列的特点就是,理论上提交的任务不断积累时,最终将耗尽内存。因此相对来说我们其实更常用的是有限队列,通过舍弃一些任务或者拒绝新增任务来保证机器不会耗尽内存。

/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default rejected execution handler.
*
* @param corePoolSize the number of threads to keep in the pool, even
*        if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
*        pool
* @param keepAliveTime when the number of threads is greater than
*        the core, this is the maximum time that excess idle threads
*        will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
*        executed.  This queue will hold only the {@code Runnable}
*        tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
*        creates a new thread
* @throws IllegalArgumentException if one of the following holds:<br>
*         {@code corePoolSize < 0}<br>
*         {@code keepAliveTime < 0}<br>
*         {@code maximumPoolSize <= 0}<br>
*         {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
*         or {@code threadFactory} is null
*/
public ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
        threadFactory, defaultHandler);
}

如果直接向原生上述ThreadPoolExecutor的提交10000个任务,并且每个任务休眠5秒钟来模拟耗时操作,那么很快就会发生OOM。

由于瞬间提交的任务数非常大,且每个任务都耗时,导致按需创建/复用线程的策略基本上无法复用已经实例化的线程。在为新任务实例化线程时需要大量内存,因此OOM就再说难免了。

那么 OkHttp 是这样处理的么?

在前面的UML中,看到 Dispatcher 有三个Deque来存放任务,通过控制 maxRequestsmaxRequestsPerHost 来限制最大并发数,和相主机同域名下的并发数。 默认的最大并发数是64,同域名并发数是5,支持个性化配置。

所以这里通过多个Deque来缓存尚未获得执行的任务,以及正在执行的任务,实现并发任务的调度。

双队列缓存监测

还记得在 RealCall 中执行任务时调用的一些接口么?

client.dispatcher().finished(this);

在每个任务结束后,通过接口通知 Dispatcher ,再次检查任务队列,如果未触发最大并发数,则将新任务从等待队列已入执行队列。

/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
}

/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
}

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
    if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
    if (promoteCalls) promoteCalls();
    runningCallsCount = runningCallsCount();
    idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
    idleCallback.run();
    }
}

具体移入操作由 promoteCalls 完成, 遍历 readyAsyncCalls 队列,加入 runningAsyncCalls 并提交给 executorService :

private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
    AsyncCall call = i.next();

    if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
    }

    if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
}

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

复杂网络理论及其应用

复杂网络理论及其应用

汪小帆、李翔、陈关荣 / 清华大学出版社 / 2006 / 45.00元

国内首部复杂网络专著 【图书目录】 第1章 引论 1.1 引言 1.2 复杂网络研究简史 1.3 基本概念 1.4 本书内容简介 参考文献 第2章 网络拓扑基本模型及其性质 2.1 引言 2.2 规则网络 2.3 随机图 2.4 小世界网络模型 2.5 无标度网络模型 ......一起来看看 《复杂网络理论及其应用》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具