Github: okhttp 分析版本:930d4d0

Opens a connection to the target server and proceeds to the next interceptor

intercept(chain: Interceptor.Chain)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Interceptor that asks the call's transmitter for an [Exchange] and then hands the request,
 * the transmitter and that exchange to the rest of the chain.
 */
class ConnectInterceptor(val client: OkHttpClient) : Interceptor {

    /**
     * Creates an exchange for the chain's request and proceeds with it.
     *
     * @param chain the current chain; it is cast to [RealInterceptorChain].
     * @return the response returned by `proceed` on the real chain.
     * @throws IOException propagated from the calls made here.
     */
    @Throws(IOException::class)
    override fun intercept(chain: Interceptor.Chain): Response {
        val interceptorChain = chain as RealInterceptorChain
        val pendingRequest = interceptorChain.request()
        val callTransmitter = interceptorChain.transmitter()

        // We need the network to satisfy this request (possibly to validate a conditional GET).
        // Extensive health checks are requested for every method except GET.
        val needsExtensiveHealthChecks = pendingRequest.method != "GET"

        return interceptorChain.proceed(
            pendingRequest,
            callTransmitter,
            callTransmitter.newExchange(chain, needsExtensiveHealthChecks)
        )
    }
}
  • 当请求方法不是 GET 时,doExtensiveHealthChecks 为 true
  • 通过 transmitter.newExchange() 来创建 Exchange
  • realChain.proceed() 告知下一个拦截器开始去执行

Transmitter#newExchange(chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Transmitter(
    private val client: OkHttpClient,
    private val call: Call
) {
    /**
     * Builds the [Exchange] that will carry a new request and its response.
     *
     * The state checks and the final bookkeeping each run while holding the `connectionPool`
     * monitor; locating the codec happens between those two sections, outside the lock.
     *
     * @param chain passed through to the exchange finder.
     * @param doExtensiveHealthChecks passed through to the exchange finder.
     * @return the newly created exchange, which is also stored in `exchange`.
     * @throws IllegalStateException if `noMoreExchanges` is set or an exchange is still present.
     */
    internal fun newExchange(chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean): Exchange {
        synchronized(connectionPool) {
            check(!noMoreExchanges) { "released" }
            check(exchange == null) {
                "cannot make a new request because the previous response is still open: " +
                    "please call response.close()"
            }
        }

        val exchangeCodec = exchangeFinder!!.find(client, chain, doExtensiveHealthChecks)
        val created = Exchange(this, call, eventListener, exchangeFinder!!, exchangeCodec)

        // Publish the exchange and reset the per-exchange completion flags under the lock.
        return synchronized(connectionPool) {
            exchange = created
            exchangeRequestDone = false
            exchangeResponseDone = false
            created
        }
    }
}

通过 exchangeFinder!!.find() 来创建 ExchangeCodec

ExchangeFinder#find(client: OkHttpClient, chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class ExchangeFinder(
    private val transmitter: Transmitter,
    private val connectionPool: RealConnectionPool,
    private val address: Address,
    private val call: Call,
    private val eventListener: EventListener
) {
    /**
     * Obtains a healthy connection and returns a codec created on it.
     *
     * Timeouts are read from [chain]; the ping interval and the retry flag are read from
     * [client]. Both are read before the connection search starts.
     *
     * @param client supplies the ping interval and retry setting, and is passed to `newCodec`.
     * @param chain supplies the connect/read/write timeouts, and is passed to `newCodec`.
     * @param doExtensiveHealthChecks forwarded to the healthy-connection search.
     * @return the codec created by the selected connection.
     * @throws RouteException rethrown as-is, or wrapping any [IOException] raised while
     *     finding the connection or creating the codec; `trackFailure()` runs first in both cases.
     */
    fun find(
        client: OkHttpClient,
        chain: Interceptor.Chain,
        doExtensiveHealthChecks: Boolean
    ): ExchangeCodec {
        val connectTimeoutMs = chain.connectTimeoutMillis()
        val readTimeoutMs = chain.readTimeoutMillis()
        val writeTimeoutMs = chain.writeTimeoutMillis()
        val pingIntervalMs = client.pingIntervalMillis()
        val retryOnFailure = client.retryOnConnectionFailure()

        return try {
            // Find a connection: either one from the pool or a newly created one.
            findHealthyConnection(
                connectTimeout = connectTimeoutMs,
                readTimeout = readTimeoutMs,
                writeTimeout = writeTimeoutMs,
                pingIntervalMillis = pingIntervalMs,
                connectionRetryEnabled = retryOnFailure,
                doExtensiveHealthChecks = doExtensiveHealthChecks
            ).newCodec(client, chain)
        } catch (routeFailure: RouteException) {
            trackFailure()
            throw routeFailure
        } catch (ioFailure: IOException) {
            trackFailure()
            throw RouteException(ioFailure)
        }
    }
}

通过 findHealthyConnection() 找到一条『健康』的链接,然后通过 RealConnection#newCodec() 来创建 ExchangeCodec

ExchangeFinder#findHealthyConnection(connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean, doExtensiveHealthChecks: Boolean)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class ExchangeFinder(
    private val transmitter: Transmitter,
    private val connectionPool: RealConnectionPool,
    private val address: Address,
    private val call: Call,
    private val eventListener: EventListener
) {

    /**
     * Keeps requesting connections from `findConnection` until one can be returned.
     *
     * A connection whose `successCount` is zero is returned immediately. Any other connection
     * must pass `isHealthy`; one that fails has `noNewExchanges()` called on it and the search
     * starts over.
     *
     * @param doExtensiveHealthChecks forwarded to `isHealthy`.
     * @return the first connection that is either unused so far or reports itself healthy.
     * @throws IOException propagated from the calls made here.
     */
    @Throws(IOException::class)
    private fun findHealthyConnection(
        connectTimeout: Int,
        readTimeout: Int,
        writeTimeout: Int,
        pingIntervalMillis: Int,
        connectionRetryEnabled: Boolean,
        doExtensiveHealthChecks: Boolean
    ): RealConnection {
        // Loop until a usable connection is found.
        while (true) {
            val connection = findConnection(
                connectTimeout = connectTimeout,
                readTimeout = readTimeout,
                writeTimeout = writeTimeout,
                pingIntervalMillis = pingIntervalMillis,
                connectionRetryEnabled = connectionRetryEnabled
            )

            // A brand new connection skips the extensive health checks. The counter is read
            // while holding the pool lock.
            val neverUsed = synchronized(connectionPool) { connection.successCount == 0 }
            if (neverUsed) return connection

            // Do a (potentially slow) check to confirm that the pooled connection is still good.
            if (connection.isHealthy(doExtensiveHealthChecks)) return connection

            // Unhealthy: forbid further exchanges on this connection, then try again.
            connection.noNewExchanges()
        }
    }
}

findHealthyConnection() 在一个循环中反复调用 findConnection() 获取 RealConnection 并检测其是否可用:如果是新创建的连接(successCount == 0)则跳过检测直接返回;如果检测不通过,则调用 noNewExchanges() 将其标记为不可再使用,然后继续循环,重新调用 findConnection() 获取连接

ExchangeFinder#findConnection(connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
class ExchangeFinder(
private val transmitter: Transmitter,
private val connectionPool: RealConnectionPool,
private val address: Address,
private val call: Call,
private val eventListener: EventListener
) {

/**
* Returns a connection to host a new stream. This prefers the existing connection if it exists,
* then the pool, finally building a new connection.
*
* The work is split into three sections that hold the `connectionPool` lock; socket closing,
* listener callbacks, route selection and the connect itself all happen outside of them.
*
* @param connectTimeout passed to `RealConnection.connect` when a new connection is built.
* @param readTimeout passed to `RealConnection.connect` when a new connection is built.
* @param writeTimeout passed to `RealConnection.connect` when a new connection is built.
* @param pingIntervalMillis passed to `RealConnection.connect` when a new connection is built.
* @param connectionRetryEnabled passed to `RealConnection.connect` when a new connection is built.
* @return the transmitter's existing connection, a pooled connection, or a newly connected one.
* @throws IOException with message "Canceled" if the transmitter is canceled at either of the
* first two locked sections; other I/O failures propagate from the calls made here.
*/

@Throws(IOException::class)
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean

): RealConnection {

var foundPooledConnection = false
var result: RealConnection? = null
var selectedRoute: Route? = null
var releasedConnection: RealConnection?
val toClose: Socket?
synchronized(connectionPool) {
if (transmitter.isCanceled) throw IOException("Canceled") // The call was canceled.
hasStreamFailure = false // This is a fresh attempt.

releasedConnection = transmitter.connection
// If the transmitter's connection no longer accepts new exchanges, release it and keep the
// returned socket so it can be closed after leaving the lock.
toClose = if (transmitter.connection != null && transmitter.connection!!.noNewExchanges) {
transmitter.releaseConnectionNoEvents()
} else {
null
}

// Reuse the connection the transmitter still holds, if any.
if (transmitter.connection != null) {
// We had an already-allocated connection and it's good.
result = transmitter.connection
releasedConnection = null
}

if (result == null) {
// Try the pool first; on success the connection is available as transmitter.connection.
// Attempt to get a connection from the pool.
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
foundPooledConnection = true
result = transmitter.connection
} else if (nextRouteToTry != null) {
// Otherwise use the route that was queued for the next attempt, and clear it.
selectedRoute = nextRouteToTry
nextRouteToTry = null
} else if (retryCurrentRoute()) {
// NOTE(review): this branch is only reached when result == null, i.e. transmitter.connection
// was null at the check above. The `!!` below therefore relies on retryCurrentRoute() returning
// true only when a connection is present — confirm against retryCurrentRoute().
selectedRoute = transmitter.connection!!.route()
}
}
}
// Close the socket of the released connection, outside the lock.
toClose?.closeQuietly()

if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection!!)
}
// Notify the listener when a pooled connection was acquired.
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result!!)
}
if (result != null) {
// A reusable connection was found.
// If we found an already-allocated or pooled connection, we're done.
return result!!
}

// Pick a new route selection when needed; the pool is searched again with its routes below.
// If we need a route selection, make one. This is a blocking operation.
var newRouteSelection = false
if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) {
newRouteSelection = true
routeSelection = routeSelector.next()
}

var routes: List<Route>? = null
synchronized(connectionPool) {
if (transmitter.isCanceled) throw IOException("Canceled")

if (newRouteSelection) {
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. This could match due to connection coalescing.
routes = routeSelection!!.routes
if (connectionPool.transmitterAcquirePooledConnection(
address, transmitter, routes, false)) {
foundPooledConnection = true
result = transmitter.connection
}
}

if (!foundPooledConnection) {
// Nothing usable in the pool: create a new connection.
if (selectedRoute == null) {
selectedRoute = routeSelection!!.next()
}

// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
result = RealConnection(connectionPool, selectedRoute!!)
connectingConnection = result
}
}

// If we found a pooled connection on the 2nd time around, we're done.
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result!!)
return result!!
}

// Establish the connection.
// Do TCP + TLS handshakes. This is a blocking operation.
result!!.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
// Report the successful connect to the route database (presumably this removes the route from
// its record of failed routes — confirm against RouteDatabase.connected()).
connectionPool.routeDatabase.connected(result!!.route())

var socket: Socket? = null
synchronized(connectionPool) {
connectingConnection = null
// If concurrent calls to the same address ended up creating several connections, drop the one
// created here and use the pooled one instead.
// Last attempt at connection coalescing, which only occurs if we attempted multiple
// concurrent connections to the same host.
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
// We lost the race! Close the connection we created and return the pooled connection.
result!!.noNewExchanges = true
socket = result!!.socket()
result = transmitter.connection
} else {
// Add the new connection to the pool and assign it to the transmitter.
connectionPool.put(result!!)
transmitter.acquireConnectionNoEvents(result!!)
}
}
// Close the socket of the connection that lost the race (if any), outside the lock.
socket?.closeQuietly()

eventListener.connectionAcquired(call, result!!)
return result!!
}
}

RealConnectionPool#transmitterAcquirePooledConnection(address: Address, transmitter: Transmitter, routes: List?, requireMultiplexed: Boolean)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class RealConnectionPool(
    /** The maximum number of idle connections for each address. */
    private val maxIdleConnections: Int,
    keepAliveDuration: Long,
    timeUnit: TimeUnit
) {
    /**
     * Tries to hand an existing pooled connection for `address` to `transmitter`.
     *
     * The caller must hold this pool's monitor; this is asserted on entry. Connections are
     * examined in iteration order and the first acceptable one is acquired.
     *
     * @param address the address the connection must be eligible for.
     * @param transmitter receives the connection through `acquireConnectionNoEvents`.
     * @param routes when non-null, the resolved routes (i.e. IP addresses) for the connection.
     *     They are used to coalesce related domains onto the same HTTP/2 connection, such as
     *     `square.com` and `square.ca`.
     * @param requireMultiplexed when true, connections that are not multiplexed are skipped.
     * @return true if a connection was acquired, false if none qualified.
     */
    fun transmitterAcquirePooledConnection(
        address: Address,
        transmitter: Transmitter,
        routes: List<Route>?,
        requireMultiplexed: Boolean
    ): Boolean {
        assert(Thread.holdsLock(this))

        for (pooled in connections) {
            // The multiplexing requirement is evaluated first; eligibility is only checked
            // for connections that satisfy it.
            val acceptable = (!requireMultiplexed || pooled.isMultiplexed) &&
                pooled.isEligible(address, routes)
            if (acceptable) {
                transmitter.acquireConnectionNoEvents(pooled)
                return true
            }
        }
        return false
    }
}

遍历 pool 中的 connections(ArrayDeque),如果某条连接可以复用,则通过 transmitter.acquireConnectionNoEvents() 将它分配给 transmitter 并返回 true;如果没有可复用的连接则返回 false

RealConnection#isEligible(address: Address, routes: List?)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class RealConnection(
    val connectionPool: RealConnectionPool,
    private val route: Route
) : Http2Connection.Listener(), Connection {
    /**
     * Decides whether this connection may carry a stream allocation to `address`.
     *
     * @param address the address a caller wants to reach.
     * @param routes when non-null, the resolved routes for that address; they are required
     *     for connection coalescing.
     * @return true when the host matches exactly, or when all coalescing requirements hold.
     */
    internal fun isEligible(address: Address, routes: List<Route>?): Boolean {
        // A connection that reached its allocation limit, or that is not accepting new
        // exchanges, cannot be reused.
        if (transmitters.size >= allocationLimit || noNewExchanges) return false

        // The non-host fields of both addresses have to overlap.
        if (!this.route.address().equalsNonHost(address)) return false

        // An exact host match means this connection can carry the address.
        if (address.url.host == this.route().address().url.host) {
            return true
        }

        // At this point there is no hostname match, but the request may still be carried if
        // the connection coalescing requirements are met. See also:
        // https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding
        // https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/
        val coalescingPossible = when {
            // 1. This connection must be HTTP/2.
            http2Connection == null -> false
            // 2. The routes must share an IP address.
            routes == null || !routeMatchesAny(routes) -> false
            // 3. This connection's server certificate must cover the new host.
            address.hostnameVerifier !== OkHostnameVerifier -> false
            else -> supportsUrl(address.url)
        }
        if (!coalescingPossible) return false

        // 4. Certificate pinning must match the host.
        return try {
            address.certificatePinner!!.check(address.url.host, handshake()!!.peerCertificates)
            true // The caller's address can be carried by this connection.
        } catch (_: SSLPeerUnverifiedException) {
            false
        }
    }
}
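  • 当前链接承载的请求数已达到上限(allocationLimit),或者已被标记为 noNewExchanges 时,不能复用
  • 两个 address 中除 host 以外的参数不相同,不能复用
  • 两个 address 的 url 的 host 相同则可以复用
  • host 不相同时,满足以下全部条件可以合并链接(connection coalescing)
    • 这个链接需要是 HTTP/2
    • 路由必须共享同一个 IP 地址
    • 这个链接的服务器证书必须覆盖新的主机
    • 证书锁定(certificate pinning)必须匹配主机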

RealConnection#newCodec(client: OkHttpClient, chain: Interceptor.Chain)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class RealConnection(
    val connectionPool: RealConnectionPool,
    private val route: Route
) : Http2Connection.Listener(), Connection {
    /**
     * Creates the codec used to exchange a request and response over this connection.
     *
     * The socket, source and sink are required to be non-null up front, whichever codec ends
     * up being chosen. When an HTTP/2 connection is present an [Http2ExchangeCodec] is
     * returned; otherwise the chain's read and write timeouts are applied to the socket,
     * source and sink, and an [Http1ExchangeCodec] is returned.
     *
     * @param client passed to the created codec.
     * @param chain supplies the read/write timeouts and is passed to the HTTP/2 codec.
     * @return the codec matching this connection's protocol.
     */
    @Throws(SocketException::class)
    internal fun newCodec(client: OkHttpClient, chain: Interceptor.Chain): ExchangeCodec {
        val rawSocket = this.socket!!
        val input = this.source!!
        val output = this.sink!!
        val multiplexed = this.http2Connection

        if (multiplexed == null) {
            // HTTP/1: apply the per-call timeouts before creating the codec.
            rawSocket.soTimeout = chain.readTimeoutMillis()
            input.timeout().timeout(chain.readTimeoutMillis().toLong(), MILLISECONDS)
            output.timeout().timeout(chain.writeTimeoutMillis().toLong(), MILLISECONDS)
            return Http1ExchangeCodec(client, this, input, output)
        }

        return Http2ExchangeCodec(client, this, chain, multiplexed)
    }
}

根据 http2Connection 是否为空来判断使用 HTTP/2 还是 HTTP/1.x,然后返回对应的 ExchangeCodec 实现(Http2ExchangeCodec 或 Http1ExchangeCodec),属于策略模式