View Javadoc
1   /*
2    * #%L
3    * wcm.io
4    * %%
5    * Copyright (C) 2014 wcm.io
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  package io.wcm.caravan.pipeline.impl.operators;
21  
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.SortedSet;
26  import java.util.TreeSet;
27  import java.util.concurrent.TimeUnit;
28  
29  import org.apache.commons.lang3.StringUtils;
30  import org.apache.commons.lang3.math.NumberUtils;
31  import org.apache.http.HttpStatus;
32  import org.slf4j.Logger;
33  import org.slf4j.LoggerFactory;
34  
35  import com.codahale.metrics.Counter;
36  import com.codahale.metrics.MetricRegistry;
37  import com.codahale.metrics.Timer;
38  import com.fasterxml.jackson.databind.JsonNode;
39  import com.fasterxml.jackson.databind.node.ObjectNode;
40  import com.google.common.collect.ImmutableList;
41  import com.google.common.collect.ImmutableMap;
42  
43  import io.wcm.caravan.commons.metrics.rx.TimerMetricsOperator;
44  import io.wcm.caravan.io.http.request.CaravanHttpRequest;
45  import io.wcm.caravan.pipeline.JsonPipelineInputException;
46  import io.wcm.caravan.pipeline.JsonPipelineOutput;
47  import io.wcm.caravan.pipeline.cache.CacheDateUtils;
48  import io.wcm.caravan.pipeline.cache.CachePersistencyOptions;
49  import io.wcm.caravan.pipeline.cache.CacheStrategy;
50  import io.wcm.caravan.pipeline.cache.spi.CacheAdapter;
51  import io.wcm.caravan.pipeline.impl.JacksonFunctions;
52  import io.wcm.caravan.pipeline.impl.JsonPipelineContextImpl;
53  import io.wcm.caravan.pipeline.impl.JsonPipelineOutputImpl;
54  import rx.Observable;
55  import rx.Observable.Transformer;
56  import rx.Observer;
57  import rx.Subscriber;
58  import rx.exceptions.Exceptions;
59  
60  /**
61   * A rather complicated transformer that implements the pipeline's caching capabilities.
62   */
63  public class CachePointTransformer implements Transformer<JsonPipelineOutput, JsonPipelineOutput> {
64  
65    private static final Logger log = LoggerFactory.getLogger(CachePointTransformer.class);
66  
67    private JsonPipelineContextImpl context;
68    private final List<CaravanHttpRequest> requests;
69    private final String descriptor;
70    private final CacheStrategy strategy;
71    private final String correlationId;
72  
73    /**
74     * @param context a context of the actual JSON pipeline
75     * @param requests the outgoing REST request(s) used to obtain the JSON data to be cached
76     * @param descriptor the unique id of the pipeline (to build a cache key)
77     * @param strategy the CacheStrategy to get storage time and refresh interval
78     */
79    public CachePointTransformer(JsonPipelineContextImpl context, List<CaravanHttpRequest> requests, String descriptor, CacheStrategy strategy) {
80      super();
81      this.context = context;
82      this.requests = requests;
83      this.descriptor = descriptor;
84      this.strategy = strategy;
85      StringBuffer sb = new StringBuffer();
86      for (CaravanHttpRequest request : requests) {
87        if (sb.length() == 0) {
88          sb.append(request.getCorrelationId());
89        }
90        else {
91          sb.append(",").append(request.getCorrelationId());
92        }
93      }
94      this.correlationId = sb.toString();
95    }
96  
97    private static SortedSet<String> getSourceServiceIds(List<CaravanHttpRequest> requests) {
98      SortedSet<String> sourceServiceIds = new TreeSet<String>();
99      for (CaravanHttpRequest request : requests) {
100       sourceServiceIds.add(request.getServiceId());
101     }
102     return sourceServiceIds;
103   }
104 
105   private String getSourceServicePrefix() {
106     return StringUtils.join(getSourceServiceIds(requests), '+');
107   }
108 
109   @Override
110   public Observable<JsonPipelineOutput> call(Observable<JsonPipelineOutput> output) {
111 
112     // the code within the lambda passed to Observable#create will be executed when subscribe is called on the "cachedSource" observable
113     Observable<JsonPipelineOutput> cachedSource = Observable.create((subscriber) -> {
114 
115       // construct a unique cache key from the pipeline's descriptor
116       String sourceServicePrefix = getSourceServicePrefix();
117       CacheAdapter cacheAdapter = context.getCacheAdapter();
118       final String cacheKey = sourceServicePrefix + ":" + descriptor;
119 
120       // the caching strategy determines if the storage time should be extended for cache hits(i.e. Time-to-Idle behaviour)
121       CachePersistencyOptions options = strategy.getCachePersistencyOptions(requests);
122 
123       // try to asynchronously(!) fetch the response from the cache
124       Observable<String> cachedJsonString = cacheAdapter.get(cacheKey, options);
125 
126       // create service specific metrics
127       MetricRegistry metricRegistry = context.getMetricRegistry();
128       Timer timer = metricRegistry.timer(MetricRegistry.name(getClass(), sourceServicePrefix, "latency", "get"));
129 
130       CacheMetrics cacheMetrics = new CacheMetrics(metricRegistry, sourceServicePrefix);
131 
132       // CacheResponseObserver will decide what to do when the response is ready (or could not be retrieved from cache)
133       cachedJsonString
134       .lift(new TimerMetricsOperator<String>(timer))
135       .subscribe(new CacheResponseObserver(cacheKey, output, subscriber, cacheMetrics));
136     });
137 
138     return cachedSource;
139   }
140 
141   private final class CacheMetrics {
142 
143     private final Counter hitsCounter;
144     private final Counter missesCounter;
145     private final Counter stalesCounter;
146     private final Counter fallbacksCounter;
147     private final Counter errorsCounter;
148 
149     private CacheMetrics(MetricRegistry metricRegistry, String metricServicePrefix) {
150       hitsCounter = metricRegistry.counter(MetricRegistry.name(CachePointTransformer.class, metricServicePrefix, "hits"));
151       missesCounter = metricRegistry.counter(MetricRegistry.name(CachePointTransformer.class, metricServicePrefix, "misses"));
152       stalesCounter = metricRegistry.counter(MetricRegistry.name(CachePointTransformer.class, metricServicePrefix, "stales"));
153       fallbacksCounter = metricRegistry.counter(MetricRegistry.name(CachePointTransformer.class, metricServicePrefix, "fallbacks"));
154       errorsCounter = metricRegistry.counter(MetricRegistry.name(CachePointTransformer.class, metricServicePrefix, "errors"));
155     }
156   }
157 
158   /**
159    * An observer that is subscribed to the {@link Observable} returned by
160    * {@link CacheAdapter#get(String, CachePersistencyOptions)} , and is responsible for
161    * <ul>
162    * <li>unwrapping the JSON content from the caching envelope if it was successfully retrieved from cache</li>
163    * <li>forwarding the unwrapped response to the subscriber given in the constructor</li>
164    * <li>fetch the response from the Pipeline's dataSource if it couldn't be retrieved from cache</li>
165    * <li>store the fetched responses to couchbase (wrapped in an envelope with metadata)</li>
166    * </ul>
167    */
168   public final class CacheResponseObserver implements Observer<String> {
169 
170     private final String cacheKey;
171     private final Observable<JsonPipelineOutput> originalSource;
172     private final Subscriber<? super JsonPipelineOutput> subscriber;
173 
174     private final CacheMetrics cacheMetrics;
175 
176     private boolean cacheHit;
177 
/**
 * @param cacheKey the key that was used for the cache lookup
 * @param originalSource the observable to subscribe to when the response has to be fetched
 * @param subscriberToForwardTo the subscriber that receives the cached or fetched output
 * @param cacheMetrics the counters to update
 */
private CacheResponseObserver(String cacheKey, Observable<JsonPipelineOutput> originalSource, Subscriber<? super JsonPipelineOutput> subscriberToForwardTo,
    CacheMetrics cacheMetrics) {
  this.cacheMetrics = cacheMetrics;
  this.subscriber = subscriberToForwardTo;
  this.originalSource = originalSource;
  this.cacheKey = cacheKey;
}
186 
187     @Override
188     public void onNext(String cachedContent) {
189 
190       CacheEnvelope cacheEntry = CacheEnvelope.fromEnvelopeString(cachedContent, cacheKey);
191       if (cacheEntry == null) {
192         log.warn("CACHE ERROR for {} - the cached response could not be parsed,\n{}", this.cacheKey, correlationId);
193 
194         // increase the error counter for the source serviceID
195         cacheMetrics.errorsCounter.inc();
196 
197         // ignore cache envelopes that can not be parsed
198         return;
199       }
200       cacheHit = true;
201 
202       int responseAge = cacheEntry.getResponseAge();
203       int refreshInterval = strategy.getCachePersistencyOptions(requests).getRefreshInterval();
204 
205       int expirySeconds = cacheEntry.getExpirySeconds();
206 
207       int maxAgeFromClient = getClientMaxAge();
208 
209       // check if the content from cache is fresh enough to serve it
210       if (responseAge < refreshInterval && responseAge < maxAgeFromClient && expirySeconds > 0) {
211 
212         log.debug("CACHE HIT for {},\n{}", this.cacheKey, correlationId);
213 
214         // increase the hits counter for the source serviceID
215         cacheMetrics.hitsCounter.inc();
216 
217         // the document could be retrieved, so forward it (parsed as a JsonNode) to the actual subscriber to the cachedSource
218         serveCachedContent(cacheEntry, refreshInterval);
219       }
220       else {
221         // this means the cached content is outdated - we better fetch the data from the backend
222         String reason;
223         if (responseAge >= refreshInterval) {
224           reason = "it's " + responseAge + " seconds old and the cache strategy has a refresh interval of " + refreshInterval + " seconds.";
225         }
226         else if (responseAge >= maxAgeFromClient) {
227           reason = "it's " + responseAge + " seconds old and the client requested a max-age of " + maxAgeFromClient + " seconds.";
228         }
229         else {
230           reason = "it has expired " + (-expirySeconds) + " seconds ago, according to the original max-age header from the http-response";
231         }
232 
233         log.debug("CACHE STALE - content for {} is available, but {},\n{}", cacheKey, reason, correlationId);
234 
235         fetchAndStore(new Subscriber<JsonPipelineOutput>() {
236 
237           @Override
238           public void onNext(JsonPipelineOutput fetchedOutput) {
239 
240             // increase the stales counter for the source serviceID
241             cacheMetrics.stalesCounter.inc();
242 
243             subscriber.onNext(fetchedOutput);
244           }
245 
246           @Override
247           public void onCompleted() {
248             subscriber.onCompleted();
249           }
250 
251           @Override
252           public void onError(Throwable e) {
253             Exceptions.throwIfFatal(e);
254 
255             // if the cached response was a an error as well (e.g. 404), then do not use it as a fallback.
256             // instead  just forward the actual exception that occurred
257             if (cacheEntry.getStatusCode() >= 400) {
258               subscriber.onError(e);
259               return;
260             }
261 
262 
263             if (e instanceof JsonPipelineInputException && ((JsonPipelineInputException)e).getStatusCode() == 404) {
264               log.warn("CACHE FALLBACK - Using stale content from cache as a fallback after failing to fresh content for " + cacheKey + ",\n"
265                   + correlationId + "\n" + e.getMessage());
266             }
267             else {
268               log.warn("CACHE FALLBACK - Using stale content from cache as a fallback after failing to fresh content for " + cacheKey + ",\n"
269                   + correlationId, e);
270             }
271 
272             // increase the fallbacks counter for the source serviceID
273             cacheMetrics.fallbacksCounter.inc();
274 
275             JsonPipelineOutputImpl pipelineOutput = new JsonPipelineOutputImpl(cacheEntry.getContentNode(), requests);
276 
277             // when fallback content is served from cache, it should not be cached by the client at all
278             subscriber.onNext(pipelineOutput.withMaxAge(0));
279             subscriber.onCompleted();
280           }
281         });
282       }
283     }
284 
285     private int getClientMaxAge() {
286       int maxAgeFromClient = (int)TimeUnit.DAYS.toSeconds(365);
287       for (String cacheControl : requests.get(0).getHeaders().get("Cache-Control")) {
288         if (cacheControl.startsWith("max-age")) {
289           int maxAge = NumberUtils.toInt(StringUtils.substringAfter(cacheControl, "="), maxAgeFromClient);
290           if (maxAge > 0) {
291             maxAgeFromClient = maxAge;
292           }
293         }
294       }
295       return maxAgeFromClient;
296     }
297 
298     private void serveCachedContent(CacheEnvelope cacheEntry, int refreshInterval) {
299 
300       if (cacheEntry.getStatusCode() == HttpStatus.SC_NOT_FOUND) {
301         // the cache entry is a 404 response that should be thrown as an exception to be handled by the subscriber
302         String cachedInfoSuffix = " (Cached from " + cacheEntry.getSources() + " at " + cacheEntry.getGeneratedDate() + ")";
303         subscriber.onError(new JsonPipelineInputException(HttpStatus.SC_NOT_FOUND, cacheEntry.getReasonString() + cachedInfoSuffix));
304       }
305       else {
306         // make sure to set the max-age content-header just to the time the cached content will become stale
307         int maxAge = refreshInterval - cacheEntry.getResponseAge();
308         maxAge = Math.min(maxAge, cacheEntry.getExpirySeconds());
309 
310         subscriber.onNext(new JsonPipelineOutputImpl(cacheEntry.getContentNode(), requests).withMaxAge(maxAge));
311         subscriber.onCompleted();
312       }
313     }
314 
315     @Override
316     public void onCompleted() {
317       if (!cacheHit) {
318         // there was no emission, so the response has to be fetched from the service
319         log.debug("CACHE MISS for {} fetching response from {} through pipeline,\n{}", cacheKey, getSourceServicePrefix(), correlationId);
320 
321         // increase the misses counter for the source serviceID
322         cacheMetrics.missesCounter.inc();
323 
324         fetchAndStore(subscriber);
325       }
326     }
327 
328     @Override
329     public void onError(Throwable e) {
330       Exceptions.throwIfFatal(e);
331 
332       // also fall back to the actual service if the couchbase request failed
333       log.warn("Failed to connect to couchbase server, falling back to direct connection to " + getSourceServicePrefix() + ",\n"
334           + correlationId, e);
335       fetchAndStore(subscriber);
336     }
337 
338     private void fetchAndStore(Subscriber<? super JsonPipelineOutput> backendResponseSubscriber) {
339 
340       // fetch the output with a new subscription, which will also store the response in the cache when it is retrieved
341       originalSource.subscribe(new Observer<JsonPipelineOutput>() {
342 
343         @Override
344         public void onNext(JsonPipelineOutput fetchedModel) {
345           CachePersistencyOptions options = strategy.getCachePersistencyOptions(requests);
346 
347           int contentMaxAge = options.getRefreshInterval();
348           if (fetchedModel.getMaxAge() >= 0) {
349             contentMaxAge = Math.min(contentMaxAge, fetchedModel.getMaxAge());
350           }
351 
352           log.debug("CACHE PUT - response for {} has been fetched and will be put in the cache, max-age={} sec,\n{}", cacheKey, contentMaxAge,
353               correlationId);
354 
355           CacheEnvelope cacheEntry = CacheEnvelope.from200Response(fetchedModel.getPayload(), contentMaxAge, requests,
356               cacheKey, descriptor, context.getProperties());
357           context.getCacheAdapter().put(cacheKey, cacheEntry.getEnvelopeString(), options);
358 
359           // everything else is just forwarding to the subscriber to the cachedSource
360           backendResponseSubscriber.onNext(fetchedModel.withMaxAge(contentMaxAge));
361         }
362 
363         @Override
364         public void onCompleted() {
365           backendResponseSubscriber.onCompleted();
366         }
367 
368         @Override
369         public void onError(Throwable e) {
370           Exceptions.throwIfFatal(e);
371 
372           if (e instanceof JsonPipelineInputException) {
373             if (((JsonPipelineInputException)e).getStatusCode() == HttpStatus.SC_NOT_FOUND) {
374 
375               int maxAgeFor404 = 60;
376               CachePersistencyOptions options = CachePersistencyOptions.createTransient(maxAgeFor404);
377               log.debug("CACHE PUT - 404 response for {} will be stored in the cache, max-age={} sec,\n{}",
378                   descriptor, options.getRefreshInterval(), correlationId);
379 
380               CacheEnvelope cacheEntry = CacheEnvelope.from404Response(e.getMessage(), maxAgeFor404, requests, cacheKey, descriptor, context.getProperties());
381               context.getCacheAdapter().put(cacheKey, cacheEntry.getEnvelopeString(), options);
382             }
383           }
384           backendResponseSubscriber.onError(e);
385         }
386       });
387     }
388   }
389 
390   /**
391    * Implements generation and parsing of the cache "envelope" document, that wraps the JSON output of the pipeline to
392    * be able to store additional metadata in the cache
393    */
394   public static final class CacheEnvelope {
395 
396     private static final String CACHE_METADATA_PROPERTY = "metadata";
397     private static final String CACHE_CONTENT_PROPERTY = "content";
398 
399     private final ObjectNode envelopeNode;
400     private final ObjectNode metadataNode;
401     private final JsonNode contentNode;
402 
/**
 * @param envelopeNode the complete envelope, from which the "metadata" and "content" properties are read
 */
private CacheEnvelope(ObjectNode envelopeNode) {
  this.envelopeNode = envelopeNode;
  this.contentNode = envelopeNode.get(CACHE_CONTENT_PROPERTY);
  this.metadataNode = (ObjectNode)envelopeNode.get(CACHE_METADATA_PROPERTY);
}
408 
409     /**
410      * Parse a JSON string that was obtained from the couchbase cache
411      * @param jsonString JSON string
412      * @param cacheKey Cache key
413      * @return the CacheEntry - or null if the json String was not in the expected format
414      */
415     public static CacheEnvelope fromEnvelopeString(String jsonString, String cacheKey) {
416       try {
417         ObjectNode envelopeFromCache = JacksonFunctions.stringToObjectNode(jsonString);
418         if (!envelopeFromCache.has(CACHE_METADATA_PROPERTY) || !envelopeFromCache.has(CACHE_CONTENT_PROPERTY)) {
419           log.warn("Ignoring cached document {}, because it doesn't have the expected metadata/content envelope.", cacheKey);
420           return null;
421         }
422 
423         return new CacheEnvelope(envelopeFromCache);
424       }
425       catch (JsonPipelineInputException e) {
426         log.warn("Failed parse cached JSON document from " + cacheKey, e);
427         return null;
428       }
429     }
430 
431     /**
432      * Create a new CacheEnvelope to store in the couchbase cache
433      * @param contentNode Content node
434      * @param maxAge how many seconds 404 responses in cache should stay valid
435      * @param requests Requests
436      * @param cacheKey Cache key
437      * @param pipelineDescriptor Pipeline descriptor
438      * @param contextProperties Context properties
439      * @return the new CacheEnvelope instance
440      */
441     public static CacheEnvelope from200Response(JsonNode contentNode, int maxAge, List<CaravanHttpRequest> requests, String cacheKey,
442         String pipelineDescriptor, Map<String, String> contextProperties) {
443 
444       ObjectNode envelope = createEnvelopeNode(contentNode, HttpStatus.SC_OK, maxAge, requests, cacheKey, pipelineDescriptor, null, contextProperties);
445       return new CacheEnvelope(envelope);
446     }
447 
448     /**
449      * Create a new CacheEnvelope to store in the couchbase cache
450      * @param reason Reason
451      * @param maxAge how many seconds 404 responses in cache should stay valid
452      * @param requests Requests
453      * @param cacheKey Cache key
454      * @param pipelineDescriptor Pipeline descriptor
455      * @param contextProperties Context properties
456      * @return the new CacheEnvelope instance
457      */
458     public static CacheEnvelope from404Response(String reason, int maxAge, List<CaravanHttpRequest> requests, String cacheKey,
459         String pipelineDescriptor, Map<String, String> contextProperties) {
460 
461       JsonNode contentNode = JacksonFunctions.emptyObject();
462       int statusCode = HttpStatus.SC_NOT_FOUND;
463 
464       ObjectNode envelope = createEnvelopeNode(contentNode, statusCode, maxAge, requests, cacheKey, pipelineDescriptor, reason, contextProperties);
465       return new CacheEnvelope(envelope);
466     }
467 
468     static CacheEnvelope fromContentString(String contentJson, int age) {
469       ObjectNode envelopeNode = createEnvelopeNode(JacksonFunctions.stringToObjectNode(contentJson), 200, 0, ImmutableList.of(),
470           "Cache-Key", "Descriptor", null, ImmutableMap.of());
471 
472       CacheEnvelope envelope = new CacheEnvelope(envelopeNode);
473 
474       envelope.getMetadataNode().put("generated", CacheDateUtils.formatRelativeTime(-age));
475 
476       return envelope;
477     }
478 
479 
480     private static ObjectNode createEnvelopeNode(JsonNode contentNode, int statusCode, int maxAge, List<CaravanHttpRequest> requests,
481         String cacheKey, String pipelineDescriptor, String reason, Map<String, String> contextProperties) {
482 
483       ObjectNode envelope = JacksonFunctions.emptyObject();
484       ObjectNode metadata = envelope.putObject(CACHE_METADATA_PROPERTY);
485 
486       metadata.put("cacheKey", cacheKey);
487       metadata.set("sources", JacksonFunctions.pojoToNode(getSourceServiceIds(requests)));
488       metadata.put("pipeline", pipelineDescriptor);
489       metadata.put("generated", CacheDateUtils.formatCurrentTime());
490       if (maxAge > 0) {
491         metadata.put("expires", CacheDateUtils.formatRelativeTime(maxAge));
492       }
493       metadata.put("statusCode", statusCode);
494 
495       List<String> sourcePaths = new ArrayList<String>();
496       for (CaravanHttpRequest req : requests) {
497         sourcePaths.add(StringUtils.substringBefore(req.getUrl(), "?"));
498       }
499       metadata.set("sourcePaths", JacksonFunctions.pojoToNode(sourcePaths));
500 
501 
502       if (StringUtils.isNotBlank(reason)) {
503         metadata.put("reason", reason);
504       }
505       metadata.set("contextProperties", JacksonFunctions.pojoToNode(contextProperties));
506       envelope.set(CACHE_CONTENT_PROPERTY, contentNode);
507 
508       return envelope;
509     }
510 
511 
512     /**
513      * @return the full envelope (as JSON string) to be stored in the cache
514      */
515     public String getEnvelopeString() {
516       return JacksonFunctions.nodeToString(envelopeNode);
517     }
518 
519     JsonNode getContentNode() {
520       return contentNode;
521     }
522 
523     ObjectNode getMetadataNode() {
524       return metadataNode;
525     }
526 
527     int getStatusCode() {
528       return metadataNode.at("/statusCode").asInt(HttpStatus.SC_OK);
529     }
530 
531     String getReasonString() {
532       return metadataNode.at("/reason").asText("Not Found");
533     }
534 
535     String getSources() {
536       return metadataNode.at("/sources").toString();
537     }
538 
539     int getResponseAge() {
540       return CacheDateUtils.getSecondsSince(getGeneratedDate());
541     }
542 
543     String getGeneratedDate() {
544       return metadataNode.at("/generated").asText();
545     }
546 
547     int getExpirySeconds() {
548       if (!metadataNode.has("expires")) {
549         return (int)TimeUnit.DAYS.toSeconds(365);
550       }
551       String expiryDate = metadataNode.at("/expires").asText();
552       return CacheDateUtils.getSecondsUntil(expiryDate);
553     }
554 
555     /**
556      * @param newDate Generated date
557      */
558     public void setGeneratedDate(String newDate) {
559       metadataNode.put("generated", newDate);
560     }
561 
562     /**
563      * @param newDate Expires date
564      */
565     public void setExpiresDate(String newDate) {
566       metadataNode.put("expires", newDate);
567     }
568   }
569 }