Github: okhttp 分析版本:930d4d0

This is the last interceptor in the chain. It makes a network call to the server

ConnectInterceptor 拦截器的功能是负责与服务器建立 Socket 连接,并且创建一个 Exchange,它包括通向服务器的输入流和输出流。而接下来的 CallServerInterceptor 拦截器的功能则是使用这个 Exchange 与服务器进行数据的读写操作。

intercept(chain: Interceptor.Chain)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
/**
* Final interceptor of the chain: writes the request to the server through the chain's exchange
* and reads the response back.
*
* @param forWebSocket when true, a 101 response is given `EMPTY_RESPONSE` as its body instead of a
* body opened from the exchange.
*/
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {

/**
* Writes the request headers and, when the method permits one and a body is present, the request
* body; then reads the response headers and attaches a response body.
*
* @param chain the interceptor chain; it is cast to `RealInterceptorChain` to reach the exchange.
* @return the response built from the headers read from the exchange, with a body attached.
* @throws ProtocolException if a 204 or 205 response reports a content length greater than zero.
*/
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange()
val request = realChain.request()
val requestBody = request.body
// Captured before anything is written; stored on the response as the request-sent timestamp.
val sentRequestMillis = System.currentTimeMillis()

// Write the request headers.
exchange.writeRequestHeaders(request)

// Tracks whether responseHeadersStart() was already called by the 100-continue path below.
var responseHeadersStarted = false
var responseBuilder: Response.Builder? = null
// Write the request body (only when the method permits a body and one is present).
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
exchange.flushRequest()
responseHeadersStarted = true
exchange.responseHeadersStart()
responseBuilder = exchange.readResponseHeaders(true)
}
// A null builder here means either no expectation was sent, or readResponseHeaders(true)
// returned null; in both cases the body is transmitted.
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
// Prepare a duplex body so that the application can send a request body later.
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
} else {
// Write the request body if the "Expect: 100-continue" expectation was met.
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
} else {
exchange.noRequestBody()
if (!exchange.connection()!!.isMultiplexed) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
exchange.noNewExchangesOnConnection()
}
}
} else {
exchange.noRequestBody()
}

// Finish the request; skipped for duplex bodies.
if (requestBody == null || !requestBody.isDuplex()) {
exchange.finishRequest()
}
if (!responseHeadersStarted) {
exchange.responseHeadersStart()
}
// Read the response headers unless the 100-continue path already produced them.
if (responseBuilder == null) {
responseBuilder = exchange.readResponseHeaders(false)!!
}

// Build the preliminary response (no body yet).
var response = responseBuilder
.request(request)
.handshake(exchange.connection()!!.handshake())
.sentRequestAtMillis(sentRequestMillis) // time the request was sent
.receivedResponseAtMillis(System.currentTimeMillis()) // time the response was received
.build()
var code = response.code()
if (code == 100) {
// server sent a 100-continue even though we did not request one.
// try again to read the actual response
response = exchange.readResponseHeaders(false)!!
.request(request)
.handshake(exchange.connection()!!.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
code = response.code()
}

exchange.responseHeadersEnd(response)

// Attach the response body.
response = if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response.newBuilder()
.body(EMPTY_RESPONSE)
.build()
} else {
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
}
// Either side asking for "Connection: close" stops new exchanges on this connection.
if ("close".equals(response.request().header("Connection"), ignoreCase = true) ||
"close".equals(response.header("Connection"), ignoreCase = true)) {
exchange.noNewExchangesOnConnection()
}
// A 204 or 205 response that reports a positive content length is rejected.
if ((code == 204 || code == 205) && response.body()?.contentLength() ?: -1L > 0L) {
throw ProtocolException(
"HTTP $code had non-zero Content-Length: ${response.body()?.contentLength()}")
}
return response
}
}
  • 写入请求 Header
  • 如果请求头包含 Expect: 100-continue,则先只发送请求头
  • 根据返回的结果判断是否继续请求流程
  • 写入请求体,完成请求
  • 得到响应头,构建初步响应
  • 构建响应体,完成最终响应
  • 返回响应

Request Header

Exchange#writeRequestHeaders(request: Request)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Exchange(
  internal val transmitter: Transmitter,
  internal val call: Call,
  internal val eventListener: EventListener,
  private val finder: ExchangeFinder,
  private val codec: ExchangeCodec
) {
  /**
   * Sends the headers of [request] through the codec, notifying the event listener immediately
   * before and after the write.
   *
   * @throws IOException the codec's failure, rethrown unchanged after it has been reported to the
   * event listener and to `trackFailure`.
   */
  @Throws(IOException::class)
  fun writeRequestHeaders(request: Request) {
    try {
      eventListener.requestHeadersStart(call)
      codec.writeRequestHeaders(request)
      eventListener.requestHeadersEnd(call, request)
    } catch (failure: IOException) {
      throw reportRequestFailure(failure)
    }
  }

  /** Reports [failure] as a failed request and hands the same exception back for rethrowing. */
  private fun reportRequestFailure(failure: IOException): IOException {
    eventListener.requestFailed(call, failure)
    trackFailure(failure)
    return failure
  }
}

这里调用了 ExchangeCodec#writeRequestHeaders()。ExchangeCodec 使用了策略模式:根据请求是 HTTP/1 还是 HTTP/2,分别由对应的实现类(Http1ExchangeCodec 或 Http2ExchangeCodec)来处理

Http1ExchangeCodec#writeRequestHeaders(request: Request)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class Http1ExchangeCodec(
  /** The client that configures this stream. May be null for HTTPS proxy tunnels. */
  private val client: OkHttpClient?,
  /** The connection that carries this stream. */
  private val realConnection: RealConnection?,
  private val source: BufferedSource,
  private val sink: BufferedSink
) : ExchangeCodec {
  /**
   * Builds the request line for [request] from the proxy type of the connection's route and
   * writes it, followed by the request's headers, to the sink.
   */
  override fun writeRequestHeaders(request: Request) {
    val proxyType = realConnection!!.route().proxy().type()
    val line = RequestLine.get(request, proxyType)
    writeRequest(request.headers, line)
  }

  /**
   * Writes [requestLine], then one `name: value` line per entry of [headers], then an empty line,
   * each terminated by CRLF.
   *
   * Requires the codec to be in `STATE_IDLE` and leaves it in `STATE_OPEN_REQUEST_BODY`.
   */
  fun writeRequest(headers: Headers, requestLine: String) {
    check(state == STATE_IDLE) { "state: $state" }

    sink.writeUtf8(requestLine).writeUtf8("\r\n")
    repeat(headers.size) { index ->
      sink.writeUtf8(headers.name(index))
          .writeUtf8(": ")
          .writeUtf8(headers.value(index))
          .writeUtf8("\r\n")
    }
    // The blank line marks the end of the header section.
    sink.writeUtf8("\r\n")

    state = STATE_OPEN_REQUEST_BODY
  }
}

遍历 Header,然后通过前一个拦截器建立的连接所得到的 Sink 来进行写操作,同时将 state 变量赋值为 STATE_OPEN_REQUEST_BODY,运用了状态模式的思想

Request Body

Exchange#createRequestBody(request: Request, duplex: Boolean)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Exchange(
  internal val transmitter: Transmitter,
  internal val call: Call,
  internal val eventListener: EventListener,
  private val finder: ExchangeFinder,
  private val codec: ExchangeCodec
) {
  /**
   * Creates the sink that the body of [request] is written to.
   *
   * Records [duplex] on this exchange, fires `requestBodyStart`, and wraps the codec's sink in a
   * `RequestBodySink` created with the body's content length.
   *
   * @param request the request being sent; its body must not be null.
   * @param duplex whether the body is a duplex body.
   */
  @Throws(IOException::class)
  fun createRequestBody(request: Request, duplex: Boolean): Sink {
    isDuplex = duplex
    // Read the length before firing the event, so a missing body fails before any callback runs.
    val declaredLength = request.body!!.contentLength()
    eventListener.requestBodyStart(call)
    return RequestBodySink(codec.createRequestBody(request, declaredLength), declaredLength)
  }
}

通过 createRequestBody 创建一个 Sink 对象,本质上还是使用在 ConnectInterceptor 中创建的 ExchangeCodec 内部封装的 Sink 对象进行写操作

Http1ExchangeCodec#createRequestBody(request: Request, contentLength: Long)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class Http1ExchangeCodec(
/** The client that configures this stream. May be null for HTTPS proxy tunnels. */
private val client: OkHttpClient?,
/** The connection that carries this stream. */
private val realConnection: RealConnection?,
private val source: BufferedSource,
private val sink: BufferedSink
) : ExchangeCodec {
/**
* Chooses the sink used to write the body of [request].
*
* @param contentLength the body length, or -1 when unknown.
* @return a chunked sink when the request is chunked, otherwise a known-length sink when
* [contentLength] is not -1.
* @throws ProtocolException if the request body is duplex.
* @throws IllegalStateException if the request is neither chunked nor of known length.
*/
override fun createRequestBody(request: Request, contentLength: Long): Sink {
return when {
request.body != null && request.body!!.isDuplex() -> throw ProtocolException(
"Duplex connections are not supported for HTTP/1")
request.isChunked() -> newChunkedSink() // Stream a request body of unknown length.
contentLength != -1L -> newKnownLengthSink() // Stream a request body of a known length.
else -> // Neither chunked nor a known length: the body cannot be streamed.
throw IllegalStateException(
"Cannot stream a request body without chunked encoding or a known content length!")
}
}

/** Moves the codec from `STATE_OPEN_REQUEST_BODY` to `STATE_WRITING_REQUEST_BODY` and returns a new [ChunkedSink]. */
private fun newChunkedSink(): Sink {
check(state == STATE_OPEN_REQUEST_BODY) { "state: $state" }
state = STATE_WRITING_REQUEST_BODY
return ChunkedSink()
}

/** Moves the codec from `STATE_OPEN_REQUEST_BODY` to `STATE_WRITING_REQUEST_BODY` and returns a new [KnownLengthSink]. */
private fun newKnownLengthSink(): Sink {
check(state == STATE_OPEN_REQUEST_BODY) { "state: $state" }
state = STATE_WRITING_REQUEST_BODY
return KnownLengthSink()
}

/** An HTTP request body. */
private inner class KnownLengthSink : Sink {
private val timeout = ForwardingTimeout(sink.timeout())
private var closed: Boolean = false

override fun timeout(): Timeout = timeout

// Bytes are forwarded to the codec's sink as-is, with no framing added.
override fun write(source: Buffer, byteCount: Long) {
check(!closed) { "closed" }
checkOffsetAndCount(source.size, 0, byteCount)
sink.write(source, byteCount)
}

override fun flush() {
if (closed) return // Don't throw; this stream might have been closed on the caller's behalf.
sink.flush()
}

// Closing does not close the codec's sink; it detaches the timeout and advances the codec
// state to STATE_READ_RESPONSE_HEADERS.
override fun close() {
if (closed) return
closed = true
detachTimeout(timeout)
state = STATE_READ_RESPONSE_HEADERS
}
}

/**
* An HTTP body with alternating chunk sizes and chunk bodies. It is the caller's responsibility
* to buffer chunks; typically by using a buffered sink with this sink.
*/

private inner class ChunkedSink : Sink {
private val timeout = ForwardingTimeout(sink.timeout())
private var closed: Boolean = false

override fun timeout(): Timeout = timeout

// Each write becomes one chunk: hexadecimal size, CRLF, the bytes, CRLF. Empty writes are
// skipped, since a zero-size chunk is what close() writes to end the body.
override fun write(source: Buffer, byteCount: Long) {
check(!closed) { "closed" }
if (byteCount == 0L) return

sink.writeHexadecimalUnsignedLong(byteCount)
sink.writeUtf8("\r\n")
sink.write(source, byteCount)
sink.writeUtf8("\r\n")
}

@Synchronized
override fun flush() {
if (closed) return // Don't throw; this stream might have been closed on the caller's behalf.
sink.flush()
}

// Writes the terminating zero-size chunk, then detaches the timeout and advances the codec
// state to STATE_READ_RESPONSE_HEADERS.
@Synchronized
override fun close() {
if (closed) return
closed = true
sink.writeUtf8("0\r\n\r\n")
detachTimeout(timeout)
state = STATE_READ_RESPONSE_HEADERS
}
}
}

根据请求头 Transfer-Encoding 是否为 chunked,来创建不同的 Sink 实现类:如果是 chunked 方式,就创建 ChunkedSink;如果不是 chunked 但内容长度已知(contentLength 不为 -1),就创建 KnownLengthSink;两者都不满足则抛出 IllegalStateException。最后 Exchange 又在外边包装了一层 RequestBodySink

Exchange.RequestBodySink

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class Exchange(
  internal val transmitter: Transmitter,
  internal val call: Call,
  internal val eventListener: EventListener,
  private val finder: ExchangeFinder,
  private val codec: ExchangeCodec
) {
  /**
   * A request body sink that enforces the declared content length and reports completion through
   * `bodyComplete` exactly once.
   */
  private inner class RequestBodySink internal constructor(
    delegate: Sink,
    /** The exact number of bytes to be written, or -1L if that is unknown. */
    private val contentLength: Long
  ) : ForwardingSink(delegate) {
    private var completed = false
    private var bytesReceived = 0L
    private var closed = false

    /**
     * Forwards the write to the delegate and counts the bytes.
     *
     * @throws ProtocolException if the write would exceed a known content length.
     */
    @Throws(IOException::class)
    override fun write(source: Buffer, byteCount: Long) {
      check(!closed) { "closed" }

      val lengthIsKnown = contentLength != -1L
      if (lengthIsKnown && bytesReceived + byteCount > contentLength) {
        throw ProtocolException(
            "expected $contentLength bytes but received ${bytesReceived + byteCount}")
      }

      try {
        super.write(source, byteCount)
        bytesReceived += byteCount
      } catch (failure: IOException) {
        throw complete(failure)
      }
    }

    /** Flushes the delegate; a failure completes the body with that failure. */
    @Throws(IOException::class)
    override fun flush() {
      try {
        super.flush()
      } catch (failure: IOException) {
        throw complete(failure)
      }
    }

    /**
     * Closes the delegate and completes the body. Repeated calls are ignored.
     *
     * @throws ProtocolException if fewer bytes than a known content length were written.
     */
    @Throws(IOException::class)
    override fun close() {
      if (closed) return
      closed = true

      val lengthMismatch = contentLength != -1L && bytesReceived != contentLength
      if (lengthMismatch) {
        throw ProtocolException("unexpected end of stream")
      }

      try {
        super.close()
        complete(null)
      } catch (failure: IOException) {
        throw complete(failure)
      }
    }

    /** Reports completion once via `bodyComplete`; later calls return [e] untouched. */
    private fun <E : IOException?> complete(e: E): E {
      if (!completed) {
        completed = true
        return bodyComplete(bytesReceived, false, true, e)
      }
      return e
    }
  }
}

RequestBodySink 的 write(source: Buffer, byteCount: Long) 最终委托给构造时传入的 sink(即 delegate)的 write(source: Buffer, byteCount: Long)

FormBody#writeTo(sink: BufferedSink)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class FormBody internal constructor(
  encodedNames: List<String>,
  encodedValues: List<String>
) : RequestBody() {
  /** Writes the encoded form as `name=value` pairs joined by `&` to [sink]. */
  @Throws(IOException::class)
  override fun writeTo(sink: BufferedSink) {
    writeOrCountBytes(sink, false)
  }

  /**
   * Writes the encoded form either to `sink` or, when [countBytes] is true, to a scratch buffer
   * whose size is measured. Sharing one method keeps the counted length and the written content
   * consistent.
   *
   * @param sink the destination; must be non-null when [countBytes] is false.
   * @return the number of bytes measured when counting, otherwise 0.
   */
  private fun writeOrCountBytes(sink: BufferedSink?, countBytes: Boolean): Long {
    val target: Buffer = if (countBytes) Buffer() else sink!!.buffer

    for (index in encodedNames.indices) {
      if (index > 0) target.writeByte('&'.toInt())
      target.writeUtf8(encodedNames[index])
      target.writeByte('='.toInt())
      target.writeUtf8(encodedValues[index])
    }

    if (!countBytes) return 0L

    // Counting mode: report the size and discard the scratch content.
    val measured = target.size
    target.clear()
    return measured
  }
}

具体是如何写入数据的,要根据传入到 Request 中的 RequestBody 的实现类来定:如果是表单类型,则由 FormBody 负责具体的写操作;如果是文件类型,则由 MultipartBody 负责具体的写操作

Request finish

Exchange#finishRequest()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Exchange(
  internal val transmitter: Transmitter,
  internal val call: Call,
  internal val eventListener: EventListener,
  private val finder: ExchangeFinder,
  private val codec: ExchangeCodec
) {
  /**
   * Asks the codec to finish the request.
   *
   * @throws IOException the codec's failure, rethrown unchanged after it has been reported to the
   * event listener and to `trackFailure`.
   */
  @Throws(IOException::class)
  fun finishRequest() {
    try {
      codec.finishRequest()
    } catch (failure: IOException) {
      throw reportRequestFailure(failure)
    }
  }

  /** Reports [failure] as a failed request and hands the same exception back for rethrowing. */
  private fun reportRequestFailure(failure: IOException): IOException {
    eventListener.requestFailed(call, failure)
    trackFailure(failure)
    return failure
  }
}

Http1ExchangeCodec#finishRequest()

1
2
3
4
5
6
7
8
9
10
11
12
class Http1ExchangeCodec(
  /** The client that configures this stream. May be null for HTTPS proxy tunnels. */
  private val client: OkHttpClient?,
  /** The connection that carries this stream. */
  private val realConnection: RealConnection?,
  private val source: BufferedSource,
  private val sink: BufferedSink
) : ExchangeCodec {
  /** Finishes the request by flushing [sink]. */
  override fun finishRequest() = sink.flush()
}

sink.flush() 将缓冲区中的数据写入到底层的 sink 中,其实就是发送给 server 了,相当于一个刷新缓冲区的功能

Response Header

Exchange#readResponseHeaders(expectContinue: Boolean)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Exchange(
  internal val transmitter: Transmitter,
  internal val call: Call,
  internal val eventListener: EventListener,
  private val finder: ExchangeFinder,
  private val codec: ExchangeCodec
) {
  /**
   * Reads the response headers through the codec and binds this exchange to the resulting
   * builder.
   *
   * @param expectContinue forwarded to the codec unchanged.
   * @return the builder produced by the codec, or null when the codec returns null.
   * @throws IOException the codec's failure, rethrown unchanged after it has been reported via
   * `eventListener.responseFailed` and `trackFailure`.
   */
  @Throws(IOException::class)
  fun readResponseHeaders(expectContinue: Boolean): Response.Builder? {
    try {
      return codec.readResponseHeaders(expectContinue)
          ?.also { builder -> builder.initExchange(this) }
    } catch (failure: IOException) {
      eventListener.responseFailed(call, failure)
      trackFailure(failure)
      throw failure
    }
  }
}

Http1ExchangeCodec#readResponseHeaders(expectContinue: Boolean)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class Http1ExchangeCodec(
  /** The client that configures this stream. May be null for HTTPS proxy tunnels. */
  private val client: OkHttpClient?,
  /** The connection that carries this stream. */
  private val realConnection: RealConnection?,
  private val source: BufferedSource,
  private val sink: BufferedSink
) : ExchangeCodec {
  /**
   * Parses the status line and headers of the next response.
   *
   * Requires the codec to be in `STATE_OPEN_REQUEST_BODY` or `STATE_READ_RESPONSE_HEADERS`.
   *
   * @param expectContinue when true, a `HTTP_CONTINUE` status yields null and leaves the state
   * untouched.
   * @return a builder holding protocol, code, message and headers. After an unexpected
   * `HTTP_CONTINUE` the state is `STATE_READ_RESPONSE_HEADERS`, so the real response can be read
   * next; for any other status it becomes `STATE_OPEN_RESPONSE_BODY`.
   * @throws IOException wrapping an [EOFException] when the stream ends before a full response
   * head was read.
   */
  override fun readResponseHeaders(expectContinue: Boolean): Response.Builder? {
    check(state == STATE_OPEN_REQUEST_BODY || state == STATE_READ_RESPONSE_HEADERS) {
      "state: $state"
    }

    try {
      val statusLine = StatusLine.parse(readHeaderLine())

      val builder = Response.Builder()
          .protocol(statusLine.protocol) // protocol of the response, taken from the status line
          .code(statusLine.code) // status code
          .message(statusLine.message) // status message
          .headers(readHeaders()) // response headers

      val isContinue = statusLine.code == HTTP_CONTINUE
      if (expectContinue && isContinue) return null

      state = if (isContinue) STATE_READ_RESPONSE_HEADERS else STATE_OPEN_RESPONSE_BODY
      return builder
    } catch (e: EOFException) {
      // Provide more context if the server ends the stream before sending a response.
      val address = realConnection?.route()?.address()?.url?.redact() ?: "unknown"
      throw IOException("unexpected end of stream on $address", e)
    }
  }
}

当客户端将请求数据发送给服务端之后,服务端处理完毕会将结果返回给客户端,这时客户端需要根据这些返回的数据构造出一个 Response 对象,然后返回给调用者

Response Body

Exchange#openResponseBody(response: Response)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class Exchange(
  internal val transmitter: Transmitter,
  internal val call: Call,
  internal val eventListener: EventListener,
  private val finder: ExchangeFinder,
  private val codec: ExchangeCodec
) {
  /**
   * Opens the body of [response]: fires `responseBodyStart`, wraps the codec's body source in a
   * [ResponseBodySource], and returns it as a `RealResponseBody` carrying the `Content-Type`
   * header and the content length reported by the codec.
   *
   * @throws IOException the failure, rethrown unchanged after it has been reported via
   * `eventListener.responseFailed` and `trackFailure`.
   */
  @Throws(IOException::class)
  fun openResponseBody(response: Response): ResponseBody {
    try {
      eventListener.responseBodyStart(call)
      val mediaType = response.header("Content-Type")
      val length = codec.reportedContentLength(response)
      val trackedSource = ResponseBodySource(codec.openResponseBodySource(response), length)
      return RealResponseBody(mediaType, length, trackedSource.buffer())
    } catch (failure: IOException) {
      eventListener.responseFailed(call, failure)
      trackFailure(failure)
      throw failure
    }
  }

  /**
   * A response body source that counts received bytes and reports completion through
   * `bodyComplete` exactly once.
   */
  internal inner class ResponseBodySource(
    delegate: Source,
    private val contentLength: Long
  ) : ForwardingSource(delegate) {
    private var bytesReceived = 0L
    private var completed = false
    private var closed = false

    init {
      // An empty body is complete as soon as it is opened.
      if (contentLength == 0L) {
        complete(null)
      }
    }

    /**
     * Reads from the delegate and tracks progress. Completes the body at end of stream or once
     * exactly [contentLength] bytes have arrived.
     *
     * @throws ProtocolException if more bytes than a known content length are received.
     */
    @Throws(IOException::class)
    override fun read(sink: Buffer, byteCount: Long): Long {
      check(!closed) { "closed" }
      try {
        val bytesRead = delegate.read(sink, byteCount)
        if (bytesRead == -1L) {
          complete(null)
          return -1L
        }

        val total = bytesReceived + bytesRead
        if (contentLength != -1L && total > contentLength) {
          throw ProtocolException("expected $contentLength bytes but received $total")
        }

        bytesReceived = total
        if (total == contentLength) {
          complete(null)
        }

        return bytesRead
      } catch (failure: IOException) {
        throw complete(failure)
      }
    }

    /** Closes the delegate and completes the body. Repeated calls are ignored. */
    @Throws(IOException::class)
    override fun close() {
      if (closed) return
      closed = true
      try {
        super.close()
        complete(null)
      } catch (failure: IOException) {
        throw complete(failure)
      }
    }

    /** Reports completion once via `bodyComplete`; later calls return [e] untouched. */
    fun <E : IOException?> complete(e: E): E {
      if (!completed) {
        completed = true
        return bodyComplete(bytesReceived, true, false, e)
      }
      return e
    }
  }
}

返回一个 ResponseBody 对象,该对象封装了用于读取服务端响应的输入流,即 Source 对象

Http1ExchangeCodec#openResponseBodySource(response: Response)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
class Http1ExchangeCodec(
/** The client that configures this stream. May be null for HTTPS proxy tunnels. */
private val client: OkHttpClient?,
/** The connection that carries this stream. */
private val realConnection: RealConnection?,
private val source: BufferedSource,
private val sink: BufferedSink
) : ExchangeCodec {
/**
* Chooses the source used to read the body of [response]:
* a zero-length source when the response promises no body, a chunked source when the response
* is chunked, a fixed-length source when the headers give a content length, and otherwise a
* source that reads until the stream ends.
*/
override fun openResponseBodySource(response: Response): Source {
return when {
!response.promisesBody() -> newFixedLengthSource(0)
response.isChunked() -> newChunkedSource(response.request().url)
else -> {
val contentLength = response.headersContentLength()
if (contentLength != -1L) {
newFixedLengthSource(contentLength)
} else {
newUnknownLengthSource()
}
}
}
}

/** Moves the codec from `STATE_OPEN_RESPONSE_BODY` to `STATE_READING_RESPONSE_BODY` and returns a [FixedLengthSource] of [length] bytes. */
private fun newFixedLengthSource(length: Long): Source {
check(state == STATE_OPEN_RESPONSE_BODY) { "state: $state" }
state = STATE_READING_RESPONSE_BODY
return FixedLengthSource(length)
}

/** Moves the codec from `STATE_OPEN_RESPONSE_BODY` to `STATE_READING_RESPONSE_BODY` and returns a [ChunkedSource] for [url]. */
private fun newChunkedSource(url: HttpUrl): Source {
check(state == STATE_OPEN_RESPONSE_BODY) { "state: $state" }
state = STATE_READING_RESPONSE_BODY
return ChunkedSource(url)
}

/**
* Moves the codec from `STATE_OPEN_RESPONSE_BODY` to `STATE_READING_RESPONSE_BODY` and returns an
* [UnknownLengthSource]. The connection is marked as accepting no new exchanges first.
*/
private fun newUnknownLengthSource(): Source {
check(state == STATE_OPEN_RESPONSE_BODY) { "state: $state" }
state = STATE_READING_RESPONSE_BODY
realConnection!!.noNewExchanges()
return UnknownLengthSource()
}

/** An HTTP body with a fixed length specified in advance. */
private inner class FixedLengthSource internal constructor(private var bytesRemaining: Long) :
AbstractSource() {

init {
// A zero-length body is complete immediately.
if (bytesRemaining == 0L) {
responseBodyComplete()
}
}

// Returns -1 once all promised bytes were read. Never reads past the promised length.
override fun read(sink: Buffer, byteCount: Long): Long {
require(byteCount >= 0L) { "byteCount < 0: $byteCount" }
check(!closed) { "closed" }
if (bytesRemaining == 0L) return -1

val read = super.read(sink, minOf(bytesRemaining, byteCount))
if (read == -1L) {
realConnection!!.noNewExchanges() // The server didn't supply the promised content length.
val e = ProtocolException("unexpected end of stream")
responseBodyComplete()
throw e
}

bytesRemaining -= read
if (bytesRemaining == 0L) {
responseBodyComplete()
}
return read
}

// If unread bytes remain, tries to discard them within the timeout; when that fails the
// connection is marked as accepting no new exchanges.
override fun close() {
if (closed) return

if (bytesRemaining != 0L &&
!discard(ExchangeCodec.DISCARD_STREAM_TIMEOUT_MILLIS, MILLISECONDS)) {
realConnection!!.noNewExchanges() // Unread bytes remain on the stream.
responseBodyComplete()
}

closed = true
}
}

/** An HTTP body with alternating chunk sizes and chunk bodies. */
private inner class ChunkedSource internal constructor(private val url: HttpUrl) :
AbstractSource() {
private var bytesRemainingInChunk = NO_CHUNK_YET
private var hasMoreChunks = true

// Reads from the current chunk, first reading the next chunk header when the current chunk is
// exhausted or none has been read yet. Returns -1 after the terminating zero-size chunk.
override fun read(sink: Buffer, byteCount: Long): Long {
require(byteCount >= 0L) { "byteCount < 0: $byteCount" }
check(!closed) { "closed" }
if (!hasMoreChunks) return -1

if (bytesRemainingInChunk == 0L || bytesRemainingInChunk == NO_CHUNK_YET) {
readChunkSize()
if (!hasMoreChunks) return -1
}

val read = super.read(sink, minOf(byteCount, bytesRemainingInChunk))
if (read == -1L) {
realConnection!!.noNewExchanges() // The server didn't supply the promised chunk length.
val e = ProtocolException("unexpected end of stream")
responseBodyComplete()
throw e
}
bytesRemainingInChunk -= read
return read
}

// Parses the next chunk header: a hexadecimal size, optionally followed by extensions that
// must start with ";". A size of zero ends the body: trailers are read, passed to the client's
// cookie jar, and the body is completed.
private fun readChunkSize() {
// Read the suffix of the previous chunk.
if (bytesRemainingInChunk != NO_CHUNK_YET) {
source.readUtf8LineStrict()
}
try {
bytesRemainingInChunk = source.readHexadecimalUnsignedLong()
val extensions = source.readUtf8LineStrict().trim()
if (bytesRemainingInChunk < 0L || extensions.isNotEmpty() && !extensions.startsWith(";")) {
throw ProtocolException("expected chunk size and optional extensions" +
" but was \"$bytesRemainingInChunk$extensions\"")
}
} catch (e: NumberFormatException) {
throw ProtocolException(e.message)
}

if (bytesRemainingInChunk == 0L) {
hasMoreChunks = false
trailers = readHeaders()
client!!.cookieJar().receiveHeaders(url, trailers!!)
responseBodyComplete()
}
}

// If chunks remain, tries to discard them within the timeout; when that fails the connection
// is marked as accepting no new exchanges.
override fun close() {
if (closed) return
if (hasMoreChunks &&
!discard(ExchangeCodec.DISCARD_STREAM_TIMEOUT_MILLIS, MILLISECONDS)) {
realConnection!!.noNewExchanges() // Unread bytes remain on the stream.
responseBodyComplete()
}
closed = true
}
}

/** An HTTP message body terminated by the end of the underlying stream. */
private inner class UnknownLengthSource : AbstractSource() {
private var inputExhausted: Boolean = false

// Reads until the underlying read reports -1, then completes the body and keeps returning -1.
override fun read(sink: Buffer, byteCount: Long): Long {
require(byteCount >= 0L) { "byteCount < 0: $byteCount" }

check(!closed) { "closed" }
if (inputExhausted) return -1

val read = super.read(sink, byteCount)
if (read == -1L) {
inputExhausted = true
responseBodyComplete()
return -1
}
return read
}

// Closing before the end of input still completes the body; no attempt is made to drain it.
override fun close() {
if (closed) return
if (!inputExhausted) {
responseBodyComplete()
}
closed = true
}
}
}

根据响应的不同情况(没有响应体、chunked 编码、长度固定、长度未知)创建不同的 Source 对象