View Javadoc
1   /*
2    * #%L
3    * wcm.io
4    * %%
5    * Copyright (C) 2015 wcm.io
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  package io.wcm.caravan.io.http.impl;
21  
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.io.IOException;
import java.net.SocketTimeoutException;

import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Stopwatch;
import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;

import io.wcm.caravan.commons.httpasyncclient.HttpAsyncClientFactory;
import io.wcm.caravan.commons.httpclient.HttpClientFactory;
import io.wcm.caravan.io.http.CaravanHttpClient;
import io.wcm.caravan.io.http.IllegalResponseRuntimeException;
import io.wcm.caravan.io.http.RequestFailedRuntimeException;
import io.wcm.caravan.io.http.request.CaravanHttpRequest;
import io.wcm.caravan.io.http.response.CaravanHttpResponse;
import io.wcm.caravan.io.http.response.CaravanHttpResponseBuilder;
import rx.Observable;
import rx.Subscriber;
56  
57  /**
58   * Simple implementation just executing the Apache HTTP client. Does not support a fallback.
59   */
60  @Component(immediate = true)
61  @Service(ApacheHttpClient.class)
62  public class ApacheHttpClient implements CaravanHttpClient {
63  
64    private static final Logger LOG = LoggerFactory.getLogger(ApacheHttpClient.class);
65  
66    @Reference
67    private HttpClientFactory httpClientFactory;
68  
69    @Reference
70    private HttpAsyncClientFactory httpAsyncClientFactory;
71  
72    @Override
73    public Observable<CaravanHttpResponse> execute(CaravanHttpRequest request) {
74      return Observable.create(new Observable.OnSubscribe<CaravanHttpResponse>() {
75  
76        @Override
77        public void call(final Subscriber<? super CaravanHttpResponse> subscriber) {
78          HttpUriRequest httpRequest = RequestUtil.buildHttpRequest(request);
79  
80          if (LOG.isTraceEnabled()) {
81            LOG.trace("Initiating request for {},\n{},\n{}", httpRequest.getURI(), request.toString(), request.getCorrelationId());
82          }
83  
84          if (HttpHystrixCommand.getIsolationStrategy(request) == ExecutionIsolationStrategy.THREAD) {
85            executeBlocking(subscriber, httpRequest);
86          }
87          else {
88            executeAsync(subscriber, httpRequest);
89          }
90        }
91  
92        private void executeBlocking(final Subscriber<? super CaravanHttpResponse> subscriber, HttpUriRequest httpRequest) {
93  
94          if (LOG.isTraceEnabled()) {
95            LOG.trace("Obtaining blocking http client to request " + httpRequest.getURI()
96            + ", because a hystrixThreadPoolKeyOverride is configured for this serviceId");
97          }
98  
99          CloseableHttpClient httpClient = (CloseableHttpClient)httpClientFactory.get(httpRequest.getURI());
100 
101         Stopwatch stopwatch = Stopwatch.createStarted();
102         try (CloseableHttpResponse result = httpClient.execute(httpRequest)) {
103           LOG.debug("Received response from {} in {} ms\n{}", httpRequest.getURI().toString(), stopwatch.elapsed(MILLISECONDS), request.getCorrelationId());
104 
105           processResponse(httpRequest, subscriber, result);
106 
107         }
108         catch (Throwable ex) {
109           LOG.info("Caught exception requesting {} after {} ms\n{}", httpRequest.getURI().toString(), stopwatch.elapsed(MILLISECONDS),
110               request.getCorrelationId());
111 
112           processExeption(httpRequest, subscriber, ex);
113         }
114       }
115 
116       private void executeAsync(final Subscriber<? super CaravanHttpResponse> subscriber, HttpUriRequest httpRequest) {
117 
118         if (LOG.isTraceEnabled()) {
119           LOG.trace("Obtaining async http client to request " + httpRequest.getURI()
120           + ", because a hystrixThreadPoolKeyOverride is *not* configured for this serviceId");
121         }
122 
123         CloseableHttpAsyncClient httpClient = (CloseableHttpAsyncClient)httpAsyncClientFactory.get(httpRequest.getURI());
124 
125         Stopwatch stopwatch = Stopwatch.createStarted();
126 
127         httpClient.execute(httpRequest, new FutureCallback<HttpResponse>() {
128 
129           @Override
130           public void completed(HttpResponse result) {
131             LOG.debug("Received response from {} in {} ms\n{}", httpRequest.getURI().toString(), stopwatch.elapsed(MILLISECONDS), request.getCorrelationId());
132 
133             processResponse(httpRequest, subscriber, result);
134 
135           }
136 
137           @Override
138           public void failed(Exception ex) {
139             LOG.info("Caught exception requesting {} after {} ms\n{}", httpRequest.getURI().toString(), stopwatch.elapsed(MILLISECONDS),
140                 request.getCorrelationId());
141 
142             processExeption(httpRequest, subscriber, ex);
143           }
144 
145           @Override
146           public void cancelled() {
147             LOG.warn("Cancelled request for {} after {} ms\n{}", httpRequest.getURI().toString(), stopwatch.elapsed(MILLISECONDS), request.getCorrelationId());
148 
149             subscriber.onError(
150                 new RequestFailedRuntimeException(request, "The request was unexpectedly cancelled after " + stopwatch.elapsed(MILLISECONDS) + "ms", null));
151           }
152 
153         });
154       }
155 
156       void processExeption(HttpUriRequest httpRequest, Subscriber<? super CaravanHttpResponse> subscriber, Throwable ex) {
157         if (ex instanceof SocketTimeoutException) {
158           subscriber.onError(new IOException("Socket timeout requesting '" + httpRequest.getURI(), ex));
159         }
160         else if (ex instanceof IOException) {
161           subscriber.onError(new IOException("Connection to '" + httpRequest.getURI() + "' failed", ex));
162         }
163         else {
164           subscriber.onError(new IOException("Requesting '" + httpRequest.getURI() + "' failed", ex));
165         }
166       }
167 
168       void processResponse(HttpUriRequest httpRequest, final Subscriber<? super CaravanHttpResponse> subscriber, HttpResponse result) {
169 
170         try {
171           StatusLine status = result.getStatusLine();
172           HttpEntity entity = new BufferedHttpEntity(result.getEntity());
173           EntityUtils.consume(entity);
174 
175           boolean throwExceptionForStatus500 = CaravanHttpServiceConfigValidator.throwExceptionForStatus500(request.getServiceId());
176           if (status.getStatusCode() >= 500 && throwExceptionForStatus500) {
177             IllegalResponseRuntimeException illegalResponseRuntimeException = new IllegalResponseRuntimeException(request,
178                 httpRequest.getURI().toString(),
179                 status.getStatusCode(),
180                 EntityUtils.toString(entity),
181                 "Executing '" + httpRequest.getURI() + "' failed: " + result.getStatusLine());
182 
183             subscriber.onError(illegalResponseRuntimeException);
184             EntityUtils.consumeQuietly(entity);
185           }
186           else {
187 
188             CaravanHttpResponse response = new CaravanHttpResponseBuilder()
189                 .status(status.getStatusCode())
190                 .reason(status.getReasonPhrase())
191                 .headers(RequestUtil.toHeadersMap(result.getAllHeaders()))
192                 .body(entity.getContent(), entity.getContentLength() > 0 ? (int)entity.getContentLength() : null)
193                 .build();
194 
195             subscriber.onNext(response);
196             subscriber.onCompleted();
197           }
198         }
199         catch (IOException ex) {
200           subscriber.onError(new IOException("Reading response of '" + httpRequest.getURI() + "' failed", ex));
201         }
202         // CHECKSTYLE:OFF - yes we really wan to catch all exceptions here
203         catch (Exception ex) {
204           // CHECKSTYLE:ON
205           subscriber.onError(new IOException("Processing response of '" + httpRequest.getURI() + "' failed", ex));
206         }
207       }
208 
209     });
210   }
211 
212   @Override
213   public Observable<CaravanHttpResponse> execute(CaravanHttpRequest request, Observable<CaravanHttpResponse> fallback) {
214     return execute(request);
215   }
216 
217   @Override
218   public boolean hasValidConfiguration(String serviceId) {
219     return true;
220   }
221 
222 }