openclassOkHttpClientinternalconstructor( builder: Builder ) : Cloneable, Call.Factory, WebSocket.Factory { // ... /** Prepares the [request] to be executed at some point in the future. */ overridefunnewCall(request: Request): Call { return RealCall.newRealCall(this, request, false/* for web socket */) } // ... }
internal classRealCallprivateconstructor( val client: OkHttpClient, /** The application's original request unadulterated by redirects or auth headers. */ val originalRequest: Request, val forWebSocket: Boolean ) : Call { /** * There is a cycle between the [Call] and [Transmitter] that makes this awkward. * This is set after immediately after creating the call instance. */ private lateinit var transmitter: Transmitter // ... companionobject { funnewRealCall( client: OkHttpClient, originalRequest: Request, forWebSocket: Boolean ): RealCall { // Safely publish the Call instance to the EventListener. return RealCall(client, originalRequest, forWebSocket).apply { transmitter = Transmitter(client, this) } } } }
classDispatcherconstructor() { // ... /** Ready async calls in the order they'll be run. */ privateval readyAsyncCalls = ArrayDeque<AsyncCall>() // ... internal funenqueue(call: AsyncCall) { synchronized(this) { readyAsyncCalls.add(call)
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to // the same host. if (!call.get().forWebSocket) { val existingCall = findExistingCallWithHost(call.host()) if (existingCall != null) call.reuseCallsPerHostFrom(existingCall) } } promoteAndExecute() } // ... }
classDispatcherconstructor() { // ... /** * Promotes eligible calls from [readyAsyncCalls] to [runningAsyncCalls] and runs them on the * executor service. Must not be called with synchronization because executing calls can call * into user code. * * @return true if the dispatcher is currently running calls. */ privatefunpromoteAndExecute(): Boolean { assert(!Thread.holdsLock(this))
val executableCalls = ArrayList<AsyncCall>() val isRunning: Boolean synchronized(this) { val i = readyAsyncCalls.iterator() while (i.hasNext()) { val asyncCall = i.next()
if (runningAsyncCalls.size >= this.maxRequests) break// Max capacity. if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue// Host max capacity.
internal classRealCallprivateconstructor( val client: OkHttpClient, /** The application's original request unadulterated by redirects or auth headers. */ val originalRequest: Request, val forWebSocket: Boolean ) : Call { internal inner classAsyncCall( privateval responseCallback: Callback ) : Runnable { /** * Attempt to enqueue this async call on [executorService]. This will attempt to clean up * if the executor has been shut down by reporting the call as failed. */ funexecuteOn(executorService: ExecutorService) { assert(!Thread.holdsLock(client.dispatcher())) var success = false try { executorService.execute(this) success = true } catch (e: RejectedExecutionException) { val ioException = InterruptedIOException("executor rejected") ioException.initCause(e) transmitter.noMoreExchanges(ioException) responseCallback.onFailure(this@RealCall, ioException) } finally { if (!success) { client.dispatcher().finished(this) // This call is no longer running! } } } overridefunrun() { threadName("OkHttp ${redactedUrl()}") { var signalledCallback = false transmitter.timeoutEnter() try { val response = getResponseWithInterceptorChain() signalledCallback = true responseCallback.onResponse(this@RealCall, response) } catch (e: IOException) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e) } else { responseCallback.onFailure(this@RealCall, e) } } finally { client.dispatcher().finished(this) } } } } }
// If we already have a stream, confirm that the incoming request will use it. check(this.exchange == null || this.exchange.connection()!!.supportsUrl(request.url)) { "network interceptor ${interceptors[index - 1]} must retain the same host and port" }
// If we already have a stream, confirm that this is the only call to chain.proceed(). check(this.exchange == null || calls <= 1) { "network interceptor ${interceptors[index - 1]} must call proceed() exactly once" }
// Call the next interceptor in the chain. val next = RealInterceptorChain(interceptors, transmitter, exchange, index + 1, request, call, connectTimeout, readTimeout, writeTimeout) val interceptor = interceptors[index]
@Suppress("USELESS_ELVIS") val response = interceptor.intercept(next) ?: throw NullPointerException( "interceptor $interceptor returned null")
// Confirm that the next interceptor made its required call to chain.proceed(). check(exchange == null || index + 1 >= interceptors.size || next.calls == 1) { "network interceptor $interceptor must call proceed() exactly once" }
check(response.body() != null) { "interceptor $interceptor returned a response with no body" }