1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package io.wcm.caravan.io.http.impl;
21
22 import static java.util.concurrent.TimeUnit.MILLISECONDS;
23
24 import java.io.IOException;
25 import java.net.SocketTimeoutException;
26
27 import org.apache.felix.scr.annotations.Component;
28 import org.apache.felix.scr.annotations.Reference;
29 import org.apache.felix.scr.annotations.Service;
30 import org.apache.http.HttpEntity;
31 import org.apache.http.HttpResponse;
32 import org.apache.http.StatusLine;
33 import org.apache.http.client.methods.CloseableHttpResponse;
34 import org.apache.http.client.methods.HttpUriRequest;
35 import org.apache.http.concurrent.FutureCallback;
36 import org.apache.http.entity.BufferedHttpEntity;
37 import org.apache.http.impl.client.CloseableHttpClient;
38 import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
39 import org.apache.http.util.EntityUtils;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 import com.google.common.base.Stopwatch;
44 import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
45
46 import io.wcm.caravan.commons.httpasyncclient.HttpAsyncClientFactory;
47 import io.wcm.caravan.commons.httpclient.HttpClientFactory;
48 import io.wcm.caravan.io.http.CaravanHttpClient;
49 import io.wcm.caravan.io.http.IllegalResponseRuntimeException;
50 import io.wcm.caravan.io.http.RequestFailedRuntimeException;
51 import io.wcm.caravan.io.http.request.CaravanHttpRequest;
52 import io.wcm.caravan.io.http.response.CaravanHttpResponse;
53 import io.wcm.caravan.io.http.response.CaravanHttpResponseBuilder;
54 import rx.Observable;
55 import rx.Subscriber;
56
57
58
59
60 @Component(immediate = true)
61 @Service(ApacheHttpClient.class)
62 public class ApacheHttpClient implements CaravanHttpClient {
63
64 private static final Logger LOG = LoggerFactory.getLogger(ApacheHttpClient.class);
65
66 @Reference
67 private HttpClientFactory httpClientFactory;
68
69 @Reference
70 private HttpAsyncClientFactory httpAsyncClientFactory;
71
72 @Override
73 public Observable<CaravanHttpResponse> execute(CaravanHttpRequest request) {
74 return Observable.create(new Observable.OnSubscribe<CaravanHttpResponse>() {
75
76 @Override
77 public void call(final Subscriber<? super CaravanHttpResponse> subscriber) {
78 HttpUriRequest httpRequest = RequestUtil.buildHttpRequest(request);
79
80 if (LOG.isTraceEnabled()) {
81 LOG.trace("Initiating request for {},\n{},\n{}", httpRequest.getURI(), request.toString(), request.getCorrelationId());
82 }
83
84 if (HttpHystrixCommand.getIsolationStrategy(request) == ExecutionIsolationStrategy.THREAD) {
85 executeBlocking(subscriber, httpRequest);
86 }
87 else {
88 executeAsync(subscriber, httpRequest);
89 }
90 }
91
92 private void executeBlocking(final Subscriber<? super CaravanHttpResponse> subscriber, HttpUriRequest httpRequest) {
93
94 if (LOG.isTraceEnabled()) {
95 LOG.trace("Obtaining blocking http client to request " + httpRequest.getURI()
96 + ", because a hystrixThreadPoolKeyOverride is configured for this serviceId");
97 }
98
99 CloseableHttpClient httpClient = (CloseableHttpClient)httpClientFactory.get(httpRequest.getURI());
100
101 Stopwatch stopwatch = Stopwatch.createStarted();
102 try (CloseableHttpResponse result = httpClient.execute(httpRequest)) {
103 LOG.debug("Received response from {} in {} ms\n{}", httpRequest.getURI().toString(), stopwatch.elapsed(MILLISECONDS), request.getCorrelationId());
104
105 processResponse(httpRequest, subscriber, result);
106
107 }
108 catch (Throwable ex) {
109 LOG.info("Caught exception requesting {} after {} ms\n{}", httpRequest.getURI().toString(), stopwatch.elapsed(MILLISECONDS),
110 request.getCorrelationId());
111
112 processExeption(httpRequest, subscriber, ex);
113 }
114 }
115
116 private void executeAsync(final Subscriber<? super CaravanHttpResponse> subscriber, HttpUriRequest httpRequest) {
117
118 if (LOG.isTraceEnabled()) {
119 LOG.trace("Obtaining async http client to request " + httpRequest.getURI()
120 + ", because a hystrixThreadPoolKeyOverride is *not* configured for this serviceId");
121 }
122
123 CloseableHttpAsyncClient httpClient = (CloseableHttpAsyncClient)httpAsyncClientFactory.get(httpRequest.getURI());
124
125 Stopwatch stopwatch = Stopwatch.createStarted();
126
127 httpClient.execute(httpRequest, new FutureCallback<HttpResponse>() {
128
129 @Override
130 public void completed(HttpResponse result) {
131 LOG.debug("Received response from {} in {} ms\n{}", httpRequest.getURI().toString(), stopwatch.elapsed(MILLISECONDS), request.getCorrelationId());
132
133 processResponse(httpRequest, subscriber, result);
134
135 }
136
137 @Override
138 public void failed(Exception ex) {
139 LOG.info("Caught exception requesting {} after {} ms\n{}", httpRequest.getURI().toString(), stopwatch.elapsed(MILLISECONDS),
140 request.getCorrelationId());
141
142 processExeption(httpRequest, subscriber, ex);
143 }
144
145 @Override
146 public void cancelled() {
147 LOG.warn("Cancelled request for {} after {} ms\n{}", httpRequest.getURI().toString(), stopwatch.elapsed(MILLISECONDS), request.getCorrelationId());
148
149 subscriber.onError(
150 new RequestFailedRuntimeException(request, "The request was unexpectedly cancelled after " + stopwatch.elapsed(MILLISECONDS) + "ms", null));
151 }
152
153 });
154 }
155
156 void processExeption(HttpUriRequest httpRequest, Subscriber<? super CaravanHttpResponse> subscriber, Throwable ex) {
157 if (ex instanceof SocketTimeoutException) {
158 subscriber.onError(new IOException("Socket timeout requesting '" + httpRequest.getURI(), ex));
159 }
160 else if (ex instanceof IOException) {
161 subscriber.onError(new IOException("Connection to '" + httpRequest.getURI() + "' failed", ex));
162 }
163 else {
164 subscriber.onError(new IOException("Requesting '" + httpRequest.getURI() + "' failed", ex));
165 }
166 }
167
168 void processResponse(HttpUriRequest httpRequest, final Subscriber<? super CaravanHttpResponse> subscriber, HttpResponse result) {
169
170 try {
171 StatusLine status = result.getStatusLine();
172 HttpEntity entity = new BufferedHttpEntity(result.getEntity());
173 EntityUtils.consume(entity);
174
175 boolean throwExceptionForStatus500 = CaravanHttpServiceConfigValidator.throwExceptionForStatus500(request.getServiceId());
176 if (status.getStatusCode() >= 500 && throwExceptionForStatus500) {
177 IllegalResponseRuntimeException illegalResponseRuntimeException = new IllegalResponseRuntimeException(request,
178 httpRequest.getURI().toString(),
179 status.getStatusCode(),
180 EntityUtils.toString(entity),
181 "Executing '" + httpRequest.getURI() + "' failed: " + result.getStatusLine());
182
183 subscriber.onError(illegalResponseRuntimeException);
184 EntityUtils.consumeQuietly(entity);
185 }
186 else {
187
188 CaravanHttpResponse response = new CaravanHttpResponseBuilder()
189 .status(status.getStatusCode())
190 .reason(status.getReasonPhrase())
191 .headers(RequestUtil.toHeadersMap(result.getAllHeaders()))
192 .body(entity.getContent(), entity.getContentLength() > 0 ? (int)entity.getContentLength() : null)
193 .build();
194
195 subscriber.onNext(response);
196 subscriber.onCompleted();
197 }
198 }
199 catch (IOException ex) {
200 subscriber.onError(new IOException("Reading response of '" + httpRequest.getURI() + "' failed", ex));
201 }
202
203 catch (Exception ex) {
204
205 subscriber.onError(new IOException("Processing response of '" + httpRequest.getURI() + "' failed", ex));
206 }
207 }
208
209 });
210 }
211
212 @Override
213 public Observable<CaravanHttpResponse> execute(CaravanHttpRequest request, Observable<CaravanHttpResponse> fallback) {
214 return execute(request);
215 }
216
217 @Override
218 public boolean hasValidConfiguration(String serviceId) {
219 return true;
220 }
221
222 }