CaravanHttpCallbackExecutor.java

/* Copyright (c) pro!vision GmbH. All rights reserved. */
package io.wcm.caravan.io.http.impl;

import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.schedulers.Schedulers;

/**
 * A thread pool executor that is used to perform all callbacks to the subscribers of Observables returned by
 * {@link CaravanHttpClientImpl#execute(io.wcm.caravan.io.http.request.CaravanHttpRequest)}. This prevents the
 * threads that actually execute the HTTP requests from being blocked by client code. This thread pool has a core size
 * of just four threads, but if all of these threads are occupied by slow callback code, additional threads are
 * automatically spawned on demand.
 */
public class CaravanHttpCallbackExecutor extends ThreadPoolExecutor {

  private static final Logger LOG = LoggerFactory.getLogger(CaravanHttpCallbackExecutor.class);

  /** Name prefix of every thread created by this executor; a running index is appended. */
  static final String THREAD_GROUP_NAME = "Caravan-Http-Callbacks";

  /** Shared by all instances, so thread names stay unique across executors. */
  private static final AtomicInteger THREAD_INDEX_COUNTER = new AtomicInteger();

  private static final int THREAD_POOL_CORE_SIZE = 4;
  private static final int THREAD_POOL_MAX_SIZE = 1000;
  private static final int IDLE_THREAD_KEEP_ALIVE_SECONDS = 10;

  private static final int WATCHDOG_INITIAL_DELAY_SECONDS = 5;
  private static final int WATCHDOG_INTERVAL_SECONDS = 1;

  /** Worker that periodically runs {@link #adjustCoreSizeToQueueLoad()}; unsubscribed on shutdown. */
  private final rx.Scheduler.Worker watchdogWorker;

  /**
   * Creates the executor with its initial core size and starts the watchdog that resizes the pool.
   */
  CaravanHttpCallbackExecutor() {
    // Because the work queue is an unbounded LinkedBlockingQueue, the ThreadPoolExecutor never spawns threads beyond the
    // core size on its own; additional tasks are queued instead. The periodically executed watchdog
    // (::adjustCoreSizeToQueueLoad) raises the core size when tasks pile up behind blocked threads.
    // The thread factory is handed to the super constructor so it is in place before the watchdog gets to see `this`.
    super(THREAD_POOL_CORE_SIZE, THREAD_POOL_MAX_SIZE, IDLE_THREAD_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
        CaravanHttpCallbackExecutor::newCallbackThread, new RejectionHandler());

    // scheduled last, so that `this` is only published once the executor is fully initialised
    watchdogWorker = Schedulers.computation().createWorker();
    watchdogWorker.schedulePeriodically(this::adjustCoreSizeToQueueLoad, WATCHDOG_INITIAL_DELAY_SECONDS, WATCHDOG_INTERVAL_SECONDS, TimeUnit.SECONDS);
  }

  /**
   * Thread factory implementation that gives every callback thread a recognisable, unique name.
   * @param runnable the worker loop to be run by the new thread
   * @return a new, not yet started thread
   */
  private static Thread newCallbackThread(Runnable runnable) {
    return new Thread(runnable, THREAD_GROUP_NAME + "-" + THREAD_INDEX_COUNTER.getAndIncrement());
  }

  /**
   * Stops the watchdog and then initiates an orderly shutdown of the pool.
   */
  @Override
  public void shutdown() {

    watchdogWorker.unsubscribe();

    super.shutdown();
  }

  /**
   * Stops the watchdog and then attempts to stop all running callbacks.
   * @return the callbacks that were still waiting in the queue
   */
  @Override
  public List<Runnable> shutdownNow() {

    watchdogWorker.unsubscribe();

    return super.shutdownNow();
  }

  /**
   * Watchdog task: grows the core pool size when callbacks are queued behind fully occupied threads, and shrinks it
   * again (one thread per invocation, never below the initial core size) once the queue is empty and threads are idle.
   */
  private void adjustCoreSizeToQueueLoad() {

    // This is based on "Idea #2" described in the following comment
    // http://stackoverflow.com/questions/9622599/java-threadpoolexecutor-strategy-direct-handoff-with-queue#answer-9623100

    // The idea is that we don't allow the ThreadPoolExecutor to decide on its own when to spawn new threads, as this
    // could lead to a huge thread pool if many http requests are initiated at once. As long as the callbacks are
    // executed fast, the system should work well by just using the core thread pool.

    // snapshot of the pool state; these values are not read atomically and getActiveCount() is only approximate
    int queueSize = getQueue().size();
    int currentPoolSize = getPoolSize();
    int corePoolSize = getCorePoolSize();
    int activeCount = getActiveCount();
    int maximumPoolSize = getMaximumPoolSize();

    try {
      LOG.trace("Checking if threadool core size should be adjusted: queueSize={}. poolSize={}, activeCount={}", queueSize, currentPoolSize, activeCount);

      boolean allThreadsBusy = activeCount == currentPoolSize;

      if (queueSize > 0 && allThreadsBusy && currentPoolSize < maximumPoolSize) {
        // sum up as long, so that an extremely long queue cannot overflow into a negative core size
        int newCoreSize = (int)Math.min((long)currentPoolSize + queueSize, maximumPoolSize);

        LOG.warn("Increasing thread pool core size to {}, because there are {} callbacks waiting in the queue, and all of {} threads are blocked.",
            newCoreSize, queueSize, currentPoolSize);

        setCorePoolSize(newCoreSize);
      }
      else if (queueSize == 0 && !allThreadsBusy && corePoolSize > THREAD_POOL_CORE_SIZE) {
        int newCoreSize = corePoolSize - 1;

        LOG.info("Decreasing thread pool core size to {}, because there are no callbacks waiting in the queue, and only {} of {} threads are blocked.",
            newCoreSize, activeCount, currentPoolSize);

        setCorePoolSize(newCoreSize);
      }
    }
    catch (RuntimeException ex) {
      // Deliberately broad: an exception escaping from the periodic action would end the watchdog schedule,
      // and the pool would never be resized again.
      LOG.error("Failed to adjust core size queueSize={}. poolSize={}, activeCount={}", queueSize, currentPoolSize, activeCount, ex);
    }

  }

  /**
   * Traces which thread is about to run a callback.
   * @param t the thread that will run the callback
   * @param r the callback that will be executed
   */
  @Override
  protected void beforeExecute(Thread t, Runnable r) {

    // keep the guard: getPoolSize() should not be called for every callback just to discard the result
    if (LOG.isTraceEnabled()) {
      LOG.trace("Executing HTTP callback on thread {}, current pool size is {}", t, getPoolSize());
    }

    super.beforeExecute(t, r);
  }


  /**
   * Logs callbacks that could not be accepted by the executor. The rejected callback is dropped; no exception is
   * propagated to the code that submitted it.
   */
  private static class RejectionHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      if (executor.isShutdown()) {
        // with an unbounded work queue this is the case to expect: tasks are only turned away after shutdown
        LOG.error("Rejected to execute async http callback {} because the executor has been shut down", r);
      }
      else {
        LOG.error("Rejected to execute async http callback {} because the maximum number of threads ({}) has been reached", r,
            executor.getMaximumPoolSize());
      }
    }

  }

}