ApacheHttpClient.java
/*
* #%L
* wcm.io
* %%
* Copyright (C) 2015 wcm.io
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package io.wcm.caravan.io.http.impl;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.io.IOException;
import java.net.SocketTimeoutException;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Stopwatch;
import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
import io.wcm.caravan.commons.httpasyncclient.HttpAsyncClientFactory;
import io.wcm.caravan.commons.httpclient.HttpClientFactory;
import io.wcm.caravan.io.http.CaravanHttpClient;
import io.wcm.caravan.io.http.IllegalResponseRuntimeException;
import io.wcm.caravan.io.http.RequestFailedRuntimeException;
import io.wcm.caravan.io.http.request.CaravanHttpRequest;
import io.wcm.caravan.io.http.response.CaravanHttpResponse;
import io.wcm.caravan.io.http.response.CaravanHttpResponseBuilder;
import rx.Observable;
import rx.Subscriber;
/**
* Simple implementation just executing the Apache HTTP client. Does not support a fallback.
*/
@Component(immediate = true)
@Service(ApacheHttpClient.class)
public class ApacheHttpClient implements CaravanHttpClient {
private static final Logger LOG = LoggerFactory.getLogger(ApacheHttpClient.class);
@Reference
private HttpClientFactory httpClientFactory;
@Reference
private HttpAsyncClientFactory httpAsyncClientFactory;
@Override
public Observable<CaravanHttpResponse> execute(CaravanHttpRequest request) {
return Observable.create(new Observable.OnSubscribe<CaravanHttpResponse>() {
@Override
public void call(final Subscriber<? super CaravanHttpResponse> subscriber) {
HttpUriRequest httpRequest = RequestUtil.buildHttpRequest(request);
if (LOG.isTraceEnabled()) {
LOG.trace("Initiating request for {},\n{},\n{}", httpRequest.getURI(), request.toString(), request.getCorrelationId());
}
if (HttpHystrixCommand.getIsolationStrategy(request) == ExecutionIsolationStrategy.THREAD) {
executeBlocking(subscriber, httpRequest);
}
else {
executeAsync(subscriber, httpRequest);
}
}
private void executeBlocking(final Subscriber<? super CaravanHttpResponse> subscriber, HttpUriRequest httpRequest) {
if (LOG.isTraceEnabled()) {
LOG.trace("Obtaining blocking http client to request " + httpRequest.getURI()
+ ", because a hystrixThreadPoolKeyOverride is configured for this serviceId");
}
CloseableHttpClient httpClient = (CloseableHttpClient)httpClientFactory.get(httpRequest.getURI());
Stopwatch stopwatch = Stopwatch.createStarted();
try (CloseableHttpResponse result = httpClient.execute(httpRequest)) {
LOG.debug("Received response from {} in {} ms\n{}", httpRequest.getURI().toString(), stopwatch.elapsed(MILLISECONDS), request.getCorrelationId());
processResponse(httpRequest, subscriber, result);
}
catch (Throwable ex) {
LOG.info("Caught exception requesting {} after {} ms\n{}", httpRequest.getURI().toString(), stopwatch.elapsed(MILLISECONDS),
request.getCorrelationId());
processExeption(httpRequest, subscriber, ex);
}
}
private void executeAsync(final Subscriber<? super CaravanHttpResponse> subscriber, HttpUriRequest httpRequest) {
if (LOG.isTraceEnabled()) {
LOG.trace("Obtaining async http client to request " + httpRequest.getURI()
+ ", because a hystrixThreadPoolKeyOverride is *not* configured for this serviceId");
}
CloseableHttpAsyncClient httpClient = (CloseableHttpAsyncClient)httpAsyncClientFactory.get(httpRequest.getURI());
Stopwatch stopwatch = Stopwatch.createStarted();
httpClient.execute(httpRequest, new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse result) {
LOG.debug("Received response from {} in {} ms\n{}", httpRequest.getURI().toString(), stopwatch.elapsed(MILLISECONDS), request.getCorrelationId());
processResponse(httpRequest, subscriber, result);
}
@Override
public void failed(Exception ex) {
LOG.info("Caught exception requesting {} after {} ms\n{}", httpRequest.getURI().toString(), stopwatch.elapsed(MILLISECONDS),
request.getCorrelationId());
processExeption(httpRequest, subscriber, ex);
}
@Override
public void cancelled() {
LOG.warn("Cancelled request for {} after {} ms\n{}", httpRequest.getURI().toString(), stopwatch.elapsed(MILLISECONDS), request.getCorrelationId());
subscriber.onError(
new RequestFailedRuntimeException(request, "The request was unexpectedly cancelled after " + stopwatch.elapsed(MILLISECONDS) + "ms", null));
}
});
}
void processExeption(HttpUriRequest httpRequest, Subscriber<? super CaravanHttpResponse> subscriber, Throwable ex) {
if (ex instanceof SocketTimeoutException) {
subscriber.onError(new IOException("Socket timeout requesting '" + httpRequest.getURI(), ex));
}
else if (ex instanceof IOException) {
subscriber.onError(new IOException("Connection to '" + httpRequest.getURI() + "' failed", ex));
}
else {
subscriber.onError(new IOException("Requesting '" + httpRequest.getURI() + "' failed", ex));
}
}
void processResponse(HttpUriRequest httpRequest, final Subscriber<? super CaravanHttpResponse> subscriber, HttpResponse result) {
try {
StatusLine status = result.getStatusLine();
HttpEntity entity = new BufferedHttpEntity(result.getEntity());
EntityUtils.consume(entity);
boolean throwExceptionForStatus500 = CaravanHttpServiceConfigValidator.throwExceptionForStatus500(request.getServiceId());
if (status.getStatusCode() >= 500 && throwExceptionForStatus500) {
IllegalResponseRuntimeException illegalResponseRuntimeException = new IllegalResponseRuntimeException(request,
httpRequest.getURI().toString(),
status.getStatusCode(),
EntityUtils.toString(entity),
"Executing '" + httpRequest.getURI() + "' failed: " + result.getStatusLine());
subscriber.onError(illegalResponseRuntimeException);
EntityUtils.consumeQuietly(entity);
}
else {
CaravanHttpResponse response = new CaravanHttpResponseBuilder()
.status(status.getStatusCode())
.reason(status.getReasonPhrase())
.headers(RequestUtil.toHeadersMap(result.getAllHeaders()))
.body(entity.getContent(), entity.getContentLength() > 0 ? (int)entity.getContentLength() : null)
.build();
subscriber.onNext(response);
subscriber.onCompleted();
}
}
catch (IOException ex) {
subscriber.onError(new IOException("Reading response of '" + httpRequest.getURI() + "' failed", ex));
}
// CHECKSTYLE:OFF - yes we really wan to catch all exceptions here
catch (Exception ex) {
// CHECKSTYLE:ON
subscriber.onError(new IOException("Processing response of '" + httpRequest.getURI() + "' failed", ex));
}
}
});
}
@Override
public Observable<CaravanHttpResponse> execute(CaravanHttpRequest request, Observable<CaravanHttpResponse> fallback) {
return execute(request);
}
@Override
public boolean hasValidConfiguration(String serviceId) {
return true;
}
}