JsonPipelineImpl.java

/*
 * #%L
 * wcm.io
 * %%
 * Copyright (C) 2014 wcm.io
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package io.wcm.caravan.pipeline.impl;

import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.commons.lang3.Validate.isTrue;
import io.wcm.caravan.common.performance.PerformanceMetrics;
import io.wcm.caravan.io.http.CaravanHttpClient;
import io.wcm.caravan.io.http.request.CaravanHttpRequest;
import io.wcm.caravan.io.http.response.CaravanHttpResponse;
import io.wcm.caravan.pipeline.JsonPipeline;
import io.wcm.caravan.pipeline.JsonPipelineAction;
import io.wcm.caravan.pipeline.JsonPipelineContext;
import io.wcm.caravan.pipeline.JsonPipelineExceptionHandler;
import io.wcm.caravan.pipeline.JsonPipelineOutput;
import io.wcm.caravan.pipeline.cache.CachePersistencyOptions;
import io.wcm.caravan.pipeline.cache.CacheStrategy;
import io.wcm.caravan.pipeline.impl.operators.AssertExistsOperator;
import io.wcm.caravan.pipeline.impl.operators.CachePointTransformer;
import io.wcm.caravan.pipeline.impl.operators.CollectOperator;
import io.wcm.caravan.pipeline.impl.operators.ExtractOperator;
import io.wcm.caravan.pipeline.impl.operators.HandleExceptionOperator;
import io.wcm.caravan.pipeline.impl.operators.MergeTransformer;
import io.wcm.caravan.pipeline.impl.operators.ResponseHandlingOperator;

import java.util.LinkedList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.Observable;

import com.fasterxml.jackson.databind.JsonNode;

/**
 * Default implementation of {@link JsonPipeline}.
 * Each new JsonPipeline instance created by this implementation provides a reuse of data, received after the first
 * subscription call. Reuse should reduce the number of actual calls to external resources while execution of pipeline
 * by multiple subscribers.
 */
public final class JsonPipelineImpl implements JsonPipeline {

  private static final Logger log = LoggerFactory.getLogger(JsonPipelineImpl.class);

  private final SortedSet<String> sourceServiceIds = new TreeSet<String>();
  private final List<CaravanHttpRequest> requests = new LinkedList<CaravanHttpRequest>();
  private JsonPipelineContextImpl context;
  private String descriptor;
  private Observable<JsonPipelineOutput> observable;
  private PerformanceMetrics performanceMetrics;

  /**
   * @param request the REST request that provides the source data
   * @param responseObservable the response observable obtained by the {@link CaravanHttpClient}
   * @param context preinitialized JSON pipeline context
   */
  public JsonPipelineImpl(final CaravanHttpRequest request, final Observable<CaravanHttpResponse> responseObservable, final JsonPipelineContextImpl context) {
    if (isNotBlank(request.getServiceId())) {
      this.sourceServiceIds.add(request.getServiceId());
    }
    this.requests.add(request);
    this.descriptor = isNotBlank(request.getUrl()) ? "GET(//" + request.getServiceId() + request.getUrl() + ")" : "EMPTY()";
    this.observable = responseObservable.lift(new ResponseHandlingOperator(request)).cache();
    this.context = context;
    if (request.getPerformanceMetrics() != null) {
      this.performanceMetrics = request.getPerformanceMetrics().createNext(isNotBlank(request.getUrl()) ? "GET" : "EMPTY", descriptor);
    } else {
      this.performanceMetrics = PerformanceMetrics.createNew(isNotBlank(request.getUrl()) ? "GET" : "EMPTY", descriptor, request.getCorrelationId());
    }
    this.observable = this.observable
        .doOnSubscribe(this.performanceMetrics.getStartAction())
        .doOnNext(this.performanceMetrics.getOnNextAction())
        .doOnTerminate(this.performanceMetrics.getEndAction());

  }

  private JsonPipelineImpl() {
    // only used internally
  }

  JsonPipelineImpl cloneWith(Observable<JsonPipelineOutput> newObservable, String descriptorSuffix, String action) {
    return cloneWith(newObservable, descriptorSuffix, action, null);
  }

  JsonPipelineImpl cloneWith(Observable<JsonPipelineOutput> newObservable, String descriptorSuffix, String action, Class actionClass) {
    JsonPipelineImpl clone = new JsonPipelineImpl();
    clone.sourceServiceIds.addAll(this.sourceServiceIds);
    clone.requests.addAll(this.requests);

    clone.descriptor = this.descriptor;
    if (StringUtils.isNotBlank(descriptorSuffix)) {
      clone.descriptor += "+" + descriptorSuffix;
    }

    clone.observable = newObservable.cache();
    clone.context = context;
    clone.performanceMetrics = performanceMetrics.createNext(action, clone.descriptor, actionClass);
    clone.observable = clone.observable
        .doOnSubscribe(clone.performanceMetrics.getStartAction())
        .doOnNext(clone.performanceMetrics.getOnNextAction())
        .doOnTerminate(clone.performanceMetrics.getEndAction());

    return clone;
  }

  @Override
  public String getDescriptor() {
    return descriptor;
  }

  @Override
  public SortedSet<String> getSourceServices() {
    return this.sourceServiceIds;
  }

  @Override
  public List<CaravanHttpRequest> getRequests() {
    return this.requests;
  }

  @Override
  public PerformanceMetrics getPerformanceMetrics() {
    return this.performanceMetrics;
  }

  @Override
  public JsonPipeline assertExists(String jsonPath, int statusCode, String msg) {

    Observable<JsonPipelineOutput> assertingObservable = observable.lift(new AssertExistsOperator(jsonPath, statusCode, msg));

    return cloneWith(assertingObservable, null, "ASSERT_EXISTS");
  }

  @Override
  public JsonPipeline extract(String jsonPath) {

    Observable<JsonPipelineOutput> extractingObservable = observable.lift(new ExtractOperator(jsonPath, null));
    String transformationDesc = "EXTRACT(" + jsonPath + ")";

    return cloneWith(extractingObservable, transformationDesc, "EXTRACT");
  }

  @Override
  public JsonPipeline extract(String jsonPath, String targetProperty) {

    isTrue(isNotBlank(targetProperty), "Target property is '" + targetProperty
        + "'. Please provide meaningfull targetProperty or use another extract method wothout targetProperty parameter, if any targetProperty isn't required.");

    Observable<JsonPipelineOutput> extractingObservable = observable.lift(new ExtractOperator(jsonPath, targetProperty));
    String transformationDesc = "EXTRACT(" + jsonPath + " INTO " + targetProperty + ")";

    return cloneWith(extractingObservable, transformationDesc, "EXTRACT");
  }

  @Override
  public JsonPipeline collect(String jsonPath) {

    Observable<JsonPipelineOutput> collectingObservable = observable.lift(new CollectOperator(jsonPath, null));
    String transformationDesc = "COLLECT(" + jsonPath + ")";

    return cloneWith(collectingObservable, transformationDesc, "COLLECT");
  }

  @Override
  public JsonPipeline collect(String jsonPath, String targetProperty) {

    isTrue(isNotBlank(targetProperty), "Target property is '" + targetProperty
        + "'. Please provide meaningfull targetProperty or use another collect method wothout targetProperty parameter, if any targetProperty isn't required.");

    Observable<JsonPipelineOutput> collectingObservable = observable.lift(new CollectOperator(jsonPath, targetProperty));
    String transformationDesc = "COLLECT(" + jsonPath + " INTO " + targetProperty + ")";

    return cloneWith(collectingObservable, transformationDesc, "COLLECT");
  }

  @Override
  public JsonPipeline merge(JsonPipeline secondarySource) {

    MergeTransformer transformer = new MergeTransformer(descriptor, secondarySource.getOutput(), null);
    Observable<JsonPipelineOutput> mergedObservable = observable.compose(transformer);
    String transformationDesc = "MERGE(" + secondarySource.getDescriptor() + ")";

    JsonPipelineImpl mergedPipeline = cloneWith(mergedObservable, transformationDesc, "MERGE");
    mergedPipeline.sourceServiceIds.addAll(secondarySource.getSourceServices());
    mergedPipeline.requests.addAll(secondarySource.getRequests());

    return mergedPipeline;
  }

  @Override
  public JsonPipeline merge(JsonPipeline secondarySource, String targetProperty) {

    isTrue(isNotBlank(targetProperty), "Target property is '" + targetProperty
        + "'. Please provide meaningfull targetProperty or use another merge method wothout targetProperty parameter, if any targetProperty isn't required.");

    MergeTransformer transformer = new MergeTransformer(descriptor, secondarySource.getOutput(), targetProperty);
    Observable<JsonPipelineOutput> mergedObservable = observable.compose(transformer);
    String transformationDesc = "MERGE(" + secondarySource.getDescriptor() + " INTO " + targetProperty + ")";

    JsonPipelineImpl mergedPipeline = cloneWith(mergedObservable, transformationDesc, "MERGE");
    mergedPipeline.sourceServiceIds.addAll(secondarySource.getSourceServices());
    mergedPipeline.requests.addAll(secondarySource.getRequests());

    return mergedPipeline;
  }

  @Override
  public JsonPipeline applyAction(JsonPipelineAction action) {
    String actionDesc = "ACTION(" + action.getId() + ")";

    Observable<JsonPipelineOutput> transformedObservable = observable.flatMap(output -> {
      try {
        return action.execute(output, context);
      }
      catch (Throwable e) {
        log.error("Failed to execute action " + action.getId(), e);
        return Observable.error(e);
      }
    });

    return cloneWith(transformedObservable, actionDesc, "ACTION", action.getClass());
  }

  @Override
  public JsonPipeline addCachePoint(CacheStrategy strategy) {
    CachePersistencyOptions options = strategy.getCachePersistencyOptions(requests);
    // skip all caching logic if the expiry time or refresh interval for this request is 0
    if (!options.isCacheable()) {
      return this;
    }

    CachePointTransformer transformer = new CachePointTransformer(context, requests, descriptor, strategy);
    Observable<JsonPipelineOutput> cachingObservable = observable.compose(transformer);

    return cloneWith(cachingObservable, null, "ADD_CACHEPOINT");
  }

  @Override
  public JsonPipeline handleException(JsonPipelineExceptionHandler handler) {

    Observable<JsonPipelineOutput> exceptionHandlingObservable = observable.lift(new HandleExceptionOperator(requests, handler));

    return cloneWith(exceptionHandlingObservable, null, "HANDLE_EXCEPTION");
  }

  @Override
  public Observable<JsonPipelineOutput> getOutput() {
    return observable.map(o -> o);
  }

  @Override
  public Observable<JsonNode> getJsonOutput() {
    return observable.map(model -> model.getPayload());
  }

  @Override
  public Observable<String> getStringOutput() {
    return getJsonOutput().map(JacksonFunctions::nodeToString);
  }

  JsonPipelineContext getJsonPipelineContext() {
    return this.context;
  }

}