内容简介:半年前阅读了Volley源码,但是现在主流网络请求都是使用OkHttp + Retrofit + RxJava,因此打算好好研究下OkHttp的源码(基于OkHttp3.14.1),记录一下这里只例举基本的Get请求,详细的请看官方文档由上可知,发送一个基本的get请求需要如下几步
半年前阅读了Volley源码,但是现在主流网络请求都是使用OkHttp + Retrofit + RxJava,因此打算好好研究下OkHttp的源码(基于OkHttp3.14.1),记录一下
val client = OkHttpClient() fun syncGet(url: String) { val request = Request.Builder() .url(url) .build() val call = client.newCall(request) val response = call.execute() if (response.isSuccessful) { Log.d(TAG, "请求成功: " + response.body()!!.string()) } else { Log.d(TAG, "请求失败: " + response.message()) } } fun asyncGet(url: String) { val request = Request.Builder() .url(url) .build() val call = client.newCall(request) call.enqueue(object : Callback { override fun onFailure(call: Call, e: IOException) { Log.d(TAG, "错误: " + e.message) } override fun onResponse(call: Call, response: Response) { if (response.isSuccessful) { Log.d(TAG, "请求成功: " + response.body()!!.string()) } else { Log.d(TAG, "请求失败: " + response.message()) } } }) } 复制代码
- 创建OkHttpClient实例
- 创建Request实例
- 创建Call实例
- 调用call.execute或者call.enqueue执行同步或者异步请求
public OkHttpClient() { this(new Builder()); } OkHttpClient(Builder builder) { // 主要是从builder中取出对应字段进行赋值,忽略 ... } 复制代码
public Builder() { // 创建异步执行策略 dispatcher = new Dispatcher(); // 默认协议列表Http1.1、Http2.0 protocols = DEFAULT_PROTOCOLS; // 连接规格,包括TLS(用于https)、CLEARTEXT(未加密用于http) connectionSpecs = DEFAULT_CONNECTION_SPECS; // 事件监听,默认没有 eventListenerFactory = EventListener.factory(EventListener.NONE); // 代理选择器 proxySelector = ProxySelector.getDefault(); // 使用空对象设计模式 if (proxySelector == null) { proxySelector = new NullProxySelector(); } // 提供Cookie策略的持久性 cookieJar = CookieJar.NO_COOKIES; // socket工厂 socketFactory = SocketFactory.getDefault(); // hostname验证器 hostnameVerifier = OkHostnameVerifier.INSTANCE; // 证书标签 certificatePinner = CertificatePinner.DEFAULT; // 代理认证 proxyAuthenticator = Authenticator.NONE; // 认证 authenticator = Authenticator.NONE; // 连接池 connectionPool = new ConnectionPool(); // dns dns = Dns.SYSTEM; // 跟随ssl重定向 followSslRedirects = true; // 跟随重定向 followRedirects = true; // 当连接失败时尝试 retryOnConnectionFailure = true; callTimeout = 0; // 连接、读取、写超时10秒 connectTimeout = 10_000; readTimeout = 10_000; writeTimeout = 10_000; pingInterval = 0; } Builder(OkHttpClient okHttpClient) { // 从已有的OkHttpClient实例中取出对应的参数赋值给当前实例 ... } 复制代码
public Builder() { this.method = "GET"; this.headers = new Headers.Builder(); } Builder(Request request) { this.url = request.url; this.method = request.method; this.body = request.body; this.tags = request.tags.isEmpty() ? Collections.emptyMap() : new LinkedHashMap<>(request.tags); this.headers = request.headers.newBuilder(); } 复制代码
public Builder url(String url) { if (url == null) throw new NullPointerException("url == null"); // 默默的将web socket urls替换成http urls if (url.regionMatches(true, 0, "ws:", 0, 3)) { url = "http:" + url.substring(3); } else if (url.regionMatches(true, 0, "wss:", 0, 4)) { url = "https:" + url.substring(4); } return url(HttpUrl.get(url)); } public Builder url(HttpUrl url) { if (url == null) throw new NullPointerException("url == null"); this.url = url; return this; } public Request build() { if (url == null) throw new IllegalStateException("url == null"); return new Request(this); } Request(Builder builder) { this.url = builder.url; this.method = builder.method; this.headers = builder.headers.build(); this.body = builder.body; this.tags = Util.immutableMap(builder.tags); } 复制代码
// OkHttpClient.java public Call newCall(Request request) { return RealCall.newRealCall(this, request, false /* for web socket */); } // RealCall.java static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) { RealCall call = new RealCall(client, originalRequest, forWebSocket); call.transmitter = new Transmitter(client, call); return call; } // Transmitter.java public Transmitter(OkHttpClient client, Call call) { this.client = client; this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool()); this.call = call; this.eventListener = client.eventListenerFactory().create(call); // 设置了AsyncTimeout的超时时间默认是0,下文execute方法执行时会用到 this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS); } 复制代码
// OkHttpClient.java Internal.instance = new Internal() { ... @Override public RealConnectionPool realConnectionPool(ConnectionPool connectionPool) { return connectionPool.delegate; } ... } 复制代码
// OkHttpClient.java public Builder() { connectionPool = new ConnectionPool(); } public final class ConnectionPool { final RealConnectionPool delegate; public ConnectionPool() { this(5, 5, TimeUnit.MINUTES); } public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) { this.delegate = new RealConnectionPool(maxIdleConnections, keepAliveDuration, timeUnit); } // 返回当前连接池的空闲连接数 public int idleConnectionCount() { return delegate.idleConnectionCount(); } // 返回当前连接池的总共连接数 public int connectionCount() { return delegate.connectionCount(); } // 关闭移除当前连接池中所有空闲的连接 public void evictAll() { delegate.evictAll(); } } // RealConnectionPool.java public RealConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) { this.maxIdleConnections = maxIdleConnections; this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration); if (keepAliveDuration <= 0) { throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration); } } public ConnectionPool connectionPool() { return connectionPool; } public Builder() { eventListenerFactory = EventListener.factory(EventListener.NONE); } static EventListener.Factory factory(EventListener listener) { return call -> listener; } public EventListener.Factory eventListenerFactory() { return eventListenerFactory; } 复制代码
public Response execute() throws IOException { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } transmitter.timeoutEnter(); transmitter.callStart(); try { client.dispatcher().executed(this); return getResponseWithInterceptorChain(); } finally { client.dispatcher().finished(this); } } // Transmitter.java public final class Transmitter { public Transmitter(OkHttpClient client, Call call) { this.client = client; this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool()); this.call = call; this.eventListener = client.eventListenerFactory().create(call); // 设置了AsyncTimeout的超时时间默认是0 this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS); } // AsyncTimeout具体实现就不深究了,只需知道当调用了enter方法后如果达到了设置的时间还没调用 // 就会调用timedOut方法,而这里因为默认的callTimeout是0,因此不会超时 private final AsyncTimeout timeout = new AsyncTimeout() { @Override protected void timedOut() { cancel(); } }; public void timeoutEnter() { timeout.enter(); } public void callStart() { // 创建了一个Throwable记录了当前栈信息 this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()"); // 回调callStart,默认设置的没处理任何事件 eventListener.callStart(call); } } // Dispatcher.java public final class Dispatcher { private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>(); synchronized void executed(RealCall call) { runningSyncCalls.add(call); } } 复制代码
Response getResponseWithInterceptorChain() throws IOException { List<Interceptor> interceptors = new ArrayList<>(); interceptors.addAll(client.interceptors()); interceptors.add(new RetryAndFollowUpInterceptor(client)); interceptors.add(new BridgeInterceptor(client.cookieJar())); interceptors.add(new CacheInterceptor(client.internalCache())); interceptors.add(new ConnectInterceptor(client)); if (!forWebSocket) { interceptors.addAll(client.networkInterceptors()); } interceptors.add(new CallServerInterceptor(forWebSocket)); // 责任链设计模式 Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this, client.connectTimeoutMillis(), client.readTimeoutMillis(), client.writeTimeoutMillis()); boolean calledNoMoreExchanges = false; try { Response response = chain.proceed(originalRequest); if (transmitter.isCanceled()) { closeQuietly(response); throw new IOException("Canceled"); } return response; } catch (IOException e) { calledNoMoreExchanges = true; throw transmitter.noMoreExchanges(e); } finally { if (!calledNoMoreExchanges) { transmitter.noMoreExchanges(null); } } } 复制代码
// RealInterceptorChain public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange) throws IOException { // 默认index = 0,如果index超出了拦截器的总长就抛出错误 if (index >= interceptors.size()) throw new AssertionError(); calls++; // 这里刚才传入的exchange为null if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) { throw new IllegalStateException("network interceptor " + interceptors.get(index - 1) + " must retain the same host and port"); } // 同上 if (this.exchange != null && calls > 1) { throw new IllegalStateException("network interceptor " + interceptors.get(index - 1) + " must call proceed() exactly once"); } // 又创建了一个RealInterceptorChain实例,不过其index在原来基础上加了1 RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange, index + 1, request, call, connectTimeout, readTimeout, writeTimeout); // 取出一个Interceptor实例调用其intercept方法 Interceptor interceptor = interceptors.get(index); Response response = interceptor.intercept(next); if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) { throw new IllegalStateException("network interceptor " + interceptor + " must call proceed() exactly once"); } // 如果Interceptor返回了null那么抛出NPE if (response == null) { throw new NullPointerException("interceptor " + interceptor + " returned null"); } if (response.body() == null) { throw new IllegalStateException( "interceptor " + interceptor + " returned a response with no body"); } return response; } 复制代码
public Response intercept(Chain chain) throws IOException { Request request = chain.request(); RealInterceptorChain realChain = (RealInterceptorChain) chain; Transmitter transmitter = realChain.transmitter(); int followUpCount = 0; Response priorResponse = null; while (true) { transmitter.prepareToConnect(request); if (transmitter.isCanceled()) { throw new IOException("Canceled"); } Response response; boolean success = false; try { response = realChain.proceed(request, transmitter, null); // 表示请求成功但是可能是一个重定向响应 success = true; } catch (RouteException e) { // 尝试通过路由连接失败请求将不会发送,判断是否需要进行重试 if (!recover(e.getLastConnectException(), transmitter, false, request)) { throw e.getFirstConnectException(); } continue; } catch (IOException e) { // 试图与服务器通信失败,请求可能已经发送,判断是否需要进行重试 boolean requestSendStarted = !(e instanceof ConnectionShutdownException); if (!recover(e, transmitter, requestSendStarted, request)) throw e; continue; } finally { // 如果请求没有成功则释放资源 if (!success) { transmitter.exchangeDoneDueToException(); } } // 如果上个响应存在(表示上个响应是一个重定向响应,其不会拥有响应体),构建一个新的Response实例 // 将上个响应赋值给priorResponse属性 if (priorResponse != null) { response = response.newBuilder() .priorResponse(priorResponse.newBuilder() .body(null) .build()) .build(); } // 暂时不理解这个exchange是干什么用的?? Exchange exchange = Internal.instance.exchange(response); Route route = exchange != null ? exchange.connection().route() : null; // 根据响应头判断是否是重定向,如果是就会新建一个Request实例返回 Request followUp = followUpRequest(response, route); // 如果followUp为空也就是没有重定向那么直接返回响应 if (followUp == null) { if (exchange != null && exchange.isDuplex()) { transmitter.timeoutEarlyExit(); } return response; } RequestBody followUpBody = followUp.body(); // 如果限制只发送一次,那也直接返回响应 if (followUpBody != null && followUpBody.isOneShot()) { return response; } closeQuietly(response.body()); if (transmitter.hasExchange()) { exchange.detachWithViolence(); } // 重定向请求太多了,就抛出异常 if (++followUpCount > MAX_FOLLOW_UPS) { throw new ProtocolException("Too many follow-up requests: " + followUpCount); } // 赋值新建的请求,保存上一个响应,最终的响应会包含所有前面重定向的响应 request = followUp; priorResponse = response; } } 复制代码
RetryAndFollowUpInterceptor主要做的就是重试以及处理重定向,内部会调用realChain.proceed调用下层Interceptor实例的intercept方法,当下层Interceptor抛出异常会判断是否有重试的必要 ,当下层返回了一个Response,其会根据该Response判断是否为重定向响应,如果是就会创建新建一个Request实例,再次请求获取到新的Response实例后将原先的Response赋值给其priorResponse属性,以此循环直到请求成功(不再重定向)、超出最大重定向数、抛出不可重试的异常。然后看看BridgeInterceptor
public final class BridgeInterceptor implements Interceptor { private final CookieJar cookieJar; // 这里的CookieJar就是OkHttpClient的CookieJar,默认是一个空实现 public BridgeInterceptor(CookieJar cookieJar) { this.cookieJar = cookieJar; } @Override public Response intercept(Chain chain) throws IOException { Request userRequest = chain.request(); Request.Builder requestBuilder = userRequest.newBuilder(); RequestBody body = userRequest.body(); // 如果有请求体,并且请求头如果没有Content-Type、Content-length、Host、Connection就加上 if (body != null) { MediaType contentType = body.contentType(); if (contentType != null) { requestBuilder.header("Content-Type", contentType.toString()); } long contentLength = body.contentLength(); if (contentLength != -1) { requestBuilder.header("Content-Length", Long.toString(contentLength)); requestBuilder.removeHeader("Transfer-Encoding"); } else { requestBuilder.header("Transfer-Encoding", "chunked"); requestBuilder.removeHeader("Content-Length"); } } if (userRequest.header("Host") == null) { requestBuilder.header("Host", hostHeader(userRequest.url(), false)); } if (userRequest.header("Connection") == null) { requestBuilder.header("Connection", "Keep-Alive"); } // 然后如果没有Accept-Encoding请求头并且不是请求部分资源,那么加上Accept-Encoding: gzip boolean transparentGzip = false; if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) { transparentGzip = true; requestBuilder.header("Accept-Encoding", "gzip"); } // 从cookieJar中取出Cookie列表 List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url()); if (!cookies.isEmpty()) { requestBuilder.header("Cookie", cookieHeader(cookies)); } // 没有UserAgrent就添加为okhttp/3.14.1 if (userRequest.header("User-Agent") == null) { requestBuilder.header("User-Agent", Version.userAgent()); } // 调用下一个Interceptor Response networkResponse = chain.proceed(requestBuilder.build()); // 使用cookieJar保存cookie HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers()); Response.Builder responseBuilder = networkResponse.newBuilder() .request(userRequest); if (transparentGzip && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding")) && HttpHeaders.hasBody(networkResponse)) { // 设置对应的响应头 GzipSource responseBody = new GzipSource(networkResponse.body().source()); Headers strippedHeaders = networkResponse.headers().newBuilder() .removeAll("Content-Encoding") .removeAll("Content-Length") .build(); responseBuilder.headers(strippedHeaders); String contentType = networkResponse.header("Content-Type"); // 自动进行解压,不过contentLength变成了-1 responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody))); } return responseBuilder.build(); } private String cookieHeader(List<Cookie> cookies) { StringBuilder cookieHeader = new StringBuilder(); for (int i = 0, size = cookies.size(); i < size; i++) { if (i > 0) { cookieHeader.append("; "); } Cookie cookie = cookies.get(i); cookieHeader.append(cookie.name()).append('=').append(cookie.value()); } return cookieHeader.toString(); } } 复制代码
public final class CacheInterceptor implements Interceptor { // 这个cache就是OkHttpClient里面的internalCache final InternalCache cache; public CacheInterceptor(@Nullable InternalCache cache) { this.cache = cache; } @Override public Response intercept(Chain chain) throws IOException { // 如果给OkHttpClient设置了InternalCache,那么从里面获取缓存的响应 Response cacheCandidate = cache != null ? cache.get(chain.request()) : null; long now = System.currentTimeMillis(); // 这个里面主要是根据请求和缓存的响应判断缓存是否命中 CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get(); Request networkRequest = strategy.networkRequest; Response cacheResponse = strategy.cacheResponse; if (cache != null) { cache.trackResponse(strategy); } if (cacheCandidate != null && cacheResponse == null) { closeQuietly(cacheCandidate.body()); } // 客户端设置了only-if-cached,表示只使用缓存而缓存又没有命中因此直接构建一个Response返回 if (networkRequest == null && cacheResponse == null) { return new Response.Builder() .request(chain.request()) .protocol(Protocol.HTTP_1_1) .code(504) .message("Unsatisfiable Request (only-if-cached)") .body(Util.EMPTY_RESPONSE) .sentRequestAtMillis(-1L) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); } // 缓存命中,构造一个Response实例并将去掉了body的cacheResponse赋值给该实例的cacheResponse属性 if (networkRequest == null) { return cacheResponse.newBuilder() .cacheResponse(stripBody(cacheResponse)) .build(); } Response networkResponse = null; try { networkResponse = chain.proceed(networkRequest); } finally { // 发生了异常需要将缓存响应体关闭 if (networkResponse == null && cacheCandidate != null) { closeQuietly(cacheCandidate.body()); } } // 如果有缓存响应并且响应码是304,就根据返回的响应和缓存的响应构造一个新的响应并且更新下缓存 if (cacheResponse != null) { if (networkResponse.code() == HTTP_NOT_MODIFIED) { Response response = cacheResponse.newBuilder() .headers(combine(cacheResponse.headers(), networkResponse.headers())) .sentRequestAtMillis(networkResponse.sentRequestAtMillis()) .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis()) .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build(); networkResponse.body().close(); cache.trackConditionalCacheHit(); cache.update(cacheResponse, response); return response; } else { closeQuietly(cacheResponse.body()); } } // 响应码不是304,则构造一个新的Response将cacheResponse、networkResponse分别去掉body赋值给 // cacheResponse和networkResponse Response response = networkResponse.newBuilder() .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build(); if (cache != null) { if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) { // 将最终响应放到cache中 CacheRequest cacheRequest = cache.put(response); return cacheWritingResponse(cacheRequest, response); } if (HttpMethod.invalidatesCache(networkRequest.method())) { try { cache.remove(networkRequest); } catch (IOException ignored) { } } } return response; } } public final class CacheStrategy { public static class Factory { public Factory(long nowMillis, Request request, Response cacheResponse) { this.nowMillis = nowMillis; this.request = request; this.cacheResponse = cacheResponse; // 根据缓存响应头提取出一些缓存有关的信息 if (cacheResponse != null) { this.sentRequestMillis = cacheResponse.sentRequestAtMillis(); this.receivedResponseMillis = cacheResponse.receivedResponseAtMillis(); Headers headers = cacheResponse.headers(); for (int i = 0, size = headers.size(); i < size; i++) { String fieldName = headers.name(i); String value = headers.value(i); if ("Date".equalsIgnoreCase(fieldName)) { servedDate = HttpDate.parse(value); servedDateString = value; } else if ("Expires".equalsIgnoreCase(fieldName)) { expires = HttpDate.parse(value); } else if ("Last-Modified".equalsIgnoreCase(fieldName)) { lastModified = HttpDate.parse(value); lastModifiedString = value; } else if ("ETag".equalsIgnoreCase(fieldName)) { etag = value; } else if ("Age".equalsIgnoreCase(fieldName)) { ageSeconds = HttpHeaders.parseSeconds(value, -1); } } } } } } public CacheStrategy get() { CacheStrategy candidate = getCandidate(); if (candidate.networkRequest != null && request.cacheControl().onlyIfCached()) { // 如果设置了onlyIfCached那么在CacheInterceptor的intercept方法内部会直接构造一个响应码为504的Response return new CacheStrategy(null, null); } return candidate; } private CacheStrategy getCandidate() { // 没有缓存响应就直接创建一个没响应的CacheStrategy实例 if (cacheResponse == null) { return new CacheStrategy(request, null); } // 丢弃缓存响应,如果请求是https并且缺少必要的握手 if (request.isHttps() && cacheResponse.handshake() == null) { return new CacheStrategy(request, null); } // 如果不应该使用缓存也丢弃缓存响应 if (!isCacheable(cacheResponse, request)) { return new CacheStrategy(request, null); } CacheControl requestCaching = request.cacheControl(); // 如果Request包含noCache请求头,或者带上了 If-Modified-Since、If-None-Match两个请求头也丢弃响应 // 应该带上这两个请求头表示客户端在循环服务端资源是否发生变化,没变化会返回304,因此不应该使用缓存 if (requestCaching.noCache() || hasConditions(request)) { return new CacheStrategy(request, null); } CacheControl responseCaching = cacheResponse.cacheControl(); long ageMillis = cacheResponseAge(); long freshMillis = computeFreshnessLifetime(); if (requestCaching.maxAgeSeconds() != -1) { freshMillis = Math.min(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds())); } long minFreshMillis = 0; if (requestCaching.minFreshSeconds() != -1) { minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds()); } long maxStaleMillis = 0; if (!responseCaching.mustRevalidate() && requestCaching.maxStaleSeconds() != -1) { maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds()); } // 根据时间判断是否缓存命中,不知道具体是怎么判断的? if (!responseCaching.noCache() && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) { Response.Builder builder = cacheResponse.newBuilder(); if (ageMillis + minFreshMillis >= freshMillis) { builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\""); } long oneDayMillis = 24 * 60 * 60 * 1000L; if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) { builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\""); } return new CacheStrategy(null, builder.build()); } // 添加If-None-Match、If-Modified-Since请求头 String conditionName; String conditionValue; if (etag != null) { conditionName = "If-None-Match"; conditionValue = etag; } else if (lastModified != null) { conditionName = "If-Modified-Since"; conditionValue = lastModifiedString; } else if (servedDate != null) { conditionName = "If-Modified-Since"; conditionValue = servedDateString; } else { return new CacheStrategy(request, null); } Headers.Builder conditionalRequestHeaders = request.headers().newBuilder(); Internal.instance.addLenient(conditionalRequestHeaders, conditionName, conditionValue); Request conditionalRequest = request.newBuilder() .headers(conditionalRequestHeaders.build()) .build(); return new CacheStrategy(conditionalRequest, cacheResponse); } 复制代码
val client = OkHttpClient.Builder().cache(Cache(cacheFile, 50 * 1000)).build() 复制代码
public final class ConnectInterceptor implements Interceptor { public final OkHttpClient client; public ConnectInterceptor(OkHttpClient client) { this.client = client; } @Override public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; Request request = realChain.request(); Transmitter transmitter = realChain.transmitter(); boolean doExtensiveHealthChecks = !request.method().equals("GET"); Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks); return realChain.proceed(request, transmitter, exchange); } } 复制代码
public final class CallServerInterceptor implements Interceptor { private final boolean forWebSocket; public CallServerInterceptor(boolean forWebSocket) { this.forWebSocket = forWebSocket; } @Override public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; Exchange exchange = realChain.exchange(); // 根据Request生成对应的字节数组并且写入到Buffer中 Request request = realChain.request(); long sentRequestMillis = System.currentTimeMillis(); exchange.writeRequestHeaders(request); boolean responseHeadersStarted = false; Response.Builder responseBuilder = null; // 如果请求包含请求体,写入请求体 if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) { if ("100-continue".equalsIgnoreCase(request.header("Expect"))) { exchange.flushRequest(); responseHeadersStarted = true; exchange.responseHeadersStart(); responseBuilder = exchange.readResponseHeaders(true); } if (responseBuilder == null) { if (request.body().isDuplex()) { exchange.flushRequest(); BufferedSink bufferedRequestBody = Okio.buffer( exchange.createRequestBody(request, true)); request.body().writeTo(bufferedRequestBody); } else { BufferedSink bufferedRequestBody = Okio.buffer( exchange.createRequestBody(request, false)); request.body().writeTo(bufferedRequestBody); bufferedRequestBody.close(); } } else { exchange.noRequestBody(); if (!exchange.connection().isMultiplexed()) { exchange.noNewExchangesOnConnection(); } } } else { exchange.noRequestBody(); } if (request.body() == null || !request.body().isDuplex()) { // 将Buffer中的数据写给服务端 exchange.finishRequest(); } if (!responseHeadersStarted) { exchange.responseHeadersStart(); } if (responseBuilder == null) { // 获取响应头 responseBuilder = exchange.readResponseHeaders(false); } Response response = responseBuilder .request(request) .handshake(exchange.connection().handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); int code = response.code(); if (code == 100) { response = exchange.readResponseHeaders(false) .request(request) .handshake(exchange.connection().handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); code = response.code(); } exchange.responseHeadersEnd(response); if (forWebSocket && code == 101) { response = response.newBuilder() .body(Util.EMPTY_RESPONSE) .build(); } else { response = response.newBuilder() .body(exchange.openResponseBody(response)) .build(); } if ("close".equalsIgnoreCase(response.request().header("Connection")) || "close".equalsIgnoreCase(response.header("Connection"))) { exchange.noNewExchangesOnConnection(); } if ((code == 204 || code == 205) && response.body().contentLength() > 0) { throw new ProtocolException( "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength()); } return response; } } 复制代码
@Override public Response execute() throws IOException { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } transmitter.timeoutEnter(); transmitter.callStart(); try { client.dispatcher().executed(this); return getResponseWithInterceptorChain(); } finally { client.dispatcher().finished(this); } } void finished(RealCall call) { finished(runningSyncCalls, call); } private <T> void finished(Deque<T> calls, T call) { Runnable idleCallback; synchronized (this) { if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!"); idleCallback = this.idleCallback; } boolean isRunning = promoteAndExecute(); if (!isRunning && idleCallback != null) { idleCallback.run(); } } 复制代码
public void enqueue(Callback responseCallback) { synchronized (this) { if (executed) { throw new IllegalStateException("Already Executed"); } executed = true; } transmitter.callStart(); client.dispatcher().enqueue(new AsyncCall(responseCallback)); } 复制代码
//Dispatcher.java void enqueue(AsyncCall call) { synchronized (this) { // 将call加入到队列中去 readyAsyncCalls.add(call); if (!call.get().forWebSocket) { // 刚刚创建的call不是使用webSocket所以进入这里 AsyncCall existingCall = findExistingCallWithHost(call.host()); // 目的只是为了统计每个Host有几个AsyncCall if (existingCall != null) call.reuseCallsPerHostFrom(existingCall); } } promoteAndExecute(); } // 从正在执行或者等待执行的call队列中取出host属性为host的AsyncCall实例 private AsyncCall findExistingCallWithHost(String host) { for (AsyncCall existingCall : runningAsyncCalls) { if (existingCall.host().equals(host)) return existingCall; } for (AsyncCall existingCall : readyAsyncCalls) { if (existingCall.host().equals(host)) return existingCall; } return null; } private boolean promoteAndExecute() { assert (!Thread.holdsLock(this)); List<AsyncCall> executableCalls = new ArrayList<>(); boolean isRunning; synchronized (this) { for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall asyncCall = i.next(); // 如果已经达到最大请求数64就停止执行 if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity. // 如果每个Host达到了最大请求数5个就跳过该call if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity. i.remove(); // 每个端口请求计数加1 asyncCall.callsPerHost().incrementAndGet(); executableCalls.add(asyncCall); runningAsyncCalls.add(asyncCall); } isRunning = runningCallsCount() > 0; } for (int i = 0, size = executableCalls.size(); i < size; i++) { AsyncCall asyncCall = executableCalls.get(i); // 创建一个线程池,然后执行AsyncCall的execute方法 asyncCall.executeOn(executorService()); } return isRunning; } final class AsyncCall extends NamedRunnable { private final Callback responseCallback; private volatile AtomicInteger callsPerHost = new AtomicInteger(0); AsyncCall(Callback responseCallback) { super("OkHttp %s", redactedUrl()); this.responseCallback = responseCallback; } AtomicInteger callsPerHost() { return callsPerHost; } void reuseCallsPerHostFrom(AsyncCall other) { this.callsPerHost = other.callsPerHost; } String host() { return originalRequest.url().host(); } Request request() { return originalRequest; } RealCall get() { return RealCall.this; } void executeOn(ExecutorService executorService) { assert (!Thread.holdsLock(client.dispatcher())); boolean success = false; try { executorService.execute(this); success = true; } catch (RejectedExecutionException e) { InterruptedIOException ioException = new InterruptedIOException("executor rejected"); ioException.initCause(e); transmitter.noMoreExchanges(ioException); responseCallback.onFailure(RealCall.this, ioException); } finally { if (!success) { client.dispatcher().finished(this); // This call is no longer running! } } } @Override protected void execute() { // 在线程池中执行 boolean signalledCallback = false; transmitter.timeoutEnter(); try { Response response = getResponseWithInterceptorChain(); signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } catch (IOException e) { if (signalledCallback) { Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { responseCallback.onFailure(RealCall.this, e); } } finally { // 每个端口请求计数减1 client.dispatcher().finished(this); } } } 复制代码
class WrapCall(private val call: Call) : Call by call { override fun enqueue(responseCallback: Callback) { call.enqueue(object : Callback { override fun onFailure(call: Call, e: IOException) { handler.post { responseCallback.onFailure(call, e) } } override fun onResponse(call: Call, response: Response) { handler.post { responseCallback.onResponse(call, response) } } }) } companion object { private val handler = Handler(Looper.getMainLooper()) } } // 外界使用,只要包装下 val call = WrapCall(client.newCall(request)) call.enqueue(object : Callback { override fun onFailure(call: Call, e: IOException) { Log.d(TAG, "错误: " + e.message) } override fun onResponse(call: Call, response: Response) { if (response.isSuccessful) { Log.d(TAG, "请求成功: ${Looper.myLooper() === Looper.getMainLooper()}" + response.body()!!.string()) } else { Log.d(TAG, "请求失败: ${Looper.myLooper() === Looper.getMainLooper()}" + response.message()) } } }) 复制代码
public final class RealConnectionPool { // 这个线程池是专门用来执行清理线程的 private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */, Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS, new SynchronousQueue<>(), Util.threadFactory("OkHttp ConnectionPool", true)); // 最大空闲连接数 默认5个 private final int maxIdleConnections; // 最大保存存活的空闲连接时间 默认5分钟 private final long keepAliveDurationNs; // 当向连接池中加入一个连接后会执行清理操作,内部会寻找空闲时间最长的连接,如果其空闲时间 // 已经超过了最长时间就会将其关闭,不然就等待指定时间 private final Runnable cleanupRunnable = () -> { while (true) { long waitNanos = cleanup(System.nanoTime()); if (waitNanos == -1) return; if (waitNanos > 0) { long waitMillis = waitNanos / 1000000L; waitNanos -= (waitMillis * 1000000L); synchronized (RealConnectionPool.this) { try { RealConnectionPool.this.wait(waitMillis, (int) waitNanos); } catch (InterruptedException ignored) { } } } } }; // 双端队列保存当前OkHttpClient的所有连接 private final Deque<RealConnection> connections = new ArrayDeque<>(); // 记录了失败的路由 final RouteDatabase routeDatabase = new RouteDatabase(); // 清理线程是否正在执行 boolean cleanupRunning; // 首先创建了一个最大空闲连接为5,最大保存存活时间5分钟的连接池 public RealConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) { this.maxIdleConnections = maxIdleConnections; this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration); if (keepAliveDuration <= 0) { throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration); } } // 获取空闲连接数量 public synchronized int idleConnectionCount() { int total = 0; for (RealConnection connection : connections) { if (connection.transmitters.isEmpty()) total++; } return total; } // 总共的连接数 public synchronized int connectionCount() { return connections.size(); } // Transmitter调用该方法获取连接,内部判断如果连接池有连接满足条件就返回true,不然返回false boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter, @Nullable List<Route> routes, boolean requireMultiplexed) { assert (Thread.holdsLock(this)); for (RealConnection connection : connections) { if (requireMultiplexed && !connection.isMultiplexed()) continue; if (!connection.isEligible(address, routes)) continue; transmitter.acquireConnectionNoEvents(connection); return true; } return false; } // 将一个新建的连接放入到连接池中 void put(RealConnection connection) { assert (Thread.holdsLock(this)); // 如果清理线程还没运行就开始运行 if (!cleanupRunning) { cleanupRunning = true; executor.execute(cleanupRunnable); } // 再将连接放入双端队列中去 connections.add(connection); } // 当一个连接从执行中变成了空闲时调用,该方法会唤醒清理现场 boolean connectionBecameIdle(RealConnection connection) { assert (Thread.holdsLock(this)); if (connection.noNewExchanges || maxIdleConnections == 0) { connections.remove(connection); return true; } else { notifyAll(); // Awake the cleanup thread: we may have exceeded the idle connection limit. return false; } } // 关闭所有的连接 public void evictAll() { List<RealConnection> evictedConnections = new ArrayList<>(); synchronized (this) { for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) { RealConnection connection = i.next(); if (connection.transmitters.isEmpty()) { connection.noNewExchanges = true; evictedConnections.add(connection); i.remove(); } } } for (RealConnection connection : evictedConnections) { closeQuietly(connection.socket()); } } // 如果可以清理的话就关闭Socket返回0,不然返回需要等待的时间 long cleanup(long now) { int inUseConnectionCount = 0; int idleConnectionCount = 0; RealConnection longestIdleConnection = null; long longestIdleDurationNs = Long.MIN_VALUE; synchronized (this) { for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) { RealConnection connection = i.next(); if (pruneAndGetAllocationCount(connection, now) > 0) { inUseConnectionCount++; continue; } idleConnectionCount++; long idleDurationNs = now - connection.idleAtNanos; if (idleDurationNs > longestIdleDurationNs) { longestIdleDurationNs = idleDurationNs; longestIdleConnection = connection; } } if (longestIdleDurationNs >= this.keepAliveDurationNs || idleConnectionCount > this.maxIdleConnections) { connections.remove(longestIdleConnection); } else if (idleConnectionCount > 0) { return keepAliveDurationNs - longestIdleDurationNs; } else if (inUseConnectionCount > 0) { return keepAliveDurationNs; } else { cleanupRunning = false; return -1; } } closeQuietly(longestIdleConnection.socket()); return 0; } private int pruneAndGetAllocationCount(RealConnection connection, long now) { // 对于Http1.1这个List最多也只有一个元素 List<Reference<Transmitter>> references = connection.transmitters; for (int i = 0; i < references.size(); ) { Reference<Transmitter> reference = references.get(i); if (reference.get() != null) { i++; continue; } // 发现一个泄露的transmitter,这是一个应用程序的bug TransmitterReference transmitterRef = (TransmitterReference) reference; String message = "A connection to " + connection.route().address().url() + " was leaked. Did you forget to close a response body?"; Platform.get().logCloseableLeak(message, transmitterRef.callStackTrace); references.remove(i); connection.noNewExchanges = true; eviction. if (references.isEmpty()) { connection.idleAtNanos = now - keepAliveDurationNs; return 0; } } return references.size(); } // 连接失败时调用将失败的Route放入到RouteDatabase中 public void connectFailed(Route failedRoute, IOException failure) { // Tell the proxy selector when we fail to connect on a fresh connection. if (failedRoute.proxy().type() != Proxy.Type.DIRECT) { Address address = failedRoute.address(); address.proxySelector().connectFailed( address.url().uri(), failedRoute.proxy().address(), failure); } routeDatabase.failed(failedRoute); } } 复制代码
- RetryAndFollowUpInterceptor 用于错误重试,以及重定向
- BridgeInterceptor 用于添加请求头(User-Agent、Connection等等),收到响应的时候可能会进行GZip解压
- CacheInterceptor 进行缓存管理,默认不带缓存,如果需要缓存可以给OkHttpClient设置cache属性,可以使用OkHttp内置的Cache类
- ConnectInterceptor 进行连接,首先从连接池中取出可以复用的连接,取不到就新建一个然后通过InetAddress获取到域名对于的IP地址,然后创建Socket与服务端进行连接,连接成功后如果是Https请求还会进行握手验证证书操作
- CallServerInterceptor 用于真正的发起请求,从Socket获取的输出流写入请求数据,从输入流中读取到响应数据
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 以太坊源码分析(36)ethdb源码分析
- [源码分析] kubelet源码分析(一)之 NewKubeletCommand
- libmodbus源码分析(3)从机(服务端)功能源码分析
- [源码分析] nfs-client-provisioner源码分析
- [源码分析] kubelet源码分析(三)之 Pod的创建
- Spring事务源码分析专题(一)JdbcTemplate使用及源码分析