View Javadoc
1   /*
2    * #%L
3    * wcm.io
4    * %%
5    * Copyright (C) 2014 wcm.io
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  package io.wcm.caravan.io.http.impl;
21  
22  import org.apache.commons.lang3.StringUtils;
23  import org.apache.felix.scr.annotations.Activate;
24  import org.apache.felix.scr.annotations.Component;
25  import org.apache.felix.scr.annotations.Deactivate;
26  import org.apache.felix.scr.annotations.Reference;
27  import org.apache.felix.scr.annotations.Service;
28  import org.slf4j.Logger;
29  import org.slf4j.LoggerFactory;
30  
31  import com.netflix.client.ClientException;
32  import com.netflix.hystrix.exception.HystrixRuntimeException;
33  
34  import io.wcm.caravan.common.performance.PerformanceMetrics;
35  import io.wcm.caravan.io.http.CaravanHttpClient;
36  import io.wcm.caravan.io.http.IllegalResponseRuntimeException;
37  import io.wcm.caravan.io.http.RequestFailedRuntimeException;
38  import io.wcm.caravan.io.http.impl.ribbon.RibbonHttpClient;
39  import io.wcm.caravan.io.http.impl.servletclient.NotSupportedByRequestMapperException;
40  import io.wcm.caravan.io.http.impl.servletclient.ServletHttpClient;
41  import io.wcm.caravan.io.http.request.CaravanHttpRequest;
42  import io.wcm.caravan.io.http.response.CaravanHttpResponse;
43  import rx.Observable;
44  import rx.Observable.Operator;
45  import rx.Scheduler;
46  import rx.Subscriber;
47  import rx.schedulers.Schedulers;
48  
49  /**
50   * Default implementation of {@link CaravanHttpClient}.
51   */
52  @Component(immediate = true)
53  @Service(CaravanHttpClient.class)
54  public class CaravanHttpClientImpl implements CaravanHttpClient {
55  
56    private static final Logger LOG = LoggerFactory.getLogger(CaravanHttpClientImpl.class);
57  
58    @Reference
59    private CaravanHttpClientConfig config;
60    @Reference
61    private ServletHttpClient servletClient;
62    @Reference
63    private RibbonHttpClient ribbonClient;
64    @Reference
65    private ApacheHttpClient apacheHttpClient;
66  
67    private CaravanHttpCallbackExecutor callbackExecutor;
68    private Scheduler callbackScheduler;
69  
70    @Activate
71    void activate() {
72      callbackExecutor = new CaravanHttpCallbackExecutor();
73      callbackScheduler = Schedulers.from(callbackExecutor);
74    }
75  
76    @Deactivate
77    void deactivate() {
78      callbackExecutor.shutdownNow();
79    }
80  
81    @Override
82    public Observable<CaravanHttpResponse> execute(CaravanHttpRequest request) {
83      Context ctx = new Context(request, null);
84      return execute(ctx)
85          .observeOn(callbackScheduler);
86    }
87  
88    @Override
89    public Observable<CaravanHttpResponse> execute(CaravanHttpRequest request, Observable<CaravanHttpResponse> fallback) {
90      Context ctx = new Context(request, fallback);
91      return execute(ctx)
92          .observeOn(callbackScheduler);
93    }
94  
95    private Observable<CaravanHttpResponse> execute(Context ctx) {
96  
97      if (isRequestWithoutServiceId(ctx)) {
98        return createApacheResponse(ctx);
99      }
100 
101     Observable<CaravanHttpResponse> ribbonResponse = createRibbonResponse(ctx);
102     if (isServletClientPossible(ctx)) {
103       return createServletClientResponse(ctx, ribbonResponse);
104     }
105     return ribbonResponse;
106   }
107 
108   private boolean isRequestWithoutServiceId(Context ctx) {
109     return StringUtils.isEmpty(ctx.request.getServiceId());
110   }
111 
112   private Observable<CaravanHttpResponse> createApacheResponse(Context ctx) {
113     Observable<CaravanHttpResponse> response = apacheHttpClient.execute(ctx.request);
114     return addHystrixAndErrorMapperAndMetrics(ctx, response);
115   }
116 
117   private Observable<CaravanHttpResponse> createRibbonResponse(Context ctx) {
118     Observable<CaravanHttpResponse> response = ribbonClient.execute(ctx.request);
119     return addHystrixAndErrorMapperAndMetrics(ctx, response);
120   }
121 
122   private boolean isServletClientPossible(Context ctx) {
123     boolean isGetRequest = "GET".equalsIgnoreCase(ctx.request.getMethod());
124     return isGetRequest
125         && config.isServletClientEnabled()
126         && servletClient.hasValidConfiguration(ctx.request.getServiceId());
127   }
128 
129   private Observable<CaravanHttpResponse> createServletClientResponse(Context ctx, Observable<CaravanHttpResponse> ribbonResponse) {
130     Observable<CaravanHttpResponse> localhostResponse = servletClient.execute(ctx.request)
131         .lift(new ErrorDisassembleroperator(ctx, ribbonResponse));
132     return addHystrixAndErrorMapperAndMetrics(ctx, localhostResponse);
133   }
134 
135   private Observable<CaravanHttpResponse> addHystrixAndErrorMapperAndMetrics(Context requestAndFallback,
136       Observable<CaravanHttpResponse> clientResponse) {
137     Observable<CaravanHttpResponse> hystrixResponse = wrapWithHystrix(requestAndFallback, clientResponse);
138     Observable<CaravanHttpResponse> exceptionMapperResponse = wrapWithExceptionMapper(requestAndFallback, hystrixResponse);
139     return addMetrics(requestAndFallback, exceptionMapperResponse);
140   }
141 
142   private Observable<CaravanHttpResponse> wrapWithHystrix(Context ctx, Observable<CaravanHttpResponse> response) {
143 
144     return new HttpHystrixCommand(ctx.request, response, ctx.fallback).toObservable();
145   }
146 
147   private Observable<CaravanHttpResponse> wrapWithExceptionMapper(Context ctx, Observable<CaravanHttpResponse> response) {
148     return response.onErrorResumeNext(ex -> Observable.<CaravanHttpResponse>error(mapToKnownException(ctx.request, ex)));
149   }
150 
151   private Throwable mapToKnownException(CaravanHttpRequest request, Throwable ex) {
152     if (ex instanceof RequestFailedRuntimeException || ex instanceof IllegalResponseRuntimeException) {
153       return ex;
154     }
155     if ((ex instanceof HystrixRuntimeException || ex instanceof ClientException) && ex.getCause() != null) {
156       return mapToKnownException(request, ex.getCause());
157     }
158     throw new RequestFailedRuntimeException(request, StringUtils.defaultString(ex.getMessage(), ex.getClass().getSimpleName()), ex);
159   }
160 
161   private Observable<CaravanHttpResponse> addMetrics(Context ctx, Observable<CaravanHttpResponse> response) {
162     PerformanceMetrics metrics = ctx.request.getPerformanceMetrics();
163     return response.doOnSubscribe(metrics.getStartAction())
164         .doOnNext(metrics.getOnNextAction())
165         .doOnTerminate(metrics.getEndAction());
166   }
167 
168   @Override
169   public boolean hasValidConfiguration(String serviceId) {
170     return CaravanHttpServiceConfigValidator.hasValidConfiguration(serviceId);
171   }
172 
173   private class Context {
174 
175     private final CaravanHttpRequest request;
176     private final Observable<CaravanHttpResponse> fallback;
177 
178     Context(CaravanHttpRequest request, Observable<CaravanHttpResponse> fallback) {
179       this.request = request;
180       this.fallback = fallback;
181     }
182 
183   }
184 
185   private class ErrorDisassembleroperator implements Operator<CaravanHttpResponse, CaravanHttpResponse> {
186 
187     private final Context ctx;
188     private final Observable<CaravanHttpResponse> nonLocalResponse;
189 
190     ErrorDisassembleroperator(Context ctx, Observable<CaravanHttpResponse> nonLocalResponse) {
191       this.ctx = ctx;
192       this.nonLocalResponse = nonLocalResponse;
193     }
194 
195     @Override
196     public Subscriber<? super CaravanHttpResponse> call(Subscriber<? super CaravanHttpResponse> subscriber) {
197       return new Subscriber<CaravanHttpResponse>() {
198 
199         @Override
200         public void onCompleted() {
201           subscriber.onCompleted();
202         }
203 
204         @Override
205         public void onError(Throwable ex) {
206           if (ex instanceof NotSupportedByRequestMapperException) {
207             LOG.warn("Could not execute request with localhost client for service " + ctx.request.getServiceId());
208             nonLocalResponse.subscribe(subscriber);
209           }
210           else {
211             subscriber.onError(ex);
212           }
213         }
214 
215         @Override
216         public void onNext(CaravanHttpResponse next) {
217           subscriber.onNext(next);
218         }
219 
220       };
221     }
222 
223   }
224 
225 }