1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package io.wcm.caravan.pipeline.impl.operators;
21
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.SortedSet;
26 import java.util.TreeSet;
27 import java.util.concurrent.TimeUnit;
28
29 import org.apache.commons.lang3.StringUtils;
30 import org.apache.commons.lang3.math.NumberUtils;
31 import org.apache.http.HttpStatus;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 import com.codahale.metrics.Counter;
36 import com.codahale.metrics.MetricRegistry;
37 import com.codahale.metrics.Timer;
38 import com.fasterxml.jackson.databind.JsonNode;
39 import com.fasterxml.jackson.databind.node.ObjectNode;
40 import com.google.common.collect.ImmutableList;
41 import com.google.common.collect.ImmutableMap;
42
43 import io.wcm.caravan.commons.metrics.rx.TimerMetricsOperator;
44 import io.wcm.caravan.io.http.request.CaravanHttpRequest;
45 import io.wcm.caravan.pipeline.JsonPipelineInputException;
46 import io.wcm.caravan.pipeline.JsonPipelineOutput;
47 import io.wcm.caravan.pipeline.cache.CacheDateUtils;
48 import io.wcm.caravan.pipeline.cache.CachePersistencyOptions;
49 import io.wcm.caravan.pipeline.cache.CacheStrategy;
50 import io.wcm.caravan.pipeline.cache.spi.CacheAdapter;
51 import io.wcm.caravan.pipeline.impl.JacksonFunctions;
52 import io.wcm.caravan.pipeline.impl.JsonPipelineContextImpl;
53 import io.wcm.caravan.pipeline.impl.JsonPipelineOutputImpl;
54 import rx.Observable;
55 import rx.Observable.Transformer;
56 import rx.Observer;
57 import rx.Subscriber;
58 import rx.exceptions.Exceptions;
59
60
61
62
63 public class CachePointTransformer implements Transformer<JsonPipelineOutput, JsonPipelineOutput> {
64
65 private static final Logger log = LoggerFactory.getLogger(CachePointTransformer.class);
66
67 private JsonPipelineContextImpl context;
68 private final List<CaravanHttpRequest> requests;
69 private final String descriptor;
70 private final CacheStrategy strategy;
71 private final String correlationId;
72
73
74
75
76
77
78
79 public CachePointTransformer(JsonPipelineContextImpl context, List<CaravanHttpRequest> requests, String descriptor, CacheStrategy strategy) {
80 super();
81 this.context = context;
82 this.requests = requests;
83 this.descriptor = descriptor;
84 this.strategy = strategy;
85 StringBuffer sb = new StringBuffer();
86 for (CaravanHttpRequest request : requests) {
87 if (sb.length() == 0) {
88 sb.append(request.getCorrelationId());
89 }
90 else {
91 sb.append(",").append(request.getCorrelationId());
92 }
93 }
94 this.correlationId = sb.toString();
95 }
96
97 private static SortedSet<String> getSourceServiceIds(List<CaravanHttpRequest> requests) {
98 SortedSet<String> sourceServiceIds = new TreeSet<String>();
99 for (CaravanHttpRequest request : requests) {
100 sourceServiceIds.add(request.getServiceId());
101 }
102 return sourceServiceIds;
103 }
104
105 private String getSourceServicePrefix() {
106 return StringUtils.join(getSourceServiceIds(requests), '+');
107 }
108
109 @Override
110 public Observable<JsonPipelineOutput> call(Observable<JsonPipelineOutput> output) {
111
112
113 Observable<JsonPipelineOutput> cachedSource = Observable.create((subscriber) -> {
114
115
116 String sourceServicePrefix = getSourceServicePrefix();
117 CacheAdapter cacheAdapter = context.getCacheAdapter();
118 final String cacheKey = sourceServicePrefix + ":" + descriptor;
119
120
121 CachePersistencyOptions options = strategy.getCachePersistencyOptions(requests);
122
123
124 Observable<String> cachedJsonString = cacheAdapter.get(cacheKey, options);
125
126
127 MetricRegistry metricRegistry = context.getMetricRegistry();
128 Timer timer = metricRegistry.timer(MetricRegistry.name(getClass(), sourceServicePrefix, "latency", "get"));
129
130 CacheMetrics cacheMetrics = new CacheMetrics(metricRegistry, sourceServicePrefix);
131
132
133 cachedJsonString
134 .lift(new TimerMetricsOperator<String>(timer))
135 .subscribe(new CacheResponseObserver(cacheKey, output, subscriber, cacheMetrics));
136 });
137
138 return cachedSource;
139 }
140
141 private final class CacheMetrics {
142
143 private final Counter hitsCounter;
144 private final Counter missesCounter;
145 private final Counter stalesCounter;
146 private final Counter fallbacksCounter;
147 private final Counter errorsCounter;
148
149 private CacheMetrics(MetricRegistry metricRegistry, String metricServicePrefix) {
150 hitsCounter = metricRegistry.counter(MetricRegistry.name(CachePointTransformer.class, metricServicePrefix, "hits"));
151 missesCounter = metricRegistry.counter(MetricRegistry.name(CachePointTransformer.class, metricServicePrefix, "misses"));
152 stalesCounter = metricRegistry.counter(MetricRegistry.name(CachePointTransformer.class, metricServicePrefix, "stales"));
153 fallbacksCounter = metricRegistry.counter(MetricRegistry.name(CachePointTransformer.class, metricServicePrefix, "fallbacks"));
154 errorsCounter = metricRegistry.counter(MetricRegistry.name(CachePointTransformer.class, metricServicePrefix, "errors"));
155 }
156 }
157
158
159
160
161
162
163
164
165
166
167
168 public final class CacheResponseObserver implements Observer<String> {
169
170 private final String cacheKey;
171 private final Observable<JsonPipelineOutput> originalSource;
172 private final Subscriber<? super JsonPipelineOutput> subscriber;
173
174 private final CacheMetrics cacheMetrics;
175
176 private boolean cacheHit;
177
178 private CacheResponseObserver(String cacheKey, Observable<JsonPipelineOutput> originalSource, Subscriber<? super JsonPipelineOutput> subscriberToForwardTo,
179 CacheMetrics cacheMetrics) {
180 this.cacheKey = cacheKey;
181 this.originalSource = originalSource;
182 this.subscriber = subscriberToForwardTo;
183
184 this.cacheMetrics = cacheMetrics;
185 }
186
187 @Override
188 public void onNext(String cachedContent) {
189
190 CacheEnvelope cacheEntry = CacheEnvelope.fromEnvelopeString(cachedContent, cacheKey);
191 if (cacheEntry == null) {
192 log.warn("CACHE ERROR for {} - the cached response could not be parsed,\n{}", this.cacheKey, correlationId);
193
194
195 cacheMetrics.errorsCounter.inc();
196
197
198 return;
199 }
200 cacheHit = true;
201
202 int responseAge = cacheEntry.getResponseAge();
203 int refreshInterval = strategy.getCachePersistencyOptions(requests).getRefreshInterval();
204
205 int expirySeconds = cacheEntry.getExpirySeconds();
206
207 int maxAgeFromClient = getClientMaxAge();
208
209
210 if (responseAge < refreshInterval && responseAge < maxAgeFromClient && expirySeconds > 0) {
211
212 log.debug("CACHE HIT for {},\n{}", this.cacheKey, correlationId);
213
214
215 cacheMetrics.hitsCounter.inc();
216
217
218 serveCachedContent(cacheEntry, refreshInterval);
219 }
220 else {
221
222 String reason;
223 if (responseAge >= refreshInterval) {
224 reason = "it's " + responseAge + " seconds old and the cache strategy has a refresh interval of " + refreshInterval + " seconds.";
225 }
226 else if (responseAge >= maxAgeFromClient) {
227 reason = "it's " + responseAge + " seconds old and the client requested a max-age of " + maxAgeFromClient + " seconds.";
228 }
229 else {
230 reason = "it has expired " + (-expirySeconds) + " seconds ago, according to the original max-age header from the http-response";
231 }
232
233 log.debug("CACHE STALE - content for {} is available, but {},\n{}", cacheKey, reason, correlationId);
234
235 fetchAndStore(new Subscriber<JsonPipelineOutput>() {
236
237 @Override
238 public void onNext(JsonPipelineOutput fetchedOutput) {
239
240
241 cacheMetrics.stalesCounter.inc();
242
243 subscriber.onNext(fetchedOutput);
244 }
245
246 @Override
247 public void onCompleted() {
248 subscriber.onCompleted();
249 }
250
251 @Override
252 public void onError(Throwable e) {
253 Exceptions.throwIfFatal(e);
254
255
256
257 if (cacheEntry.getStatusCode() >= 400) {
258 subscriber.onError(e);
259 return;
260 }
261
262
263 if (e instanceof JsonPipelineInputException && ((JsonPipelineInputException)e).getStatusCode() == 404) {
264 log.warn("CACHE FALLBACK - Using stale content from cache as a fallback after failing to fresh content for " + cacheKey + ",\n"
265 + correlationId + "\n" + e.getMessage());
266 }
267 else {
268 log.warn("CACHE FALLBACK - Using stale content from cache as a fallback after failing to fresh content for " + cacheKey + ",\n"
269 + correlationId, e);
270 }
271
272
273 cacheMetrics.fallbacksCounter.inc();
274
275 JsonPipelineOutputImpl pipelineOutput = new JsonPipelineOutputImpl(cacheEntry.getContentNode(), requests);
276
277
278 subscriber.onNext(pipelineOutput.withMaxAge(0));
279 subscriber.onCompleted();
280 }
281 });
282 }
283 }
284
285 private int getClientMaxAge() {
286 int maxAgeFromClient = (int)TimeUnit.DAYS.toSeconds(365);
287 for (String cacheControl : requests.get(0).getHeaders().get("Cache-Control")) {
288 if (cacheControl.startsWith("max-age")) {
289 int maxAge = NumberUtils.toInt(StringUtils.substringAfter(cacheControl, "="), maxAgeFromClient);
290 if (maxAge > 0) {
291 maxAgeFromClient = maxAge;
292 }
293 }
294 }
295 return maxAgeFromClient;
296 }
297
298 private void serveCachedContent(CacheEnvelope cacheEntry, int refreshInterval) {
299
300 if (cacheEntry.getStatusCode() == HttpStatus.SC_NOT_FOUND) {
301
302 String cachedInfoSuffix = " (Cached from " + cacheEntry.getSources() + " at " + cacheEntry.getGeneratedDate() + ")";
303 subscriber.onError(new JsonPipelineInputException(HttpStatus.SC_NOT_FOUND, cacheEntry.getReasonString() + cachedInfoSuffix));
304 }
305 else {
306
307 int maxAge = refreshInterval - cacheEntry.getResponseAge();
308 maxAge = Math.min(maxAge, cacheEntry.getExpirySeconds());
309
310 subscriber.onNext(new JsonPipelineOutputImpl(cacheEntry.getContentNode(), requests).withMaxAge(maxAge));
311 subscriber.onCompleted();
312 }
313 }
314
315 @Override
316 public void onCompleted() {
317 if (!cacheHit) {
318
319 log.debug("CACHE MISS for {} fetching response from {} through pipeline,\n{}", cacheKey, getSourceServicePrefix(), correlationId);
320
321
322 cacheMetrics.missesCounter.inc();
323
324 fetchAndStore(subscriber);
325 }
326 }
327
328 @Override
329 public void onError(Throwable e) {
330 Exceptions.throwIfFatal(e);
331
332
333 log.warn("Failed to connect to couchbase server, falling back to direct connection to " + getSourceServicePrefix() + ",\n"
334 + correlationId, e);
335 fetchAndStore(subscriber);
336 }
337
338 private void fetchAndStore(Subscriber<? super JsonPipelineOutput> backendResponseSubscriber) {
339
340
341 originalSource.subscribe(new Observer<JsonPipelineOutput>() {
342
343 @Override
344 public void onNext(JsonPipelineOutput fetchedModel) {
345 CachePersistencyOptions options = strategy.getCachePersistencyOptions(requests);
346
347 int contentMaxAge = options.getRefreshInterval();
348 if (fetchedModel.getMaxAge() >= 0) {
349 contentMaxAge = Math.min(contentMaxAge, fetchedModel.getMaxAge());
350 }
351
352 log.debug("CACHE PUT - response for {} has been fetched and will be put in the cache, max-age={} sec,\n{}", cacheKey, contentMaxAge,
353 correlationId);
354
355 CacheEnvelope cacheEntry = CacheEnvelope.from200Response(fetchedModel.getPayload(), contentMaxAge, requests,
356 cacheKey, descriptor, context.getProperties());
357 context.getCacheAdapter().put(cacheKey, cacheEntry.getEnvelopeString(), options);
358
359
360 backendResponseSubscriber.onNext(fetchedModel.withMaxAge(contentMaxAge));
361 }
362
363 @Override
364 public void onCompleted() {
365 backendResponseSubscriber.onCompleted();
366 }
367
368 @Override
369 public void onError(Throwable e) {
370 Exceptions.throwIfFatal(e);
371
372 if (e instanceof JsonPipelineInputException) {
373 if (((JsonPipelineInputException)e).getStatusCode() == HttpStatus.SC_NOT_FOUND) {
374
375 int maxAgeFor404 = 60;
376 CachePersistencyOptions options = CachePersistencyOptions.createTransient(maxAgeFor404);
377 log.debug("CACHE PUT - 404 response for {} will be stored in the cache, max-age={} sec,\n{}",
378 descriptor, options.getRefreshInterval(), correlationId);
379
380 CacheEnvelope cacheEntry = CacheEnvelope.from404Response(e.getMessage(), maxAgeFor404, requests, cacheKey, descriptor, context.getProperties());
381 context.getCacheAdapter().put(cacheKey, cacheEntry.getEnvelopeString(), options);
382 }
383 }
384 backendResponseSubscriber.onError(e);
385 }
386 });
387 }
388 }
389
390
391
392
393
394 public static final class CacheEnvelope {
395
396 private static final String CACHE_METADATA_PROPERTY = "metadata";
397 private static final String CACHE_CONTENT_PROPERTY = "content";
398
399 private final ObjectNode envelopeNode;
400 private final ObjectNode metadataNode;
401 private final JsonNode contentNode;
402
403 private CacheEnvelope(ObjectNode envelopeNode) {
404 this.envelopeNode = envelopeNode;
405 metadataNode = (ObjectNode)envelopeNode.get(CACHE_METADATA_PROPERTY);
406 contentNode = envelopeNode.get(CACHE_CONTENT_PROPERTY);
407 }
408
409
410
411
412
413
414
415 public static CacheEnvelope fromEnvelopeString(String jsonString, String cacheKey) {
416 try {
417 ObjectNode envelopeFromCache = JacksonFunctions.stringToObjectNode(jsonString);
418 if (!envelopeFromCache.has(CACHE_METADATA_PROPERTY) || !envelopeFromCache.has(CACHE_CONTENT_PROPERTY)) {
419 log.warn("Ignoring cached document {}, because it doesn't have the expected metadata/content envelope.", cacheKey);
420 return null;
421 }
422
423 return new CacheEnvelope(envelopeFromCache);
424 }
425 catch (JsonPipelineInputException e) {
426 log.warn("Failed parse cached JSON document from " + cacheKey, e);
427 return null;
428 }
429 }
430
431
432
433
434
435
436
437
438
439
440
441 public static CacheEnvelope from200Response(JsonNode contentNode, int maxAge, List<CaravanHttpRequest> requests, String cacheKey,
442 String pipelineDescriptor, Map<String, String> contextProperties) {
443
444 ObjectNode envelope = createEnvelopeNode(contentNode, HttpStatus.SC_OK, maxAge, requests, cacheKey, pipelineDescriptor, null, contextProperties);
445 return new CacheEnvelope(envelope);
446 }
447
448
449
450
451
452
453
454
455
456
457
458 public static CacheEnvelope from404Response(String reason, int maxAge, List<CaravanHttpRequest> requests, String cacheKey,
459 String pipelineDescriptor, Map<String, String> contextProperties) {
460
461 JsonNode contentNode = JacksonFunctions.emptyObject();
462 int statusCode = HttpStatus.SC_NOT_FOUND;
463
464 ObjectNode envelope = createEnvelopeNode(contentNode, statusCode, maxAge, requests, cacheKey, pipelineDescriptor, reason, contextProperties);
465 return new CacheEnvelope(envelope);
466 }
467
468 static CacheEnvelope fromContentString(String contentJson, int age) {
469 ObjectNode envelopeNode = createEnvelopeNode(JacksonFunctions.stringToObjectNode(contentJson), 200, 0, ImmutableList.of(),
470 "Cache-Key", "Descriptor", null, ImmutableMap.of());
471
472 CacheEnvelope envelope = new CacheEnvelope(envelopeNode);
473
474 envelope.getMetadataNode().put("generated", CacheDateUtils.formatRelativeTime(-age));
475
476 return envelope;
477 }
478
479
480 private static ObjectNode createEnvelopeNode(JsonNode contentNode, int statusCode, int maxAge, List<CaravanHttpRequest> requests,
481 String cacheKey, String pipelineDescriptor, String reason, Map<String, String> contextProperties) {
482
483 ObjectNode envelope = JacksonFunctions.emptyObject();
484 ObjectNode metadata = envelope.putObject(CACHE_METADATA_PROPERTY);
485
486 metadata.put("cacheKey", cacheKey);
487 metadata.set("sources", JacksonFunctions.pojoToNode(getSourceServiceIds(requests)));
488 metadata.put("pipeline", pipelineDescriptor);
489 metadata.put("generated", CacheDateUtils.formatCurrentTime());
490 if (maxAge > 0) {
491 metadata.put("expires", CacheDateUtils.formatRelativeTime(maxAge));
492 }
493 metadata.put("statusCode", statusCode);
494
495 List<String> sourcePaths = new ArrayList<String>();
496 for (CaravanHttpRequest req : requests) {
497 sourcePaths.add(StringUtils.substringBefore(req.getUrl(), "?"));
498 }
499 metadata.set("sourcePaths", JacksonFunctions.pojoToNode(sourcePaths));
500
501
502 if (StringUtils.isNotBlank(reason)) {
503 metadata.put("reason", reason);
504 }
505 metadata.set("contextProperties", JacksonFunctions.pojoToNode(contextProperties));
506 envelope.set(CACHE_CONTENT_PROPERTY, contentNode);
507
508 return envelope;
509 }
510
511
512
513
514
515 public String getEnvelopeString() {
516 return JacksonFunctions.nodeToString(envelopeNode);
517 }
518
519 JsonNode getContentNode() {
520 return contentNode;
521 }
522
523 ObjectNode getMetadataNode() {
524 return metadataNode;
525 }
526
527 int getStatusCode() {
528 return metadataNode.at("/statusCode").asInt(HttpStatus.SC_OK);
529 }
530
531 String getReasonString() {
532 return metadataNode.at("/reason").asText("Not Found");
533 }
534
535 String getSources() {
536 return metadataNode.at("/sources").toString();
537 }
538
539 int getResponseAge() {
540 return CacheDateUtils.getSecondsSince(getGeneratedDate());
541 }
542
543 String getGeneratedDate() {
544 return metadataNode.at("/generated").asText();
545 }
546
547 int getExpirySeconds() {
548 if (!metadataNode.has("expires")) {
549 return (int)TimeUnit.DAYS.toSeconds(365);
550 }
551 String expiryDate = metadataNode.at("/expires").asText();
552 return CacheDateUtils.getSecondsUntil(expiryDate);
553 }
554
555
556
557
558 public void setGeneratedDate(String newDate) {
559 metadataNode.put("generated", newDate);
560 }
561
562
563
564
565 public void setExpiresDate(String newDate) {
566 metadataNode.put("expires", newDate);
567 }
568 }
569 }