/**
 * Sends the request over the chain's exchange and reads back the response.
 *
 * Steps, in order: write the request headers; write the request body when the method permits
 * one (honouring `Expect: 100-continue` and duplex bodies); finish the request; read the response
 * headers (re-reading once if an unsolicited `100` arrives); attach the response body.
 *
 * @param chain must be a [RealInterceptorChain]; it is cast unconditionally.
 * @return the response with its body attached.
 * @throws ProtocolException if a 204 or 205 response reports a positive content length.
 * @throws IOException declared for callers; presumably raised by the exchange on I/O failure.
 */
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
  val realChain = chain as RealInterceptorChain
  val exchange = realChain.exchange()
  val request = realChain.request()
  val requestBody = request.body
  val sentRequestMillis = System.currentTimeMillis()

  // Write the request headers.
  exchange.writeRequestHeaders(request)

  var responseHeadersStarted = false
  var responseBuilder: Response.Builder? = null
  // Write the request body (only when the method permits one and one is present).
  if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
    // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
    // Continue" response before transmitting the request body. If we don't get that, return
    // what we did get (such as a 4xx response) without ever transmitting the request body.
    if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
      exchange.flushRequest()
      responseHeadersStarted = true
      exchange.responseHeadersStart()
      responseBuilder = exchange.readResponseHeaders(true)
    }
    if (responseBuilder == null) {
      if (requestBody.isDuplex()) {
        // Prepare a duplex body so that the application can send a request body later.
        exchange.flushRequest()
        val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
        requestBody.writeTo(bufferedRequestBody)
      } else {
        // Write the request body if the "Expect: 100-continue" expectation was met.
        val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
        requestBody.writeTo(bufferedRequestBody)
        bufferedRequestBody.close()
      }
    } else {
      exchange.noRequestBody()
      if (!exchange.connection()!!.isMultiplexed) {
        // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
        // from being reused. Otherwise we're still obligated to transmit the request body to
        // leave the connection in a consistent state.
        exchange.noNewExchangesOnConnection()
      }
    }
  } else {
    exchange.noRequestBody()
  }

  // Finish the request. A duplex body is left open.
  if (requestBody == null || !requestBody.isDuplex()) {
    exchange.finishRequest()
  }
  if (!responseHeadersStarted) {
    exchange.responseHeadersStart()
  }
  // Read the response headers, unless the 100-continue path above already produced them.
  if (responseBuilder == null) {
    responseBuilder = exchange.readResponseHeaders(false)!!
  }

  // Build the preliminary (body-less) response.
  var response = responseBuilder
      .request(request)
      .handshake(exchange.connection()!!.handshake())
      .sentRequestAtMillis(sentRequestMillis) // When the request was sent.
      .receivedResponseAtMillis(System.currentTimeMillis()) // When the response was received.
      .build()
  var code = response.code()
  if (code == 100) {
    // Server sent a 100-continue even though we did not request one.
    // Try again to read the actual response.
    response = exchange.readResponseHeaders(false)!!
        .request(request)
        .handshake(exchange.connection()!!.handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build()
    code = response.code()
  }

  exchange.responseHeadersEnd(response)

  // Attach the response body.
  response = if (forWebSocket && code == 101) {
    // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
    response.newBuilder()
        .body(EMPTY_RESPONSE)
        .build()
  } else {
    response.newBuilder()
        .body(exchange.openResponseBody(response))
        .build()
  }
  if ("close".equals(response.request().header("Connection"), ignoreCase = true) ||
      "close".equals(response.header("Connection"), ignoreCase = true)) {
    exchange.noNewExchangesOnConnection()
  }
  // The elvis operator binds tighter than `>`; the parentheses only make that explicit.
  if ((code == 204 || code == 205) && (response.body()?.contentLength() ?: -1L) > 0L) {
    throw ProtocolException(
        "HTTP $code had non-zero Content-Length: ${response.body()?.contentLength()}")
  }
  return response
}
} // Closes the enclosing class, whose header lies outside this excerpt.
// Excerpt: `state` and the STATE_* constants used below are declared outside this snippet.
class Http1ExchangeCodec(
  /** The client that configures this stream. May be null for HTTPS proxy tunnels. */
  private val client: OkHttpClient?,
  /** The connection that carries this stream. */
  private val realConnection: RealConnection?,
  private val source: BufferedSource,
  private val sink: BufferedSink
) : ExchangeCodec {
  /**
   * Prepares the HTTP headers and sends them to the server.
   *
   * For streaming requests with a body, headers must be prepared **before** the output stream has
   * been written to. Otherwise the body would need to be buffered!
   *
   * For non-streaming requests with a body, headers must be prepared **after** the output stream
   * has been written to and closed. This ensures that the `Content-Length` header field receives
   * the proper value.
   */
  override fun writeRequestHeaders(request: Request) {
    val requestLine = RequestLine.get(request, realConnection!!.route().proxy().type())
    writeRequest(request.headers, requestLine)
  }

  /**
   * Writes [requestLine], then every header as `name: value`, each terminated by CRLF, followed
   * by a blank line, to the sink. Afterwards moves the codec to `STATE_OPEN_REQUEST_BODY`.
   *
   * @throws IllegalStateException if the codec is not in `STATE_IDLE` when called.
   */
  fun writeRequest(headers: Headers, requestLine: String) {
    check(state == STATE_IDLE) { "state: $state" }
    sink.writeUtf8(requestLine).writeUtf8("\r\n")
    for (i in 0 until headers.size) {
      sink.writeUtf8(headers.name(i))
          .writeUtf8(": ")
          .writeUtf8(headers.value(i))
          .writeUtf8("\r\n")
    }
    sink.writeUtf8("\r\n")
    state = STATE_OPEN_REQUEST_BODY
  }
}
遍历 Header,然后通过前一个拦截器建立的连接得到的 Sink 来进行写操作,同时将 state 变量赋值为 STATE_OPEN_REQUEST_BODY,运用了状态模式的思想。
classHttp1ExchangeCodec( /** The client that configures this stream. May be null for HTTPS proxy tunnels. */ privateval client: OkHttpClient?, /** The connection that carries this stream. */ privateval realConnection: RealConnection?, privateval source: BufferedSource, privateval sink: BufferedSink ) : ExchangeCodec { overridefuncreateRequestBody(request: Request, contentLength: Long): Sink { returnwhen { request.body != null && request.body!!.isDuplex() -> throw ProtocolException( "Duplex connections are not supported for HTTP/1") request.isChunked() -> newChunkedSink() // Stream a request body of unknown length. contentLength != -1L -> newKnownLengthSink() // Stream a request body of a known length. else -> // Stream a request body of a known length. throw IllegalStateException( "Cannot stream a request body without chunked encoding or a known content length!") } } privatefunnewChunkedSink(): Sink { check(state == STATE_OPEN_REQUEST_BODY) { "state: $state" } state = STATE_WRITING_REQUEST_BODY return ChunkedSink() } privatefunnewKnownLengthSink(): Sink { check(state == STATE_OPEN_REQUEST_BODY) { "state: $state" } state = STATE_WRITING_REQUEST_BODY return KnownLengthSink() } /** An HTTP request body. */ private inner classKnownLengthSink : Sink { privateval timeout = ForwardingTimeout(sink.timeout()) privatevar closed: Boolean = false
/** Flushes the codec's `sink`; does nothing once this sink has been closed. */
override fun flush() {
  if (closed) return // Don't throw; this stream might have been closed on the caller's behalf.
  sink.flush()
}
overridefunclose() { if (closed) return closed = true detachTimeout(timeout) state = STATE_READ_RESPONSE_HEADERS } } /** * An HTTP body with alternating chunk sizes and chunk bodies. It is the caller's responsibility * to buffer chunks; typically by using a buffered sink with this sink. */ private inner classChunkedSink : Sink { privateval timeout = ForwardingTimeout(sink.timeout()) privatevar closed: Boolean = false
classExchange( internal val transmitter: Transmitter, internal val call: Call, internal val eventListener: EventListener, privateval finder: ExchangeFinder, privateval codec: ExchangeCodec ) { /** A request body that fires events when it completes. */ private inner classRequestBodySinkinternalconstructor( delegate: Sink, /** The exact number of bytes to be written, or -1L if that is unknown. */ privateval contentLength: Long ) : ForwardingSink(delegate) { privatevar completed = false privatevar bytesReceived = 0L privatevar closed = false
classFormBodyinternalconstructor( encodedNames: List<String>, encodedValues: List<String> ) : RequestBody() { @Throws(IOException::class) overridefunwriteTo(sink: BufferedSink) { writeOrCountBytes(sink, false) } /** * Either writes this request to `sink` or measures its content length. We have one method * do double-duty to make sure the counting and content are consistent, particularly when it comes * to awkward operations like measuring the encoded length of header strings, or the * length-in-digits of an encoded integer. */ privatefunwriteOrCountBytes(sink: BufferedSink?, countBytes: Boolean): Long { var byteCount = 0L val buffer: Buffer = if (countBytes) Buffer() else sink!!.buffer
for (i in0 until encodedNames.size) { if (i > 0) buffer.writeByte('&'.toInt()) buffer.writeUtf8(encodedNames[i]) buffer.writeByte('='.toInt()) buffer.writeUtf8(encodedValues[i]) }
if (countBytes) { byteCount = buffer.size buffer.clear() }
// Excerpt: only `finishRequest` is shown; `trackFailure` is declared outside this snippet.
class Exchange(
  internal val transmitter: Transmitter,
  internal val call: Call,
  internal val eventListener: EventListener,
  private val finder: ExchangeFinder,
  private val codec: ExchangeCodec
) {
  /**
   * Asks the codec to finish the request.
   *
   * If the codec throws an [IOException], the failure is reported to the event listener, passed
   * to `trackFailure`, and then rethrown unchanged.
   *
   * @throws IOException when the codec fails to finish the request.
   */
  @Throws(IOException::class)
  fun finishRequest() {
    try {
      codec.finishRequest()
    } catch (e: IOException) {
      eventListener.requestFailed(call, e)
      trackFailure(e)
      throw e
    }
  }
}
Http1ExchangeCodec#finishRequest()
1 2 3 4 5 6 7 8 9 10 11 12
// Excerpt: only `finishRequest` is shown.
class Http1ExchangeCodec(
  /** The client that configures this stream. May be null for HTTPS proxy tunnels. */
  private val client: OkHttpClient?,
  /** The connection that carries this stream. */
  private val realConnection: RealConnection?,
  private val source: BufferedSource,
  private val sink: BufferedSink
) : ExchangeCodec {
  /** Finishes the request by flushing the sink. */
  override fun finishRequest() {
    sink.flush()
  }
}
sink.flush() 将缓冲区中的数据写入到底层的 sink 中,其实就是写入到 server 中去了,相当于一个刷新缓冲区的功能。
returnwhen { expectContinue && statusLine.code == HTTP_CONTINUE -> { null } statusLine.code == HTTP_CONTINUE -> { state = STATE_READ_RESPONSE_HEADERS responseBuilder } else -> { state = STATE_OPEN_RESPONSE_BODY responseBuilder } } } catch (e: EOFException) { // Provide more context if the server ends the stream before sending a response. val address = realConnection?.route()?.address()?.url?.redact() ?: "unknown" throw IOException("unexpected end of stream on $address", e) } } }
val read = super.read(sink, minOf(bytesRemaining, byteCount)) if (read == -1L) { realConnection!!.noNewExchanges() // The server didn't supply the promised content length. val e = ProtocolException("unexpected end of stream") responseBodyComplete() throw e }
if (bytesRemainingInChunk == 0L || bytesRemainingInChunk == NO_CHUNK_YET) { readChunkSize() if (!hasMoreChunks) return -1 }
val read = super.read(sink, minOf(byteCount, bytesRemainingInChunk)) if (read == -1L) { realConnection!!.noNewExchanges() // The server didn't supply the promised chunk length. val e = ProtocolException("unexpected end of stream") responseBodyComplete() throw e } bytesRemainingInChunk -= read return read }
privatefunreadChunkSize() { // Read the suffix of the previous chunk. if (bytesRemainingInChunk != NO_CHUNK_YET) { source.readUtf8LineStrict() } try { bytesRemainingInChunk = source.readHexadecimalUnsignedLong() val extensions = source.readUtf8LineStrict().trim() if (bytesRemainingInChunk < 0L || extensions.isNotEmpty() && !extensions.startsWith(";")) { throw ProtocolException("expected chunk size and optional extensions" + " but was \"$bytesRemainingInChunk$extensions\"") } } catch (e: NumberFormatException) { throw ProtocolException(e.message) }
/**
 * Closes this source; repeated calls are no-ops.
 *
 * When more chunks are still expected and `discard` reports failure, the connection is barred
 * from carrying new exchanges and the response body is marked complete.
 * NOTE(review): `discard` presumably tries to drain the remaining bytes within the timeout —
 * confirm against its definition.
 */
override fun close() {
  if (!closed) {
    // `discard` is only attempted while chunks remain, exactly as in a short-circuit `&&`.
    val streamIsClean =
        !hasMoreChunks || discard(ExchangeCodec.DISCARD_STREAM_TIMEOUT_MILLIS, MILLISECONDS)
    if (!streamIsClean) {
      // Unread bytes remain on the stream.
      realConnection!!.noNewExchanges()
      responseBodyComplete()
    }
    closed = true
  }
}
} // Closes the enclosing class, whose header lies outside this excerpt.
/** An HTTP message body terminated by the end of the underlying stream. */
private inner class UnknownLengthSource : AbstractSource() {
  /** Set once the superclass read has reported end of stream (-1). */
  private var inputExhausted: Boolean = false

  /**
   * Reads up to [byteCount] bytes into [sink] by delegating to the superclass.
   *
   * @return the superclass's result, or -1 once the input is exhausted. The first -1 also marks
   *     the response body complete.
   * @throws IllegalArgumentException if [byteCount] is negative.
   * @throws IllegalStateException if this source is already closed.
   */
  override fun read(sink: Buffer, byteCount: Long): Long {
    require(byteCount >= 0L) { "byteCount < 0: $byteCount" }
    check(!closed) { "closed" }
    if (inputExhausted) return -1

    val read = super.read(sink, byteCount)
    if (read == -1L) {
      inputExhausted = true
      responseBodyComplete()
      return -1
    }
    return read
  }

  /** Closes this source, marking the response body complete if end of stream was never seen. */
  override fun close() {
    if (closed) return
    if (!inputExhausted) {
      responseBodyComplete()
    }
    closed = true
  }
}
} // Closes the enclosing class, whose header lies outside this excerpt.