/* Copyright (c) pro!vision GmbH. All rights reserved. */
package io.wcm.caravan.io.http.impl;

import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.schedulers.Schedulers;

16  /**
17   * A thread pool executor that is used to do perform all callbacks to the subscribers of Observables returned by
18   * {@link CaravanHttpClientImpl#execute(io.wcm.caravan.io.http.request.CaravanHttpRequest)}. This is desired to avoid
19   * the threads that actually execute the HTTP request are being blocked by client code. This thread pool has a core size
20   * of just four threads, but if all of these threads are used by slow callback code, additional threads are
21   * automatically spawned on demand.
22   */
23  public class CaravanHttpCallbackExecutor extends ThreadPoolExecutor {
24  
25    private static final Logger LOG = LoggerFactory.getLogger(CaravanHttpCallbackExecutor.class);
26  
27    static final String THREAD_GROUP_NAME = "Caravan-Http-Callbacks";
28  
29    private static final AtomicInteger THREAD_INDEX_COUNTER = new AtomicInteger();
30  
31    private static final int THREAD_POOL_CORE_SIZE = 4;
32    private static final int THREAD_POOL_MAX_SIZE = 1000;
33    private static final int IDLE_THREAD_KEEP_ALIVE_SECONDS = 10;
34  
35    private static final int WATCHDOG_INTERVAL_SECONDS = 1;
36  
37    private final rx.Scheduler.Worker watchdogWorker;
38  
39    CaravanHttpCallbackExecutor() {
40      // by using a LinkedBlockingQueue, the ThreadPoolExecutor will not automatically spawn any new threads when all core threads are in use,
41      // instead, additional tasks will be queued up. If this happens, the periodically called ::increaseCoreSizeIfJobsAreQeueued method will adjust the core size
42      super(THREAD_POOL_CORE_SIZE, THREAD_POOL_MAX_SIZE, IDLE_THREAD_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
43          new RejectionHandler());
44  
45  
46      watchdogWorker = Schedulers.computation().createWorker();
47      watchdogWorker.schedulePeriodically(this::increaseCoreSizeIfJobsAreQeueued, 5, WATCHDOG_INTERVAL_SECONDS, TimeUnit.SECONDS);
48  
49      setThreadFactory(runnable -> new Thread(runnable, THREAD_GROUP_NAME + "-" + THREAD_INDEX_COUNTER.getAndIncrement()));
50    }
51  
52    @Override
53    public void shutdown() {
54  
55      watchdogWorker.unsubscribe();
56  
57      super.shutdown();
58    }
59  
60    @Override
61    public List<Runnable> shutdownNow() {
62  
63      watchdogWorker.unsubscribe();
64  
65      return super.shutdownNow();
66    }
67  
68    private void increaseCoreSizeIfJobsAreQeueued() {
69  
70      // This is based on "Idea #2" described in the following comment
71      // http://stackoverflow.com/questions/9622599/java-threadpoolexecutor-strategy-direct-handoff-with-queue#answer-9623100
72  
73      // the idea is that we don't allow ThreadPoolExecutor to decide when to spawn new thread on its own, as this could lead to a huge threadpool if many
74      // http requests are initiated at once. As long as the callbacks are executed fast, the system should work well by just using the core threadpool
75  
76      int queueSize = getQueue().size();
77      int currentPoolSize = getPoolSize();
78      int corePoolSize = getCorePoolSize();
79      int activeCount = getActiveCount();
80      try {
81  
82        if (LOG.isTraceEnabled()) {
83          LOG.trace(
84              "Checking if threadool core size should be adjusted: queueSize=" + queueSize + ". poolSize=" + currentPoolSize + ", activeCount=" + activeCount);
85        }
86  
87        if (queueSize > 0 && activeCount == currentPoolSize && currentPoolSize < getMaximumPoolSize()) {
88          int newCoreSize = Math.min(currentPoolSize + queueSize, getMaximumPoolSize());
89          LOG.warn(
90              "Increasing thread pool core size to " + newCoreSize + ", because there are " + queueSize + " callbacks waiting in the queue, and all of "
91                  + currentPoolSize + " threads are blocked.");
92  
93          setCorePoolSize(newCoreSize);
94        }
95        else if (queueSize == 0 && activeCount < currentPoolSize && corePoolSize > THREAD_POOL_CORE_SIZE) {
96          int newCoreSize = corePoolSize - 1;
97  
98          LOG.info(
99              "Decreasing thread pool core size to " + newCoreSize + ", because there are no callbacks waiting in the queue, and only " + activeCount + " of "
100                 + currentPoolSize + " threads are blocked.");
101 
102         setCorePoolSize(newCoreSize);
103       }
104     }
105     catch (IllegalArgumentException ex) {
106       LOG.error("Failed to adjust core size queueSize=" + queueSize + ". poolSize=" + currentPoolSize + ", activeCount=" + activeCount, ex);
107     }
108 
109   }
110 
111   @Override
112   protected void beforeExecute(Thread t, Runnable r) {
113 
114     if (LOG.isTraceEnabled()) {
115       LOG.trace("Executing HTTP callback on thread " + t + ", current pool size is " + getPoolSize());
116     }
117 
118     super.beforeExecute(t, r);
119   }
120 
121 
122   private static class RejectionHandler implements RejectedExecutionHandler {
123 
124     @Override
125     public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
126       LOG.error("Rejected to execute async http callback " + r.toString() + " because the maximum number of threads (" + executor.getMaximumPoolSize()
127       + ") has been reached");
128     }
129 
130   }
131 
132 }