Github: okhttp 分析版本:930d4d0

An HTTP client for Android, Kotlin, and Java.

OkHttp is an HTTP client that’s efficient by default:

  • HTTP/2 support allows all requests to the same host to share a socket.
  • Connection pooling reduces request latency (if HTTP/2 isn’t available).
  • Transparent GZIP shrinks download sizes.
  • Response caching avoids the network completely for repeat requests.

使用

Get a URL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class GetExample {
OkHttpClient client = new OkHttpClient();

String run(String url) throws IOException {
Request request = new Request.Builder()
.url(url)
.build();

try (Response response = client.newCall(request).execute()) {
return response.body().string();
}
}

public static void main(String[] args) throws IOException {
GetExample example = new GetExample();
String response = example.run("https://raw.github.com/square/okhttp/master/README.md");
System.out.println(response);
}
}

Post to a Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class PostExample {
public static final MediaType JSON = MediaType.get("application/json; charset=utf-8");

OkHttpClient client = new OkHttpClient();

String post(String url, String json) throws IOException {
RequestBody body = RequestBody.create(json, JSON);
Request request = new Request.Builder()
.url(url)
.post(body)
.build();
try (Response response = client.newCall(request).execute()) {
return response.body().string();
}
}

String bowlingJson(String player1, String player2) {
return "{'winCondition':'HIGH_SCORE',"
+ "'name':'Bowling',"
+ "'round':4,"
+ "'lastSaved':1367702411696,"
+ "'dateStarted':1367702378785,"
+ "'players':["
+ "{'name':'" + player1 + "','history':[10,8,6,7,8],'color':-13388315,'total':39},"
+ "{'name':'" + player2 + "','history':[6,10,5,10,10],'color':-48060,'total':41}"
+ "]}";
}

public static void main(String[] args) throws IOException {
PostExample example = new PostExample();
String json = example.bowlingJson("Jesse", "Jake");
String response = example.post("http://www.roundsapp.com/post", json);
System.out.println(response);
}
}

更多

okhttp

源码

OkHttpClient

1
OkHttpClient client = new OkHttpClient();

根据名字我们就能看出,OkHttpClient 为 OkHttp 的客户端,在使用的时候首先要做的就是要创建这样一个客户端

1
2
3
4
5
open class OkHttpClient internal constructor(
builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
constructor() : this(Builder())
}

默认构造方法使用的是默认配置的 Builder:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class Builder constructor() {
internal var dispatcher: Dispatcher = Dispatcher() // 调度器
internal var proxy: Proxy? = null // 代理
internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS // 协议
internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS // 传输层版本和连接协议
internal val interceptors: MutableList<Interceptor> = mutableListOf() // 拦截器
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf() // 网络拦截器
internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
internal var proxySelector: ProxySelector = ProxySelector.getDefault() ?: NullProxySelector() // 代理选择器
internal var cookieJar: CookieJar = CookieJar.NO_COOKIES // cookie
internal var cache: Cache? = null // cache 缓存
internal var internalCache: InternalCache? = null // 内部缓存
internal var socketFactory: SocketFactory = SocketFactory.getDefault() // socket 工厂
internal var sslSocketFactory: SSLSocketFactory? = null // socket工厂 用于https
internal var certificateChainCleaner: CertificateChainCleaner? = null // 验证确认响应书,适用HTTPS 请求连接的主机名
internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier // 主机名字确认
internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT // 证书链
internal var proxyAuthenticator: Authenticator = Authenticator.NONE // 代理身份验证
internal var authenticator: Authenticator = Authenticator.NONE // 身份验证
internal var connectionPool: ConnectionPool = ConnectionPool() //链接复用池
internal var dns: Dns = Dns.SYSTEM // DNS
internal var followSslRedirects: Boolean = true // 重定向
internal var followRedirects: Boolean = true // 本地重定向
internal var retryOnConnectionFailure: Boolean = true // 重试连接失败
internal var callTimeout: Int = 0 // 请求超时
internal var connectTimeout: Int = 10000 // 连接超时
internal var readTimeout: Int = 10000 // 读取超时
internal var writeTimeout: Int = 10000 // 写入超时
internal var pingInterval: Int = 0 // Web socket and HTTP/2 ping interval
// ...
fun build(): OkHttpClient = OkHttpClient(this)
}

okhttp 的最佳表现就是创建一个 OkHttpClient 实例,并将其重用到所有的 http 请求调用上之所以所有请求公用一个 OkHttpClient,因为每个 OkHttpClient 都有自己的的连接池和线程池,这样的话可以重用连接和线程可减少延迟并节省内存

Request

1
2
3
Request request = new Request.Builder()
.url(url)
.build();

发送一个 HTTP 请求类要构建一个 Request 对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class Request internal constructor(
@get:JvmName("url") val url: HttpUrl, // 请求地址
@get:JvmName("method") val method: String, // 请求方法[GET/POST/PUT/PATCH/...]
@get:JvmName("headers") val headers: Headers, // 请求头
@get:JvmName("body") val body: RequestBody?, // 请求体
internal val tags: Map<Class<*>, Any> // 请求标签
) {
// ...
open class Builder {
internal var url: HttpUrl? = null
internal var method: String
internal var headers: Headers.Builder
internal var body: RequestBody? = null

//...

constructor() {
this.method = "GET"
this.headers = Headers.Builder()
}

/**
* Sets the URL target of this request.
*
* @throws IllegalArgumentException if [url] is not a valid HTTP or HTTPS URL. Avoid this
* exception by calling [HttpUrl.parse]; it returns null for invalid URLs.
*/

open fun url(url: String): Builder {
// Silently replace web socket URLs with HTTP URLs.
val finalUrl: String = when {
url.startsWith("ws:", ignoreCase = true) -> {
"http:${url.substring(3)}"
}
url.startsWith("wss:", ignoreCase = true) -> {
"https:${url.substring(4)}"
}
else -> url
}

return url(finalUrl.toHttpUrl())
}

// ...

open fun build(): Request {
return Request(
checkNotNull(url) { "url == null" },
method,
headers.build(),
body,
tags.toImmutableMap()
)
}
}

Request 也是通过 Builder 形式来创建的

Call

1
Call call = client.newCall(request);

Call 即调用是一个准备好去执行的请求 Request

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
interface Call : Cloneable {

fun request(): Request

@Throws(IOException::class)
fun execute(): Response

fun enqueue(responseCallback: Callback)

fun cancel()

fun isExecuted(): Boolean

fun isCanceled(): Boolean

fun timeout(): Timeout

override fun clone(): Call

interface Factory {
fun newCall(request: Request): Call
}
}
  • request(): 返回的是原始的请求,也即调用 OkHttpClient 中的 newCall() 中设置的 Request
  • execute(): 同步请求方法,阻塞式方法
  • enqueue(responseCallback: Callback): 异步请求方法,通过调度器 dispatcher 定义了该方法请求何时被运行
  • cancel(): 取消请求
  • isExecuted(): 请求是否已经执行
  • isCanceled(): 请求是否已取消
  • timeout(): 超时

OkHttpClient 实现了 Call.Factory,使用工厂模式将构建的细节交给具体实现,顶层只需要拿到 Call 对象即可

1
2
3
4
5
6
7
8
9
10
11
12
open class OkHttpClient internal constructor(
builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
// ...

/** Prepares the [request] to be executed at some point in the future. */
override fun newCall(request: Request): Call {
return RealCall.newRealCall(this, request, false /* for web socket */)
}

// ...
}

继续看 RealCall 中的 newRealCall 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
internal class RealCall private constructor(
val client: OkHttpClient,
/** The application's original request unadulterated by redirects or auth headers. */
val originalRequest: Request,
val forWebSocket: Boolean
) : Call {
/**
* There is a cycle between the [Call] and [Transmitter] that makes this awkward.
* This is set after immediately after creating the call instance.
*/

private lateinit var transmitter: Transmitter

// ...

companion object {
fun newRealCall(
client: OkHttpClient,
originalRequest: Request,
forWebSocket: Boolean

): RealCall {

// Safely publish the Call instance to the EventListener.
return RealCall(client, originalRequest, forWebSocket).apply {
transmitter = Transmitter(client, this)
}
}
}
}

RealCall 为具体产品,实现了 Call 接口;其中 Transmitter 是 OkHttp 的应用层和网络层的一个桥梁类,包含了连接,请求,响应和流

Transmitter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class Transmitter(
private val client: OkHttpClient,
private val call: Call
) {
private val connectionPool: RealConnectionPool = client.connectionPool().delegate
private val eventListener: EventListener = client.eventListenerFactory().create(call)
private val timeout = object : AsyncTimeout() {
override fun timedOut() {
cancel()
}
}.apply {
timeout(client.callTimeoutMillis().toLong(), MILLISECONDS)
}
// ...
}

在创建 Transmitter 对象的时候设置了相关指标的监听器和 ConnectionPool

execute()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
internal class RealCall private constructor(
val client: OkHttpClient,
/** The application's original request unadulterated by redirects or auth headers. */
val originalRequest: Request,
val forWebSocket: Boolean
) : Call {
// ...

override fun execute(): Response {
synchronized(this) {
check(!executed) { "Already Executed" }
executed = true
}
transmitter.timeoutEnter()
transmitter.callStart()
try {
client.dispatcher().executed(this)
return getResponseWithInterceptorChain()
} finally {
client.dispatcher().finished(this)
}
}

// ...
}
  • 同步代码块,内部是做一个判断,判断是否已经执行execute方法,如果执行了抛出异常
  • 超时计时,最终调用的是 AsyncTimeout 类中的 enter() 方法
  • 请求开始的相关操作
    • getStackTraceForCloseable():应该是追踪捕捉一些堆栈信息
    • 调用 EventListener 的 callStart() 方法
  • 将请求 call 添加到调度器中的同步双端队列中
  • 通过拦截器链获取响应并返回
  • 请求结束时候回收移除同步请求

getResponseWithInterceptorChain()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
internal class RealCall private constructor(
val client: OkHttpClient,
/** The application's original request unadulterated by redirects or auth headers. */
val originalRequest: Request,
val forWebSocket: Boolean
) : Call {
// ...

@Throws(IOException::class)
fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = ArrayList<Interceptor>()
interceptors.addAll(client.interceptors())
interceptors.add(RetryAndFollowUpInterceptor(client)) // 失败重试以及重定向
interceptors.add(BridgeInterceptor(client.cookieJar())) // 用户构造的请求转换为发送到服务器的请求、把服务器返回的响应转换为用户友好的响应
interceptors.add(CacheInterceptor(client.internalCache())) // 读取缓存直接返回、更新缓存
interceptors.add(ConnectInterceptor(client)) // 和服务器建立连接
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors())
}
interceptors.add(CallServerInterceptor(forWebSocket)) // 向服务器发送请求数据、从服务器读取响应数据

val chain = RealInterceptorChain(interceptors, transmitter, null, 0,
originalRequest, this, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis())

var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)
if (transmitter.isCanceled) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw transmitter.noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
transmitter.noMoreExchanges(null)
}
}
}

// ...
}
  • 将自定义的拦截器和 okhttp 本身存在的拦截器添加到拦截器的集合
  • 创建一个拦截器链对象 Interceptor.Chain
  • 调用拦截器链对象的 proceed() ,开启链式调用请求,并最终返回响应 response
  • 结束请求,调用 Transmitter 对象的 noMoreExchanges() ,释放请求连接

Interceptor 使用的是责任链模式

enqueue()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
internal class RealCall private constructor(
val client: OkHttpClient,
/** The application's original request unadulterated by redirects or auth headers. */
val originalRequest: Request,
val forWebSocket: Boolean
) : Call {
// ...

override fun enqueue(responseCallback: Callback) {
synchronized(this) {
check(!executed) { "Already Executed" }
executed = true
}
transmitter.callStart()
client.dispatcher().enqueue(AsyncCall(responseCallback))
}

// ...
}

最终调用的是 Dispatcher 中的 enqueue()

Dispatcher

executed(call: RealCall)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class Dispatcher constructor() {
// ...

/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private val runningSyncCalls = ArrayDeque<RealCall>()

// ...

/** Used by `Call#execute` to signal it is in-flight. */
@Synchronized internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}

// ...
}

直接将将 call 添加到正在执行的请求队列中去,runningSyncCalls 为正在请求的同步队列

enqueue(call: AsyncCall)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Dispatcher constructor() {
// ...

/** Ready async calls in the order they'll be run. */
private val readyAsyncCalls = ArrayDeque<AsyncCall>()

// ...

internal fun enqueue(call: AsyncCall) {
synchronized(this) {
readyAsyncCalls.add(call)

// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.get().forWebSocket) {
val existingCall = findExistingCallWithHost(call.host())
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
promoteAndExecute()
}

// ...
}

封装到一个 AsyncCall 中传递进来,添加到正在等待的异步队列 readyAsyncCalls 中去,接着继续调用 promoteAndExecute() 方法执行相关操作

promoteAndExecute()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class Dispatcher constructor() {
// ...

/**
* Promotes eligible calls from [readyAsyncCalls] to [runningAsyncCalls] and runs them on the
* executor service. Must not be called with synchronization because executing calls can call
* into user code.
*
* @return true if the dispatcher is currently running calls.
*/

private fun promoteAndExecute(): Boolean {
assert(!Thread.holdsLock(this))

val executableCalls = ArrayList<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()

if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.

i.remove()
asyncCall.callsPerHost().incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}

for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService())
}

return isRunning
}

@Synchronized fun executorService(): ExecutorService {
if (executorService == null) {
executorService = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("OkHttp Dispatcher", false))
}
return executorService!!
}

// ...
}
  • 创建一个集合用于添加储存可以执行的 Call
  • 遍历准备好的异步请求队列 readyAsyncCalls
  • 检验正在运行的异步请求队列 runningAsyncCalls 的大小数量是否超过了最大请求数 maxRequests,如果超过了最大的请求个数,直接跳出循环
  • 校验异步请求每个主机的请求数是否超过主机最大请求数 maxRequestsPerHost,若超过,则跳过这次循环
  • 将遍历到的第一个 AsyncCall 添加到可执行的集合 executableCalls 和正在运行的异步请求队列 runningAsyncCalls 中去,同时,在 readyAsyncCalls 将这条数据移除,并将主机请求数量 +1
  • 待循环遍历结束,获取正在运行的请求数量,判断是否存在正在运行的请求
  • 对 readyAsyncCalls 循环遍历结束后,会得到一个可运行的 executableCalls 集合,遍历该集合,将集合中的 AsyncCall 依次调用执行

AsyncCall

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
internal class RealCall private constructor(
val client: OkHttpClient,
/** The application's original request unadulterated by redirects or auth headers. */
val originalRequest: Request,
val forWebSocket: Boolean
) : Call {
internal inner class AsyncCall(
private val responseCallback: Callback
) : Runnable {
/**
* Attempt to enqueue this async call on [executorService]. This will attempt to clean up
* if the executor has been shut down by reporting the call as failed.
*/

fun executeOn(executorService: ExecutorService) {
assert(!Thread.holdsLock(client.dispatcher()))
var success = false
try {
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
transmitter.noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
client.dispatcher().finished(this) // This call is no longer running!
}
}
}

override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
transmitter.timeoutEnter()
try {
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} finally {
client.dispatcher().finished(this)
}
}
}
}
}

又回到了 getResponseWithInterceptorChain()

RealInterceptorChain

所有的 interceptor 都整合到了 RealInterceptorChain 中,执行拦截器链方法 proceed()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class RealInterceptorChain(
private val interceptors: List<Interceptor>,
private val transmitter: Transmitter,
private val exchange: Exchange?,
private val index: Int,
private val request: Request,
private val call: Call,
private val connectTimeout: Int,
private val readTimeout: Int,
private val writeTimeout: Int
) : Interceptor.Chain {

// ...

override fun proceed(request: Request): Response {
return proceed(request, transmitter, exchange)
}

@Throws(IOException::class)
fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {
if (index >= interceptors.size) throw AssertionError()

calls++

// If we already have a stream, confirm that the incoming request will use it.
check(this.exchange == null || this.exchange.connection()!!.supportsUrl(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}

// If we already have a stream, confirm that this is the only call to chain.proceed().
check(this.exchange == null || calls <= 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
}

// Call the next interceptor in the chain.
val next = RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
val interceptor = interceptors[index]

@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")

// Confirm that the next interceptor made its required call to chain.proceed().
check(exchange == null || index + 1 >= interceptors.size || next.calls == 1) {

"network interceptor $interceptor must call proceed() exactly once"
}

check(response.body() != null) { "interceptor $interceptor returned a response with no body" }

return response
}
}

在初始化的时候,会将所有拦截器组成的集合传递过来,同时将请求 RequestCall 也会传递过来,index 参数,最开始传入的是 0,exchange 参数,如果是应用拦截器,connection 必须是 null;如果是网络拦截器,connection 必须不为 null

  • 首先判断 index 是否大于总的拦截器个数,大于抛出 AssertionError()
  • 对 calls 进行 +1 操作
  • 判断是否是一个网络拦截器,并且判断其 host 和 port 是否一致
  • 判断是否是一个网络拦截器,如果是判断是否该网络拦截器的 proceed 方法调用次数是否超过一次
  • 继续创建一个新的拦截器链对象,此时传入的 index 会进行 index+1 操作,表示开始真正的调用相关拦截器操作
  • 调用拦截器的 intercept 方法,将新的拦截器链对象塞进去
  • 返回 response 后进行校验

http://yydcdut.com/img/okhttp_interceptors.jpg

参考