Skip to content

okhttp async call

时间有限,先说结论。在服务器端,不适合使用okhttp的异步请求,因为okhttp的异步请求会占用很多线程,导致服务器性能下降。

Show Case

java
    OkHttpClient client = new OkHttpClient.Builder().build();
    Request.Builder requestBuilder = new Request.Builder().url("https://www.baidu.com");
    // enqueue是这次需要关注的部分
    client.newCall(requestBuilder.build()).enqueue(new Callback() {

      @Override
      public void onResponse(Call arg0, Response arg1) throws IOException {

      }

      @Override
      public void onFailure(Call arg0, IOException arg1) {

      }
    });

enqueue部分源码

java
  override fun enqueue(responseCallback: Callback) {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    callStart()
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }

再看client.dispatcher.enqueue

java
   internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()// 这里
  }

继续看promoteAndExecute

kotlin
private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }

重点来了:asyncCall.executeOn(executorService)

kotlin
    fun executeOn(executorService: ExecutorService) {
      client.dispatcher.assertThreadDoesntHoldLock()

      var success = false
      try {
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          client.dispatcher.finished(this) // This call is no longer running!
        }
      }
    }

重点看executorService的定义

kotlin
  @get:Synchronized
  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
      }
      return executorServiceOrNull!!
    }

线程池的初始化,默认的线程池大小为0,最大线程池大小为Int.MAX_VALUE,线程存活时间为60秒,队列为SynchronousQueue,线程工厂为DispatcherThreadFactory。

默认情况下,如果并发请求量大的话,那么线程池中的线程就会不断创建,最终导致OOM。

虽然可以通过控制线程池的参数,来控制最大线程数,但是,又会有别的问题:例如导致请求堆积在任务队列中,从而导致服务无响应。

基于以上原因,不建议在并发量高的场景下使用okhttp的异步功能。

性能优化

如果实在避免不了使用okhttp的异步功能,那么可以通过以下方式来优化性能:

  1. 线程池参数优化:限定线程池的线程数,避免线程数过多导致OOM。
  2. 根据业务特点自定义请求并发数
java
    ThreadPoolExecutor executorService =
        new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    Dispatcher dispatcher = new Dispatcher(executorService);
    dispatcher.setMaxRequests(4096);
    dispatcher.setMaxRequestsPerHost(4096);
    OkHttpClient client = new OkHttpClient.Builder().dispatcher(dispatcher).build();
    Request.Builder requestBuilder = new Request.Builder().url("https://www.baidu.com");

    client.newCall(requestBuilder.build()).enqueue(new Callback() {

      @Override
      public void onResponse(Call arg0, Response arg1) throws IOException {

      }

      @Override
      public void onFailure(Call arg0, IOException arg1) {

      }
    });

上次更新于: