OkHttp【四】任务调度Dispatcher

栏目: Java · 发布时间: 6年前

内容简介:OkHttp【四】任务调度Dispatcher

OkHttp【四】任务调度Dispatcher

前言

在发起HTTP请求后, OkHttpRealCall 封装的相关逻辑内执行了请求发起动作,而负责记录和调度 Call 的则是 Dispatcher 。 本文一起分析 OkHttpClient#Dispatcher 的相关实现。

/**
 * Policy on when async requests are executed.
 *
 * <p>Each dispatcher uses an {@link ExecutorService} to run calls internally. If you supply your
 * own executor, it should be able to run {@linkplain #getMaxRequests the configured maximum} number
 * of calls concurrently.
 */
 public final class Dispatcher {
     // ...
 }

任务调度

在构造 OkHttpClient 实例的时候,通过构造函数,创建了一个请求调度类 Dispatcher 。该类会在 RealCall 的异步请求接口 enqueue 和同步请求接口 execute 中被执行

client.dispatcher().executed(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));

先看下 Dispatcher 的内部主要接口和相关关系

OkHttp【四】任务调度Dispatcher

可以看到, Dispatcher 主要用于处理异步请求,同步请求只是简单加入了 Deque

并发限制

执行网络请求用的是 Java 并发包提供的API, 这里实例化一个 ThreadPoolExecutor 来处理多线程任务:

public synchronized ExecutorService executorService() {
    if (executorService == null) {
    executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
}

从参数的选用上来看,上面的线程池声明也可以用并发包中 newCachedThreadPool 方法,配置上自身的 ThreadFactory

/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available, and uses the provided
* ThreadFactory to create new threads when needed.
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                    60L, TimeUnit.SECONDS,
                                    new SynchronousQueue<Runnable>(),
                                    threadFactory);
}

根据 ThreadPoolExecutor 的定义,我们知道这里实例化了一个没有限制的无限队列来承载请求任务,按需创建/复用线程。无限制队列的特点就是,理论上提交的任务不断积累时,最终将耗尽内存。因此相对来说我们其实更常用的是有限队列,通过舍弃一些任务或者拒绝新增任务来保证机器不会耗尽内存。

/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default rejected execution handler.
*
* @param corePoolSize the number of threads to keep in the pool, even
*        if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
*        pool
* @param keepAliveTime when the number of threads is greater than
*        the core, this is the maximum time that excess idle threads
*        will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
*        executed.  This queue will hold only the {@code Runnable}
*        tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
*        creates a new thread
* @throws IllegalArgumentException if one of the following holds:<br>
*         {@code corePoolSize < 0}<br>
*         {@code keepAliveTime < 0}<br>
*         {@code maximumPoolSize <= 0}<br>
*         {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
*         or {@code threadFactory} is null
*/
public ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
        threadFactory, defaultHandler);
}

如果直接向原生上述ThreadPoolExecutor的提交10000个任务,并且每个任务休眠5秒钟来模拟耗时操作,那么很快就会发生OOM。

由于瞬间提交的任务数非常大,且每个任务都耗时,导致按需创建/复用线程的策略基本上无法复用已经实例化的线程。在为新任务实例化线程时需要大量内存,因此OOM就再说难免了。

那么 OkHttp 是这样处理的么?

在前面的UML中,看到 Dispatcher 有三个Deque来存放任务,通过控制 maxRequestsmaxRequestsPerHost 来限制最大并发数,和相主机同域名下的并发数。 默认的最大并发数是64,同域名并发数是5,支持个性化配置。

所以这里通过多个Deque来缓存尚未获得执行的任务,以及正在执行的任务,实现并发任务的调度。

双队列缓存监测

还记得在 RealCall 中执行任务时调用的一些接口么?

client.dispatcher().finished(this);

在每个任务结束后,通过接口通知 Dispatcher ,再次检查任务队列,如果未触发最大并发数,则将新任务从等待队列已入执行队列。

/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
}

/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
}

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
    if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
    if (promoteCalls) promoteCalls();
    runningCallsCount = runningCallsCount();
    idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
    idleCallback.run();
    }
}

具体移入操作由 promoteCalls 完成, 遍历 readyAsyncCalls 队列,加入 runningAsyncCalls 并提交给 executorService :

private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
    AsyncCall call = i.next();

    if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
    }

    if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
}

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

上瘾

上瘾

[美] 尼尔·埃亚尔、[美] 瑞安·胡佛 / 钟莉婷、杨晓红 / 中信出版集团 / 2017-5 / 49.00元

——为什么我们会习惯性地点开某个App? ——这种使用习惯到底是如何养成的? ——为什么有些产品能让我们戒不掉,而其他的产品却不行? ——是否有什么秘诀能让用户对你的产品形成使用习惯,欲罢不能? 《上瘾》揭示了很多让用户形成使用习惯,甚至“上瘾”的互联网产品服务背后的基 本设计原理,告诉你怎样打造一款让用户欲罢不能的产品。作者根据自己多年的研究、咨询及实际经验,提出了新颖而......一起来看看 《上瘾》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换