1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package io.wcm.caravan.pipeline.impl;
21
22 import static org.apache.commons.lang3.StringUtils.isNotBlank;
23 import static org.apache.commons.lang3.Validate.isTrue;
24 import io.wcm.caravan.common.performance.PerformanceMetrics;
25 import io.wcm.caravan.io.http.CaravanHttpClient;
26 import io.wcm.caravan.io.http.request.CaravanHttpRequest;
27 import io.wcm.caravan.io.http.response.CaravanHttpResponse;
28 import io.wcm.caravan.pipeline.JsonPipeline;
29 import io.wcm.caravan.pipeline.JsonPipelineAction;
30 import io.wcm.caravan.pipeline.JsonPipelineContext;
31 import io.wcm.caravan.pipeline.JsonPipelineExceptionHandler;
32 import io.wcm.caravan.pipeline.JsonPipelineOutput;
33 import io.wcm.caravan.pipeline.cache.CachePersistencyOptions;
34 import io.wcm.caravan.pipeline.cache.CacheStrategy;
35 import io.wcm.caravan.pipeline.impl.operators.AssertExistsOperator;
36 import io.wcm.caravan.pipeline.impl.operators.CachePointTransformer;
37 import io.wcm.caravan.pipeline.impl.operators.CollectOperator;
38 import io.wcm.caravan.pipeline.impl.operators.ExtractOperator;
39 import io.wcm.caravan.pipeline.impl.operators.HandleExceptionOperator;
40 import io.wcm.caravan.pipeline.impl.operators.MergeTransformer;
41 import io.wcm.caravan.pipeline.impl.operators.ResponseHandlingOperator;
42
43 import java.util.LinkedList;
44 import java.util.List;
45 import java.util.SortedSet;
46 import java.util.TreeSet;
47
48 import org.apache.commons.lang3.StringUtils;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 import rx.Observable;
53
54 import com.fasterxml.jackson.databind.JsonNode;
55
56
57
58
59
60
61
62 public final class JsonPipelineImpl implements JsonPipeline {
63
64 private static final Logger log = LoggerFactory.getLogger(JsonPipelineImpl.class);
65
66 private final SortedSet<String> sourceServiceIds = new TreeSet<String>();
67 private final List<CaravanHttpRequest> requests = new LinkedList<CaravanHttpRequest>();
68 private JsonPipelineContextImpl context;
69 private String descriptor;
70 private Observable<JsonPipelineOutput> observable;
71 private PerformanceMetrics performanceMetrics;
72
73
74
75
76
77
78 public JsonPipelineImpl(final CaravanHttpRequest request, final Observable<CaravanHttpResponse> responseObservable, final JsonPipelineContextImpl context) {
79 if (isNotBlank(request.getServiceId())) {
80 this.sourceServiceIds.add(request.getServiceId());
81 }
82 this.requests.add(request);
83 this.descriptor = isNotBlank(request.getUrl()) ? "GET(//" + request.getServiceId() + request.getUrl() + ")" : "EMPTY()";
84 this.observable = responseObservable.lift(new ResponseHandlingOperator(request)).cache();
85 this.context = context;
86 if (request.getPerformanceMetrics() != null) {
87 this.performanceMetrics = request.getPerformanceMetrics().createNext(isNotBlank(request.getUrl()) ? "GET" : "EMPTY", descriptor);
88 } else {
89 this.performanceMetrics = PerformanceMetrics.createNew(isNotBlank(request.getUrl()) ? "GET" : "EMPTY", descriptor, request.getCorrelationId());
90 }
91 this.observable = this.observable
92 .doOnSubscribe(this.performanceMetrics.getStartAction())
93 .doOnNext(this.performanceMetrics.getOnNextAction())
94 .doOnTerminate(this.performanceMetrics.getEndAction());
95
96 }
97
98 private JsonPipelineImpl() {
99
100 }
101
102 JsonPipelineImpl cloneWith(Observable<JsonPipelineOutput> newObservable, String descriptorSuffix, String action) {
103 return cloneWith(newObservable, descriptorSuffix, action, null);
104 }
105
106 JsonPipelineImpl cloneWith(Observable<JsonPipelineOutput> newObservable, String descriptorSuffix, String action, Class actionClass) {
107 JsonPipelineImpl clone = new JsonPipelineImpl();
108 clone.sourceServiceIds.addAll(this.sourceServiceIds);
109 clone.requests.addAll(this.requests);
110
111 clone.descriptor = this.descriptor;
112 if (StringUtils.isNotBlank(descriptorSuffix)) {
113 clone.descriptor += "+" + descriptorSuffix;
114 }
115
116 clone.observable = newObservable.cache();
117 clone.context = context;
118 clone.performanceMetrics = performanceMetrics.createNext(action, clone.descriptor, actionClass);
119 clone.observable = clone.observable
120 .doOnSubscribe(clone.performanceMetrics.getStartAction())
121 .doOnNext(clone.performanceMetrics.getOnNextAction())
122 .doOnTerminate(clone.performanceMetrics.getEndAction());
123
124 return clone;
125 }
126
127 @Override
128 public String getDescriptor() {
129 return descriptor;
130 }
131
132 @Override
133 public SortedSet<String> getSourceServices() {
134 return this.sourceServiceIds;
135 }
136
137 @Override
138 public List<CaravanHttpRequest> getRequests() {
139 return this.requests;
140 }
141
142 @Override
143 public PerformanceMetrics getPerformanceMetrics() {
144 return this.performanceMetrics;
145 }
146
147 @Override
148 public JsonPipeline assertExists(String jsonPath, int statusCode, String msg) {
149
150 Observable<JsonPipelineOutput> assertingObservable = observable.lift(new AssertExistsOperator(jsonPath, statusCode, msg));
151
152 return cloneWith(assertingObservable, null, "ASSERT_EXISTS");
153 }
154
155 @Override
156 public JsonPipeline extract(String jsonPath) {
157
158 Observable<JsonPipelineOutput> extractingObservable = observable.lift(new ExtractOperator(jsonPath, null));
159 String transformationDesc = "EXTRACT(" + jsonPath + ")";
160
161 return cloneWith(extractingObservable, transformationDesc, "EXTRACT");
162 }
163
164 @Override
165 public JsonPipeline extract(String jsonPath, String targetProperty) {
166
167 isTrue(isNotBlank(targetProperty), "Target property is '" + targetProperty
168 + "'. Please provide meaningfull targetProperty or use another extract method wothout targetProperty parameter, if any targetProperty isn't required.");
169
170 Observable<JsonPipelineOutput> extractingObservable = observable.lift(new ExtractOperator(jsonPath, targetProperty));
171 String transformationDesc = "EXTRACT(" + jsonPath + " INTO " + targetProperty + ")";
172
173 return cloneWith(extractingObservable, transformationDesc, "EXTRACT");
174 }
175
176 @Override
177 public JsonPipeline collect(String jsonPath) {
178
179 Observable<JsonPipelineOutput> collectingObservable = observable.lift(new CollectOperator(jsonPath, null));
180 String transformationDesc = "COLLECT(" + jsonPath + ")";
181
182 return cloneWith(collectingObservable, transformationDesc, "COLLECT");
183 }
184
185 @Override
186 public JsonPipeline collect(String jsonPath, String targetProperty) {
187
188 isTrue(isNotBlank(targetProperty), "Target property is '" + targetProperty
189 + "'. Please provide meaningfull targetProperty or use another collect method wothout targetProperty parameter, if any targetProperty isn't required.");
190
191 Observable<JsonPipelineOutput> collectingObservable = observable.lift(new CollectOperator(jsonPath, targetProperty));
192 String transformationDesc = "COLLECT(" + jsonPath + " INTO " + targetProperty + ")";
193
194 return cloneWith(collectingObservable, transformationDesc, "COLLECT");
195 }
196
197 @Override
198 public JsonPipeline merge(JsonPipeline secondarySource) {
199
200 MergeTransformer transformer = new MergeTransformer(descriptor, secondarySource.getOutput(), null);
201 Observable<JsonPipelineOutput> mergedObservable = observable.compose(transformer);
202 String transformationDesc = "MERGE(" + secondarySource.getDescriptor() + ")";
203
204 JsonPipelineImpl mergedPipeline = cloneWith(mergedObservable, transformationDesc, "MERGE");
205 mergedPipeline.sourceServiceIds.addAll(secondarySource.getSourceServices());
206 mergedPipeline.requests.addAll(secondarySource.getRequests());
207
208 return mergedPipeline;
209 }
210
211 @Override
212 public JsonPipeline merge(JsonPipeline secondarySource, String targetProperty) {
213
214 isTrue(isNotBlank(targetProperty), "Target property is '" + targetProperty
215 + "'. Please provide meaningfull targetProperty or use another merge method wothout targetProperty parameter, if any targetProperty isn't required.");
216
217 MergeTransformer transformer = new MergeTransformer(descriptor, secondarySource.getOutput(), targetProperty);
218 Observable<JsonPipelineOutput> mergedObservable = observable.compose(transformer);
219 String transformationDesc = "MERGE(" + secondarySource.getDescriptor() + " INTO " + targetProperty + ")";
220
221 JsonPipelineImpl mergedPipeline = cloneWith(mergedObservable, transformationDesc, "MERGE");
222 mergedPipeline.sourceServiceIds.addAll(secondarySource.getSourceServices());
223 mergedPipeline.requests.addAll(secondarySource.getRequests());
224
225 return mergedPipeline;
226 }
227
228 @Override
229 public JsonPipeline applyAction(JsonPipelineAction action) {
230 String actionDesc = "ACTION(" + action.getId() + ")";
231
232 Observable<JsonPipelineOutput> transformedObservable = observable.flatMap(output -> {
233 try {
234 return action.execute(output, context);
235 }
236 catch (Throwable e) {
237 log.error("Failed to execute action " + action.getId(), e);
238 return Observable.error(e);
239 }
240 });
241
242 return cloneWith(transformedObservable, actionDesc, "ACTION", action.getClass());
243 }
244
245 @Override
246 public JsonPipeline addCachePoint(CacheStrategy strategy) {
247 CachePersistencyOptions options = strategy.getCachePersistencyOptions(requests);
248
249 if (!options.isCacheable()) {
250 return this;
251 }
252
253 CachePointTransformer transformer = new CachePointTransformer(context, requests, descriptor, strategy);
254 Observable<JsonPipelineOutput> cachingObservable = observable.compose(transformer);
255
256 return cloneWith(cachingObservable, null, "ADD_CACHEPOINT");
257 }
258
259 @Override
260 public JsonPipeline handleException(JsonPipelineExceptionHandler handler) {
261
262 Observable<JsonPipelineOutput> exceptionHandlingObservable = observable.lift(new HandleExceptionOperator(requests, handler));
263
264 return cloneWith(exceptionHandlingObservable, null, "HANDLE_EXCEPTION");
265 }
266
267 @Override
268 public Observable<JsonPipelineOutput> getOutput() {
269 return observable.map(o -> o);
270 }
271
272 @Override
273 public Observable<JsonNode> getJsonOutput() {
274 return observable.map(model -> model.getPayload());
275 }
276
277 @Override
278 public Observable<String> getStringOutput() {
279 return getJsonOutput().map(JacksonFunctions::nodeToString);
280 }
281
282 JsonPipelineContext getJsonPipelineContext() {
283 return this.context;
284 }
285
286 }