OkHttp3.0解析——谈谈内部任务分发器dispatcher

栏目: Java · 发布时间: 6年前

内容简介:OkHttp之所以能够高效处理任务的一个很重要原因在于其内部维护了三个任务队列(readyAsyncCalls、runningAsyncCalls、runningSyncCalls)和一个线程池(ThreadPoolExecutor)。这四个东西由内部的任务分发器dispathcer来进行调控处理,从而达到高效处理多任务的效果。线程池的作用不言而喻,他的主要作用在于可以避免我们在使用线程进行耗时任务的时候每次都开启线程,用完之后又销毁线程所带来的效率与性能问题。他可以对线程进行多次的操作并复用空闲线程,从

OkHttp之所以能够高效处理任务的一个很重要原因在于其内部维护了三个任务队列(readyAsyncCalls、runningAsyncCalls、runningSyncCalls)和一个线程池(ThreadPoolExecutor)。这四个东西由内部的任务分发器dispathcer来进行调控处理,从而达到高效处理多任务的效果。

线程池的作用不言而喻,他的主要作用在于可以避免我们在使用线程进行耗时任务的时候每次都开启线程,用完之后又销毁线程所带来的效率与性能问题。他可以对线程进行多次的操作并复用空闲线程,从而达到不需要每次都开启以及销毁线程的目的。关于线程的知识,如果有不了解的可以去参考我写的这篇文章 Java 中的线程详解,里面对线程池的各种类型还有内部操作有详尽的介绍。

OkHttp的任务队列

okHttp中的任务队列由两部分组成:

  • 任务分发器dispatcher:负责帮助需要执行任务找到合适的任务队列
  • 线程池ThreadPoolExecutor,用于执行dispatcher分配的任务 来看下任务调度器dispatcher的源码:
public final class Dispatcher {
  private int maxRequests = 64;
  private int maxRequestsPerHost = 5;
  private Runnable idleCallback;

  /** Executes calls. Created lazily. */
  private ExecutorService executorService;

  /** Ready async calls in the order they'll be run. */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

  public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
  }

  public Dispatcher() {
  }

  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }
  
  ...
}
 

复制代码

可以看到,dispatcher里面实例化了三个任务队列readyAsyncCalls、runningAsyncCalls与runningSyncCalls还有一个线程池ThreadPoolExecutor。

readyAsyncCalls:等待执行异步任务队列。当有任务需要dispatcher将其添加进入线程池时,会先判断线程池是否还有可以执行的线程,如果发现没有执行的线程,此时先将任务放入到这个任务队列中等待,等到线程池有空闲线程可以执行任务的时候再从这个任务队列中取出任务交给线程池去处理。

runningAsyncCalls:运行中异步任务队列。存储dispatcher将任务交给线程池去处理的任务。

runningSyncCalls:运行中的同步队列。同步队列和异步队列不同,他是一个串行的,而不是并行的,所以这个代表在同步操作的情况下运行的队列。

我们看到在executeService()方法中创建了一个线程池ThreadPoolExecutor,里面第一个参数核心线程数设置为了0,代表在空闲一段时间后线程将会被全部销毁。

可以看出,在Okhttp中,构建了一个阀值为[0, Integer.MAX_VALUE]的线程池,它不保留任何最小线程数,随时创建更多的线程数,当线程空闲时只能活60秒,它使用了一个不存储元素的阻塞工作队列,一个叫做"OkHttp Dispatcher"的线程工厂。

也就是说,在实际运行中,当收到10个并发请求时,线程池会创建十个线程,当工作完成后,线程池会在60s后相继关闭所有线程。

Dispatcher分发器

dispatcher分发器类似于Ngnix中的反向代理,通过Dispatcher将任务分发到合适的空闲线程,实现非阻塞,高可用,高并发连接。

OkHttp3.0解析——谈谈内部任务分发器dispatcher

同步请求

@Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } finally {
      client.dispatcher().finished(this);
    }
  }

复制代码

可以看到同步请求总共做了四件事

  1. 判断任务是否正在执行executed,如果正在执行则抛出异常。这代表同一个任务一次只能被执行一次,而并不能被执行多次。

  2. 将任务交给任务调用器,dispatcher调用executed去执行这个任务。

  3. 通过getResponseWithInterceptorChain()链调用拦截器,之后将任务执行的结果返回Response。

4.之后在任务执行完成调用dispatcher将其finish掉。

至此一个同步请求任务就算完成了。这里关于getResponseWithInterceptorChain()中执行的一些拦截器的操作,以后我会专门写一篇文章来讲解OkHttp的拦截器的原理。

异步操作

synchronized void enqueue(AsyncCall call) {
  if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      //添加正在运行的请求
    runningAsyncCalls.add(call);
       //线程池执行请求
    executorService().execute(call);
  } else {
      //添加到缓存队列排队等待
    readyAsyncCalls.add(call);
  }
}

复制代码

在异步操作中,会先去判断runningAsyncCalls队列中的任务数量是否会大于最大请求数量(maxRequest),这个的最大请求数量为64,然后在判断是否runningCallsForHost是否小于maxRequestsPerHost(单一host请求)。如果两个当中有一个不满足,则代表线程池中可执行的线程数不够,不能将任务添加到线程中去执行。此时则将任务直接添加到缓存队列排队等待(readyAsyncCalls),等到有可执行的线程的时候再将任务添加到正在运行的队列中,再调用线程池去执行call任务。

接下来看看execute里面的源码

@Override protected void execute() {
  boolean signalledCallback = false;
  try {
      //执行耗时IO任务
    Response response = getResponseWithInterceptorChain(forWebSocket);
    if (canceled) {
      signalledCallback = true;
      //回调,注意这里回调是在线程池中,而不是想当然的主线程回调
      responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
    } else {
      signalledCallback = true;
      //回调,同上
      responseCallback.onResponse(RealCall.this, response);
    }
  } catch (IOException e) {
    if (signalledCallback) {
      // Do not signal the callback twice!
      logger.log(Level.INFO, "Callback failure for " + toLoggableString(), e);
    } else {
      responseCallback.onFailure(RealCall.this, e);
    }
  } finally {
      //最关键的代码
    client.dispatcher().finished(this);
  }
}


复制代码

可以看到里面有调用了拦截器链getResponseWithInterceptorChain(),并将任务的结果又一次返回Response。里面会根据任务是否被Cancled而去回调不同的方法。被Canceled就去调用onFailure(0方法,在里面处理失败的逻辑,成功就去调用成功的方法Response(),并将返回值交给他去处理。最后无论成功还是失败都会去调用dispatcher的finish方法来结束掉这个任务。

我们在来深入看下finish的方法里面做了哪些操作:

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }
 
    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }


复制代码
  • 空闲出多余线程,调用promoteCalls调用待执行的任务
  • 如果当前整个线程池都空闲下来,执行空闲通知回调线程(idleCallback)

接下来看看promoteCalls:

private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
 
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();
 
      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }
 
      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }


复制代码

promoteCalls的逻辑也很简单:扫描待执行任务队列,将任务放入正在执行任务队列,并执行该任务。

总结

以上就是整个任务队列的实现细节,总结起来有以下几个特点:

  1. OkHttp采用Dispatcher技术,类似于Nginx,与线程池配合实现了高并发、地阻塞的操作。
  2. OkHttp采用队列进行缓存,按照入列的特点先进先出来执行任务
  3. OkHttp最出彩的地方就是在try/finally中调用了finish函数,可以主动控制等待队列的移动,而不是采用锁或者wait/notify,极大的减少了编码的复杂性。

有兴趣可以关注我的小专栏,学习更多知识:小专栏

OkHttp3.0解析——谈谈内部任务分发器dispatcher

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

游戏编程权威指南

游戏编程权威指南

Mike McShaffry 麦克沙福瑞、David “Rez” Graham 格雷海姆 / 师蓉、李静、李青翠 / 人民邮电 / 2016-3 / 99.00元

全书分为4个部分共24章。首部分是游戏编程基础,主要介绍了游戏编程的定义、游戏架构等基础知识。 第二部分是让游戏跑起来,主要介绍了初始化和关闭代码、主循环、游戏主题和用户界面等。 第三部分是核心游戏技术,主要介绍了一些*为复杂的代码 示例,如3D编程、游戏音频、物理和AI编程等。 第四部分是综合应用,主要介绍了网络编程、多道程序设计和用C#创建工具等,并利用前面所讲的 知识开发出......一起来看看 《游戏编程权威指南》 这本书的介绍吧!

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

MD5 加密
MD5 加密

MD5 加密工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器