CaravanHttpClientImpl.java
/*
* #%L
* wcm.io
* %%
* Copyright (C) 2014 wcm.io
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package io.wcm.caravan.io.http.impl;
import org.apache.commons.lang3.StringUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.netflix.client.ClientException;
import com.netflix.hystrix.exception.HystrixRuntimeException;
import io.wcm.caravan.common.performance.PerformanceMetrics;
import io.wcm.caravan.io.http.CaravanHttpClient;
import io.wcm.caravan.io.http.IllegalResponseRuntimeException;
import io.wcm.caravan.io.http.RequestFailedRuntimeException;
import io.wcm.caravan.io.http.impl.ribbon.RibbonHttpClient;
import io.wcm.caravan.io.http.impl.servletclient.NotSupportedByRequestMapperException;
import io.wcm.caravan.io.http.impl.servletclient.ServletHttpClient;
import io.wcm.caravan.io.http.request.CaravanHttpRequest;
import io.wcm.caravan.io.http.response.CaravanHttpResponse;
import rx.Observable;
import rx.Observable.Operator;
import rx.Scheduler;
import rx.Subscriber;
import rx.schedulers.Schedulers;
/**
 * Default implementation of {@link CaravanHttpClient}.
 * <p>
 * Depending on the request, the call is delegated to one of three underlying clients:
 * </p>
 * <ul>
 * <li>the Apache HTTP client, if the request has no service ID,</li>
 * <li>the servlet client, for GET requests when it is enabled and has a valid configuration for the service
 * (the Ribbon response is used instead if the request mapper does not support the request),</li>
 * <li>the Ribbon client in all other cases.</li>
 * </ul>
 * <p>
 * Every response is wrapped in a Hystrix command, has its errors mapped to the exception types of this API and
 * is instrumented with the performance metrics of the request. Results of the public {@code execute} methods are
 * observed on a dedicated callback scheduler.
 * </p>
 */
@Component(immediate = true)
@Service(CaravanHttpClient.class)
public class CaravanHttpClientImpl implements CaravanHttpClient {

  private static final Logger LOG = LoggerFactory.getLogger(CaravanHttpClientImpl.class);

  @Reference
  private CaravanHttpClientConfig config;
  @Reference
  private ServletHttpClient servletClient;
  @Reference
  private RibbonHttpClient ribbonClient;
  @Reference
  private ApacheHttpClient apacheHttpClient;

  /** Executor backing {@link #callbackScheduler}; created on activation, shut down on deactivation. */
  private CaravanHttpCallbackExecutor callbackExecutor;
  /** Scheduler on which the responses of the public {@code execute} methods are observed. */
  private Scheduler callbackScheduler;

  /**
   * Creates the callback executor and the scheduler built on top of it.
   */
  @Activate
  void activate() {
    callbackExecutor = new CaravanHttpCallbackExecutor();
    callbackScheduler = Schedulers.from(callbackExecutor);
  }

  /**
   * Shuts down the callback executor created in {@link #activate()}.
   */
  @Deactivate
  void deactivate() {
    callbackExecutor.shutdownNow();
  }

  /**
   * Executes the request without a fallback.
   * @param request Request to execute
   * @return Response observable, observed on the callback scheduler
   */
  @Override
  public Observable<CaravanHttpResponse> execute(CaravanHttpRequest request) {
    Context ctx = new Context(request, null);
    return execute(ctx)
        .observeOn(callbackScheduler);
  }

  /**
   * Executes the request, handing the given fallback to the Hystrix command wrapping the call.
   * @param request Request to execute
   * @param fallback Fallback response passed on to the Hystrix command
   * @return Response observable, observed on the callback scheduler
   */
  @Override
  public Observable<CaravanHttpResponse> execute(CaravanHttpRequest request, Observable<CaravanHttpResponse> fallback) {
    Context ctx = new Context(request, fallback);
    return execute(ctx)
        .observeOn(callbackScheduler);
  }

  /**
   * Selects the underlying client for the request and builds the fully wrapped response observable.
   * @param ctx Request and fallback
   * @return Response observable (not yet moved to the callback scheduler)
   */
  private Observable<CaravanHttpResponse> execute(Context ctx) {
    if (isRequestWithoutServiceId(ctx)) {
      return createApacheResponse(ctx);
    }
    // the Ribbon response is either returned directly or used as replacement for the servlet client response
    Observable<CaravanHttpResponse> ribbonResponse = createRibbonResponse(ctx);
    if (isServletClientPossible(ctx)) {
      return createServletClientResponse(ctx, ribbonResponse);
    }
    return ribbonResponse;
  }

  /**
   * @param ctx Request and fallback
   * @return true if the service ID of the request is null or empty
   */
  private boolean isRequestWithoutServiceId(Context ctx) {
    return StringUtils.isEmpty(ctx.request.getServiceId());
  }

  /**
   * Executes the request with the Apache HTTP client and applies the common wrappers.
   * @param ctx Request and fallback
   * @return Wrapped response observable
   */
  private Observable<CaravanHttpResponse> createApacheResponse(Context ctx) {
    Observable<CaravanHttpResponse> response = apacheHttpClient.execute(ctx.request);
    return addHystrixAndErrorMapperAndMetrics(ctx, response);
  }

  /**
   * Executes the request with the Ribbon client and applies the common wrappers.
   * @param ctx Request and fallback
   * @return Wrapped response observable
   */
  private Observable<CaravanHttpResponse> createRibbonResponse(Context ctx) {
    Observable<CaravanHttpResponse> response = ribbonClient.execute(ctx.request);
    return addHystrixAndErrorMapperAndMetrics(ctx, response);
  }

  /**
   * @param ctx Request and fallback
   * @return true if the request is a GET request, the servlet client is enabled in the configuration and the
   *         servlet client reports a valid configuration for the service ID of the request
   */
  private boolean isServletClientPossible(Context ctx) {
    boolean isGetRequest = "GET".equalsIgnoreCase(ctx.request.getMethod());
    return isGetRequest
        && config.isServletClientEnabled()
        && servletClient.hasValidConfiguration(ctx.request.getServiceId());
  }

  /**
   * Executes the request with the servlet client and applies the common wrappers. If the servlet client fails
   * with a {@link NotSupportedByRequestMapperException}, the given Ribbon response is emitted instead.
   * @param ctx Request and fallback
   * @param ribbonResponse Response to switch to if the servlet client does not support the request
   * @return Wrapped response observable
   */
  private Observable<CaravanHttpResponse> createServletClientResponse(Context ctx, Observable<CaravanHttpResponse> ribbonResponse) {
    Observable<CaravanHttpResponse> localhostResponse = servletClient.execute(ctx.request)
        .lift(new ErrorDisassembleroperator(ctx, ribbonResponse));
    return addHystrixAndErrorMapperAndMetrics(ctx, localhostResponse);
  }

  /**
   * Applies, in this order, the Hystrix command, the exception mapping and the performance metrics.
   * @param requestAndFallback Request and fallback
   * @param clientResponse Raw response of the underlying client
   * @return Wrapped response observable
   */
  private Observable<CaravanHttpResponse> addHystrixAndErrorMapperAndMetrics(Context requestAndFallback,
      Observable<CaravanHttpResponse> clientResponse) {
    Observable<CaravanHttpResponse> hystrixResponse = wrapWithHystrix(requestAndFallback, clientResponse);
    Observable<CaravanHttpResponse> exceptionMapperResponse = wrapWithExceptionMapper(requestAndFallback, hystrixResponse);
    return addMetrics(requestAndFallback, exceptionMapperResponse);
  }

  /**
   * Wraps the response in a {@link HttpHystrixCommand} created from the request, the response and the fallback.
   * @param ctx Request and fallback
   * @param response Response to wrap
   * @return Observable of the Hystrix command
   */
  private Observable<CaravanHttpResponse> wrapWithHystrix(Context ctx, Observable<CaravanHttpResponse> response) {
    return new HttpHystrixCommand(ctx.request, response, ctx.fallback).toObservable();
  }

  /**
   * Replaces any error of the response by the result of {@link #mapToKnownException(CaravanHttpRequest, Throwable)}.
   * @param ctx Request and fallback
   * @param response Response to wrap
   * @return Response observable emitting only mapped errors
   */
  private Observable<CaravanHttpResponse> wrapWithExceptionMapper(Context ctx, Observable<CaravanHttpResponse> response) {
    return response.onErrorResumeNext(ex -> Observable.<CaravanHttpResponse>error(mapToKnownException(ctx.request, ex)));
  }

  /**
   * Maps an arbitrary error to one of the exception types exposed by this API.
   * <ul>
   * <li>{@link RequestFailedRuntimeException} and {@link IllegalResponseRuntimeException} are passed through.</li>
   * <li>{@link HystrixRuntimeException} and {@link ClientException} with a cause are unwrapped and the cause is
   * mapped recursively.</li>
   * <li>Everything else is wrapped in a new {@link RequestFailedRuntimeException} that keeps the original
   * exception as cause.</li>
   * </ul>
   * @param request Request that failed
   * @param ex Error to map
   * @return Mapped exception; it is returned, never thrown
   */
  private Throwable mapToKnownException(CaravanHttpRequest request, Throwable ex) {
    if (ex instanceof RequestFailedRuntimeException || ex instanceof IllegalResponseRuntimeException) {
      return ex;
    }
    if ((ex instanceof HystrixRuntimeException || ex instanceof ClientException) && ex.getCause() != null) {
      return mapToKnownException(request, ex.getCause());
    }
    // returned (not thrown) so that the caller always propagates it via Observable.error(...)
    // fall back to the simple class name if the exception carries no message
    return new RequestFailedRuntimeException(request, StringUtils.defaultString(ex.getMessage(), ex.getClass().getSimpleName()), ex);
  }

  /**
   * Hooks the actions of the request's {@link PerformanceMetrics} into the response: the start action on
   * subscription, the on-next action for each emitted response and the end action on termination.
   * @param ctx Request and fallback
   * @param response Response to instrument
   * @return Instrumented response observable
   */
  private Observable<CaravanHttpResponse> addMetrics(Context ctx, Observable<CaravanHttpResponse> response) {
    PerformanceMetrics metrics = ctx.request.getPerformanceMetrics();
    return response.doOnSubscribe(metrics.getStartAction())
        .doOnNext(metrics.getOnNextAction())
        .doOnTerminate(metrics.getEndAction());
  }

  /**
   * Delegates to {@link CaravanHttpServiceConfigValidator#hasValidConfiguration(String)}.
   * @param serviceId Service ID
   * @return Result of the configuration validator for the given service ID
   */
  @Override
  public boolean hasValidConfiguration(String serviceId) {
    return CaravanHttpServiceConfigValidator.hasValidConfiguration(serviceId);
  }

  /**
   * Immutable pair of a request and its optional fallback (null when {@link #execute(CaravanHttpRequest)} is used).
   */
  private static final class Context {

    private final CaravanHttpRequest request;
    private final Observable<CaravanHttpResponse> fallback;

    Context(CaravanHttpRequest request, Observable<CaravanHttpResponse> fallback) {
      this.request = request;
      this.fallback = fallback;
    }

  }

  /**
   * Operator that passes items, completion and errors through unchanged, except for
   * {@link NotSupportedByRequestMapperException}: in that case a warning is logged and the downstream subscriber
   * is subscribed to the non-local response instead of receiving the error.
   */
  private static final class ErrorDisassembleroperator implements Operator<CaravanHttpResponse, CaravanHttpResponse> {

    private final Context ctx;
    private final Observable<CaravanHttpResponse> nonLocalResponse;

    ErrorDisassembleroperator(Context ctx, Observable<CaravanHttpResponse> nonLocalResponse) {
      this.ctx = ctx;
      this.nonLocalResponse = nonLocalResponse;
    }

    @Override
    public Subscriber<? super CaravanHttpResponse> call(Subscriber<? super CaravanHttpResponse> subscriber) {
      Subscriber<CaravanHttpResponse> upstream = new Subscriber<CaravanHttpResponse>() {

        @Override
        public void onCompleted() {
          subscriber.onCompleted();
        }

        @Override
        public void onError(Throwable ex) {
          if (ex instanceof NotSupportedByRequestMapperException) {
            LOG.warn("Could not execute request with localhost client for service {}", ctx.request.getServiceId());
            nonLocalResponse.subscribe(subscriber);
          }
          else {
            subscriber.onError(ex);
          }
        }

        @Override
        public void onNext(CaravanHttpResponse next) {
          subscriber.onNext(next);
        }

      };
      // Link the lifetimes so that unsubscribing downstream also unsubscribes the local (upstream) request.
      // add() is used instead of the chaining constructor because the downstream subscriber may later be
      // subscribed to nonLocalResponse and must not share its producer with the upstream subscriber.
      subscriber.add(upstream);
      return upstream;
    }

  }

}