1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package io.wcm.caravan.io.http.impl;
21
22 import org.apache.commons.lang3.StringUtils;
23 import org.apache.felix.scr.annotations.Activate;
24 import org.apache.felix.scr.annotations.Component;
25 import org.apache.felix.scr.annotations.Deactivate;
26 import org.apache.felix.scr.annotations.Reference;
27 import org.apache.felix.scr.annotations.Service;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 import com.netflix.client.ClientException;
32 import com.netflix.hystrix.exception.HystrixRuntimeException;
33
34 import io.wcm.caravan.common.performance.PerformanceMetrics;
35 import io.wcm.caravan.io.http.CaravanHttpClient;
36 import io.wcm.caravan.io.http.IllegalResponseRuntimeException;
37 import io.wcm.caravan.io.http.RequestFailedRuntimeException;
38 import io.wcm.caravan.io.http.impl.ribbon.RibbonHttpClient;
39 import io.wcm.caravan.io.http.impl.servletclient.NotSupportedByRequestMapperException;
40 import io.wcm.caravan.io.http.impl.servletclient.ServletHttpClient;
41 import io.wcm.caravan.io.http.request.CaravanHttpRequest;
42 import io.wcm.caravan.io.http.response.CaravanHttpResponse;
43 import rx.Observable;
44 import rx.Observable.Operator;
45 import rx.Scheduler;
46 import rx.Subscriber;
47 import rx.schedulers.Schedulers;
48
49
50
51
52 @Component(immediate = true)
53 @Service(CaravanHttpClient.class)
54 public class CaravanHttpClientImpl implements CaravanHttpClient {
55
56 private static final Logger LOG = LoggerFactory.getLogger(CaravanHttpClientImpl.class);
57
58 @Reference
59 private CaravanHttpClientConfig config;
60 @Reference
61 private ServletHttpClient servletClient;
62 @Reference
63 private RibbonHttpClient ribbonClient;
64 @Reference
65 private ApacheHttpClient apacheHttpClient;
66
67 private CaravanHttpCallbackExecutor callbackExecutor;
68 private Scheduler callbackScheduler;
69
70 @Activate
71 void activate() {
72 callbackExecutor = new CaravanHttpCallbackExecutor();
73 callbackScheduler = Schedulers.from(callbackExecutor);
74 }
75
76 @Deactivate
77 void deactivate() {
78 callbackExecutor.shutdownNow();
79 }
80
81 @Override
82 public Observable<CaravanHttpResponse> execute(CaravanHttpRequest request) {
83 Context ctx = new Context(request, null);
84 return execute(ctx)
85 .observeOn(callbackScheduler);
86 }
87
88 @Override
89 public Observable<CaravanHttpResponse> execute(CaravanHttpRequest request, Observable<CaravanHttpResponse> fallback) {
90 Context ctx = new Context(request, fallback);
91 return execute(ctx)
92 .observeOn(callbackScheduler);
93 }
94
95 private Observable<CaravanHttpResponse> execute(Context ctx) {
96
97 if (isRequestWithoutServiceId(ctx)) {
98 return createApacheResponse(ctx);
99 }
100
101 Observable<CaravanHttpResponse> ribbonResponse = createRibbonResponse(ctx);
102 if (isServletClientPossible(ctx)) {
103 return createServletClientResponse(ctx, ribbonResponse);
104 }
105 return ribbonResponse;
106 }
107
108 private boolean isRequestWithoutServiceId(Context ctx) {
109 return StringUtils.isEmpty(ctx.request.getServiceId());
110 }
111
112 private Observable<CaravanHttpResponse> createApacheResponse(Context ctx) {
113 Observable<CaravanHttpResponse> response = apacheHttpClient.execute(ctx.request);
114 return addHystrixAndErrorMapperAndMetrics(ctx, response);
115 }
116
117 private Observable<CaravanHttpResponse> createRibbonResponse(Context ctx) {
118 Observable<CaravanHttpResponse> response = ribbonClient.execute(ctx.request);
119 return addHystrixAndErrorMapperAndMetrics(ctx, response);
120 }
121
122 private boolean isServletClientPossible(Context ctx) {
123 boolean isGetRequest = "GET".equalsIgnoreCase(ctx.request.getMethod());
124 return isGetRequest
125 && config.isServletClientEnabled()
126 && servletClient.hasValidConfiguration(ctx.request.getServiceId());
127 }
128
129 private Observable<CaravanHttpResponse> createServletClientResponse(Context ctx, Observable<CaravanHttpResponse> ribbonResponse) {
130 Observable<CaravanHttpResponse> localhostResponse = servletClient.execute(ctx.request)
131 .lift(new ErrorDisassembleroperator(ctx, ribbonResponse));
132 return addHystrixAndErrorMapperAndMetrics(ctx, localhostResponse);
133 }
134
135 private Observable<CaravanHttpResponse> addHystrixAndErrorMapperAndMetrics(Context requestAndFallback,
136 Observable<CaravanHttpResponse> clientResponse) {
137 Observable<CaravanHttpResponse> hystrixResponse = wrapWithHystrix(requestAndFallback, clientResponse);
138 Observable<CaravanHttpResponse> exceptionMapperResponse = wrapWithExceptionMapper(requestAndFallback, hystrixResponse);
139 return addMetrics(requestAndFallback, exceptionMapperResponse);
140 }
141
142 private Observable<CaravanHttpResponse> wrapWithHystrix(Context ctx, Observable<CaravanHttpResponse> response) {
143
144 return new HttpHystrixCommand(ctx.request, response, ctx.fallback).toObservable();
145 }
146
147 private Observable<CaravanHttpResponse> wrapWithExceptionMapper(Context ctx, Observable<CaravanHttpResponse> response) {
148 return response.onErrorResumeNext(ex -> Observable.<CaravanHttpResponse>error(mapToKnownException(ctx.request, ex)));
149 }
150
151 private Throwable mapToKnownException(CaravanHttpRequest request, Throwable ex) {
152 if (ex instanceof RequestFailedRuntimeException || ex instanceof IllegalResponseRuntimeException) {
153 return ex;
154 }
155 if ((ex instanceof HystrixRuntimeException || ex instanceof ClientException) && ex.getCause() != null) {
156 return mapToKnownException(request, ex.getCause());
157 }
158 throw new RequestFailedRuntimeException(request, StringUtils.defaultString(ex.getMessage(), ex.getClass().getSimpleName()), ex);
159 }
160
161 private Observable<CaravanHttpResponse> addMetrics(Context ctx, Observable<CaravanHttpResponse> response) {
162 PerformanceMetrics metrics = ctx.request.getPerformanceMetrics();
163 return response.doOnSubscribe(metrics.getStartAction())
164 .doOnNext(metrics.getOnNextAction())
165 .doOnTerminate(metrics.getEndAction());
166 }
167
168 @Override
169 public boolean hasValidConfiguration(String serviceId) {
170 return CaravanHttpServiceConfigValidator.hasValidConfiguration(serviceId);
171 }
172
173 private class Context {
174
175 private final CaravanHttpRequest request;
176 private final Observable<CaravanHttpResponse> fallback;
177
178 Context(CaravanHttpRequest request, Observable<CaravanHttpResponse> fallback) {
179 this.request = request;
180 this.fallback = fallback;
181 }
182
183 }
184
185 private class ErrorDisassembleroperator implements Operator<CaravanHttpResponse, CaravanHttpResponse> {
186
187 private final Context ctx;
188 private final Observable<CaravanHttpResponse> nonLocalResponse;
189
190 ErrorDisassembleroperator(Context ctx, Observable<CaravanHttpResponse> nonLocalResponse) {
191 this.ctx = ctx;
192 this.nonLocalResponse = nonLocalResponse;
193 }
194
195 @Override
196 public Subscriber<? super CaravanHttpResponse> call(Subscriber<? super CaravanHttpResponse> subscriber) {
197 return new Subscriber<CaravanHttpResponse>() {
198
199 @Override
200 public void onCompleted() {
201 subscriber.onCompleted();
202 }
203
204 @Override
205 public void onError(Throwable ex) {
206 if (ex instanceof NotSupportedByRequestMapperException) {
207 LOG.warn("Could not execute request with localhost client for service " + ctx.request.getServiceId());
208 nonLocalResponse.subscribe(subscriber);
209 }
210 else {
211 subscriber.onError(ex);
212 }
213 }
214
215 @Override
216 public void onNext(CaravanHttpResponse next) {
217 subscriber.onNext(next);
218 }
219
220 };
221 }
222
223 }
224
225 }