1
2 package io.wcm.caravan.io.http.impl;
3
4 import java.util.List;
5 import java.util.concurrent.LinkedBlockingQueue;
6 import java.util.concurrent.RejectedExecutionHandler;
7 import java.util.concurrent.ThreadPoolExecutor;
8 import java.util.concurrent.TimeUnit;
9 import java.util.concurrent.atomic.AtomicInteger;
10
11 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory;
13
14 import rx.schedulers.Schedulers;
15
16
17
18
19
20
21
22
23 public class CaravanHttpCallbackExecutor extends ThreadPoolExecutor {
24
25 private static final Logger LOG = LoggerFactory.getLogger(CaravanHttpCallbackExecutor.class);
26
27 static final String THREAD_GROUP_NAME = "Caravan-Http-Callbacks";
28
29 private static final AtomicInteger THREAD_INDEX_COUNTER = new AtomicInteger();
30
31 private static final int THREAD_POOL_CORE_SIZE = 4;
32 private static final int THREAD_POOL_MAX_SIZE = 1000;
33 private static final int IDLE_THREAD_KEEP_ALIVE_SECONDS = 10;
34
35 private static final int WATCHDOG_INTERVAL_SECONDS = 1;
36
37 private final rx.Scheduler.Worker watchdogWorker;
38
39 CaravanHttpCallbackExecutor() {
40
41
42 super(THREAD_POOL_CORE_SIZE, THREAD_POOL_MAX_SIZE, IDLE_THREAD_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
43 new RejectionHandler());
44
45
46 watchdogWorker = Schedulers.computation().createWorker();
47 watchdogWorker.schedulePeriodically(this::increaseCoreSizeIfJobsAreQeueued, 5, WATCHDOG_INTERVAL_SECONDS, TimeUnit.SECONDS);
48
49 setThreadFactory(runnable -> new Thread(runnable, THREAD_GROUP_NAME + "-" + THREAD_INDEX_COUNTER.getAndIncrement()));
50 }
51
52 @Override
53 public void shutdown() {
54
55 watchdogWorker.unsubscribe();
56
57 super.shutdown();
58 }
59
60 @Override
61 public List<Runnable> shutdownNow() {
62
63 watchdogWorker.unsubscribe();
64
65 return super.shutdownNow();
66 }
67
68 private void increaseCoreSizeIfJobsAreQeueued() {
69
70
71
72
73
74
75
76 int queueSize = getQueue().size();
77 int currentPoolSize = getPoolSize();
78 int corePoolSize = getCorePoolSize();
79 int activeCount = getActiveCount();
80 try {
81
82 if (LOG.isTraceEnabled()) {
83 LOG.trace(
84 "Checking if threadool core size should be adjusted: queueSize=" + queueSize + ". poolSize=" + currentPoolSize + ", activeCount=" + activeCount);
85 }
86
87 if (queueSize > 0 && activeCount == currentPoolSize && currentPoolSize < getMaximumPoolSize()) {
88 int newCoreSize = Math.min(currentPoolSize + queueSize, getMaximumPoolSize());
89 LOG.warn(
90 "Increasing thread pool core size to " + newCoreSize + ", because there are " + queueSize + " callbacks waiting in the queue, and all of "
91 + currentPoolSize + " threads are blocked.");
92
93 setCorePoolSize(newCoreSize);
94 }
95 else if (queueSize == 0 && activeCount < currentPoolSize && corePoolSize > THREAD_POOL_CORE_SIZE) {
96 int newCoreSize = corePoolSize - 1;
97
98 LOG.info(
99 "Decreasing thread pool core size to " + newCoreSize + ", because there are no callbacks waiting in the queue, and only " + activeCount + " of "
100 + currentPoolSize + " threads are blocked.");
101
102 setCorePoolSize(newCoreSize);
103 }
104 }
105 catch (IllegalArgumentException ex) {
106 LOG.error("Failed to adjust core size queueSize=" + queueSize + ". poolSize=" + currentPoolSize + ", activeCount=" + activeCount, ex);
107 }
108
109 }
110
111 @Override
112 protected void beforeExecute(Thread t, Runnable r) {
113
114 if (LOG.isTraceEnabled()) {
115 LOG.trace("Executing HTTP callback on thread " + t + ", current pool size is " + getPoolSize());
116 }
117
118 super.beforeExecute(t, r);
119 }
120
121
122 private static class RejectionHandler implements RejectedExecutionHandler {
123
124 @Override
125 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
126 LOG.error("Rejected to execute async http callback " + r.toString() + " because the maximum number of threads (" + executor.getMaximumPoolSize()
127 + ") has been reached");
128 }
129
130 }
131
132 }