View Javadoc
1   /*
2    * #%L
3    * wcm.io
4    * %%
5    * Copyright (C) 2014 wcm.io
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  package io.wcm.caravan.pipeline.impl;
21  
22  import static org.apache.commons.lang3.StringUtils.isNotBlank;
23  import static org.apache.commons.lang3.Validate.isTrue;
24  import io.wcm.caravan.common.performance.PerformanceMetrics;
25  import io.wcm.caravan.io.http.CaravanHttpClient;
26  import io.wcm.caravan.io.http.request.CaravanHttpRequest;
27  import io.wcm.caravan.io.http.response.CaravanHttpResponse;
28  import io.wcm.caravan.pipeline.JsonPipeline;
29  import io.wcm.caravan.pipeline.JsonPipelineAction;
30  import io.wcm.caravan.pipeline.JsonPipelineContext;
31  import io.wcm.caravan.pipeline.JsonPipelineExceptionHandler;
32  import io.wcm.caravan.pipeline.JsonPipelineOutput;
33  import io.wcm.caravan.pipeline.cache.CachePersistencyOptions;
34  import io.wcm.caravan.pipeline.cache.CacheStrategy;
35  import io.wcm.caravan.pipeline.impl.operators.AssertExistsOperator;
36  import io.wcm.caravan.pipeline.impl.operators.CachePointTransformer;
37  import io.wcm.caravan.pipeline.impl.operators.CollectOperator;
38  import io.wcm.caravan.pipeline.impl.operators.ExtractOperator;
39  import io.wcm.caravan.pipeline.impl.operators.HandleExceptionOperator;
40  import io.wcm.caravan.pipeline.impl.operators.MergeTransformer;
41  import io.wcm.caravan.pipeline.impl.operators.ResponseHandlingOperator;
42  
43  import java.util.LinkedList;
44  import java.util.List;
45  import java.util.SortedSet;
46  import java.util.TreeSet;
47  
48  import org.apache.commons.lang3.StringUtils;
49  import org.slf4j.Logger;
50  import org.slf4j.LoggerFactory;
51  
52  import rx.Observable;
53  
54  import com.fasterxml.jackson.databind.JsonNode;
55  
56  /**
57   * Default implementation of {@link JsonPipeline}.
58   * Each new JsonPipeline instance created by this implementation provides a reuse of data, received after the first
59   * subscription call. Reuse should reduce the number of actual calls to external resources while execution of pipeline
60   * by multiple subscribers.
61   */
62  public final class JsonPipelineImpl implements JsonPipeline {
63  
64    private static final Logger log = LoggerFactory.getLogger(JsonPipelineImpl.class);
65  
66    private final SortedSet<String> sourceServiceIds = new TreeSet<String>();
67    private final List<CaravanHttpRequest> requests = new LinkedList<CaravanHttpRequest>();
68    private JsonPipelineContextImpl context;
69    private String descriptor;
70    private Observable<JsonPipelineOutput> observable;
71    private PerformanceMetrics performanceMetrics;
72  
73    /**
74     * @param request the REST request that provides the source data
75     * @param responseObservable the response observable obtained by the {@link CaravanHttpClient}
76     * @param context preinitialized JSON pipeline context
77     */
78    public JsonPipelineImpl(final CaravanHttpRequest request, final Observable<CaravanHttpResponse> responseObservable, final JsonPipelineContextImpl context) {
79      if (isNotBlank(request.getServiceId())) {
80        this.sourceServiceIds.add(request.getServiceId());
81      }
82      this.requests.add(request);
83      this.descriptor = isNotBlank(request.getUrl()) ? "GET(//" + request.getServiceId() + request.getUrl() + ")" : "EMPTY()";
84      this.observable = responseObservable.lift(new ResponseHandlingOperator(request)).cache();
85      this.context = context;
86      if (request.getPerformanceMetrics() != null) {
87        this.performanceMetrics = request.getPerformanceMetrics().createNext(isNotBlank(request.getUrl()) ? "GET" : "EMPTY", descriptor);
88      } else {
89        this.performanceMetrics = PerformanceMetrics.createNew(isNotBlank(request.getUrl()) ? "GET" : "EMPTY", descriptor, request.getCorrelationId());
90      }
91      this.observable = this.observable
92          .doOnSubscribe(this.performanceMetrics.getStartAction())
93          .doOnNext(this.performanceMetrics.getOnNextAction())
94          .doOnTerminate(this.performanceMetrics.getEndAction());
95  
96    }
97  
98    private JsonPipelineImpl() {
99      // only used internally
100   }
101 
102   JsonPipelineImpl cloneWith(Observable<JsonPipelineOutput> newObservable, String descriptorSuffix, String action) {
103     return cloneWith(newObservable, descriptorSuffix, action, null);
104   }
105 
106   JsonPipelineImpl cloneWith(Observable<JsonPipelineOutput> newObservable, String descriptorSuffix, String action, Class actionClass) {
107     JsonPipelineImpl clone = new JsonPipelineImpl();
108     clone.sourceServiceIds.addAll(this.sourceServiceIds);
109     clone.requests.addAll(this.requests);
110 
111     clone.descriptor = this.descriptor;
112     if (StringUtils.isNotBlank(descriptorSuffix)) {
113       clone.descriptor += "+" + descriptorSuffix;
114     }
115 
116     clone.observable = newObservable.cache();
117     clone.context = context;
118     clone.performanceMetrics = performanceMetrics.createNext(action, clone.descriptor, actionClass);
119     clone.observable = clone.observable
120         .doOnSubscribe(clone.performanceMetrics.getStartAction())
121         .doOnNext(clone.performanceMetrics.getOnNextAction())
122         .doOnTerminate(clone.performanceMetrics.getEndAction());
123 
124     return clone;
125   }
126 
127   @Override
128   public String getDescriptor() {
129     return descriptor;
130   }
131 
132   @Override
133   public SortedSet<String> getSourceServices() {
134     return this.sourceServiceIds;
135   }
136 
137   @Override
138   public List<CaravanHttpRequest> getRequests() {
139     return this.requests;
140   }
141 
142   @Override
143   public PerformanceMetrics getPerformanceMetrics() {
144     return this.performanceMetrics;
145   }
146 
147   @Override
148   public JsonPipeline assertExists(String jsonPath, int statusCode, String msg) {
149 
150     Observable<JsonPipelineOutput> assertingObservable = observable.lift(new AssertExistsOperator(jsonPath, statusCode, msg));
151 
152     return cloneWith(assertingObservable, null, "ASSERT_EXISTS");
153   }
154 
155   @Override
156   public JsonPipeline extract(String jsonPath) {
157 
158     Observable<JsonPipelineOutput> extractingObservable = observable.lift(new ExtractOperator(jsonPath, null));
159     String transformationDesc = "EXTRACT(" + jsonPath + ")";
160 
161     return cloneWith(extractingObservable, transformationDesc, "EXTRACT");
162   }
163 
164   @Override
165   public JsonPipeline extract(String jsonPath, String targetProperty) {
166 
167     isTrue(isNotBlank(targetProperty), "Target property is '" + targetProperty
168         + "'. Please provide meaningfull targetProperty or use another extract method wothout targetProperty parameter, if any targetProperty isn't required.");
169 
170     Observable<JsonPipelineOutput> extractingObservable = observable.lift(new ExtractOperator(jsonPath, targetProperty));
171     String transformationDesc = "EXTRACT(" + jsonPath + " INTO " + targetProperty + ")";
172 
173     return cloneWith(extractingObservable, transformationDesc, "EXTRACT");
174   }
175 
176   @Override
177   public JsonPipeline collect(String jsonPath) {
178 
179     Observable<JsonPipelineOutput> collectingObservable = observable.lift(new CollectOperator(jsonPath, null));
180     String transformationDesc = "COLLECT(" + jsonPath + ")";
181 
182     return cloneWith(collectingObservable, transformationDesc, "COLLECT");
183   }
184 
185   @Override
186   public JsonPipeline collect(String jsonPath, String targetProperty) {
187 
188     isTrue(isNotBlank(targetProperty), "Target property is '" + targetProperty
189         + "'. Please provide meaningfull targetProperty or use another collect method wothout targetProperty parameter, if any targetProperty isn't required.");
190 
191     Observable<JsonPipelineOutput> collectingObservable = observable.lift(new CollectOperator(jsonPath, targetProperty));
192     String transformationDesc = "COLLECT(" + jsonPath + " INTO " + targetProperty + ")";
193 
194     return cloneWith(collectingObservable, transformationDesc, "COLLECT");
195   }
196 
197   @Override
198   public JsonPipeline merge(JsonPipeline secondarySource) {
199 
200     MergeTransformer transformer = new MergeTransformer(descriptor, secondarySource.getOutput(), null);
201     Observable<JsonPipelineOutput> mergedObservable = observable.compose(transformer);
202     String transformationDesc = "MERGE(" + secondarySource.getDescriptor() + ")";
203 
204     JsonPipelineImpl mergedPipeline = cloneWith(mergedObservable, transformationDesc, "MERGE");
205     mergedPipeline.sourceServiceIds.addAll(secondarySource.getSourceServices());
206     mergedPipeline.requests.addAll(secondarySource.getRequests());
207 
208     return mergedPipeline;
209   }
210 
211   @Override
212   public JsonPipeline merge(JsonPipeline secondarySource, String targetProperty) {
213 
214     isTrue(isNotBlank(targetProperty), "Target property is '" + targetProperty
215         + "'. Please provide meaningfull targetProperty or use another merge method wothout targetProperty parameter, if any targetProperty isn't required.");
216 
217     MergeTransformer transformer = new MergeTransformer(descriptor, secondarySource.getOutput(), targetProperty);
218     Observable<JsonPipelineOutput> mergedObservable = observable.compose(transformer);
219     String transformationDesc = "MERGE(" + secondarySource.getDescriptor() + " INTO " + targetProperty + ")";
220 
221     JsonPipelineImpl mergedPipeline = cloneWith(mergedObservable, transformationDesc, "MERGE");
222     mergedPipeline.sourceServiceIds.addAll(secondarySource.getSourceServices());
223     mergedPipeline.requests.addAll(secondarySource.getRequests());
224 
225     return mergedPipeline;
226   }
227 
228   @Override
229   public JsonPipeline applyAction(JsonPipelineAction action) {
230     String actionDesc = "ACTION(" + action.getId() + ")";
231 
232     Observable<JsonPipelineOutput> transformedObservable = observable.flatMap(output -> {
233       try {
234         return action.execute(output, context);
235       }
236       catch (Throwable e) {
237         log.error("Failed to execute action " + action.getId(), e);
238         return Observable.error(e);
239       }
240     });
241 
242     return cloneWith(transformedObservable, actionDesc, "ACTION", action.getClass());
243   }
244 
245   @Override
246   public JsonPipeline addCachePoint(CacheStrategy strategy) {
247     CachePersistencyOptions options = strategy.getCachePersistencyOptions(requests);
248     // skip all caching logic if the expiry time or refresh interval for this request is 0
249     if (!options.isCacheable()) {
250       return this;
251     }
252 
253     CachePointTransformer transformer = new CachePointTransformer(context, requests, descriptor, strategy);
254     Observable<JsonPipelineOutput> cachingObservable = observable.compose(transformer);
255 
256     return cloneWith(cachingObservable, null, "ADD_CACHEPOINT");
257   }
258 
259   @Override
260   public JsonPipeline handleException(JsonPipelineExceptionHandler handler) {
261 
262     Observable<JsonPipelineOutput> exceptionHandlingObservable = observable.lift(new HandleExceptionOperator(requests, handler));
263 
264     return cloneWith(exceptionHandlingObservable, null, "HANDLE_EXCEPTION");
265   }
266 
267   @Override
268   public Observable<JsonPipelineOutput> getOutput() {
269     return observable.map(o -> o);
270   }
271 
272   @Override
273   public Observable<JsonNode> getJsonOutput() {
274     return observable.map(model -> model.getPayload());
275   }
276 
277   @Override
278   public Observable<String> getStringOutput() {
279     return getJsonOutput().map(JacksonFunctions::nodeToString);
280   }
281 
282   JsonPipelineContext getJsonPipelineContext() {
283     return this.context;
284   }
285 
286 }