HalCrawler.java

/*
 * #%L
 * wcm.io
 * %%
 * Copyright (C) 2014 wcm.io
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package io.wcm.caravan.pipeline.extensions.hal.crawler;

import java.util.List;
import java.util.Map;
import java.util.Set;

import org.osgi.annotation.versioning.ProviderType;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Sets;

import io.wcm.caravan.hal.resource.HalResource;
import io.wcm.caravan.hal.resource.Link;
import io.wcm.caravan.io.http.request.CaravanHttpRequest;
import io.wcm.caravan.pipeline.JsonPipeline;
import io.wcm.caravan.pipeline.JsonPipelineAction;
import io.wcm.caravan.pipeline.JsonPipelineContext;
import io.wcm.caravan.pipeline.JsonPipelineOutput;
import io.wcm.caravan.pipeline.cache.CacheStrategy;
import io.wcm.caravan.pipeline.extensions.hal.client.HalClient;
import io.wcm.caravan.pipeline.extensions.hal.client.action.LoadLink;
import rx.Observable;

/**
 * Crawler walking on a HAL resource graph.
 */
@ProviderType
public final class HalCrawler implements JsonPipelineAction {

  /**
   * HTTP header for HAL link relation
   */
  public static final String HEADER_CRAWLER_RELATION = "Caravan-Crawler-HAL-Link-Relation";

  private final Set<String> startedUrls = Sets.newConcurrentHashSet();
  private final Set<String> processedUrls = Sets.newConcurrentHashSet();

  private final HalClient client;
  private final LinkExtractor linkExtractor;
  private final UriParametersProvider uriParametersProvider;
  private final OutputProcessor outputProcessor;
  private final StopCriterion stopCriterion;

  private CacheStrategy cacheStrategy;

  /**
   * @param client HAL client
   * @param linkExtractor Link extractor
   * @param uriParametersProvider URI parameter provider
   * @param outputProcessor Output processor
   */
  public HalCrawler(HalClient client, LinkExtractor linkExtractor, UriParametersProvider uriParametersProvider, OutputProcessor outputProcessor) {
    this.client = client;
    this.linkExtractor = linkExtractor;
    this.uriParametersProvider = uriParametersProvider;
    this.outputProcessor = outputProcessor;
    this.stopCriterion = StopCriteria.alwaysEnabled();

  }

  /**
   * @param client HAL client
   * @param linkExtractor Link extractor
   * @param uriParametersProvider URI parameter provider
   * @param outputProcessor Output processor
   * @param stopCriterion Stop Criterion
   */
  public HalCrawler(HalClient client, LinkExtractor linkExtractor, UriParametersProvider uriParametersProvider, OutputProcessor outputProcessor,
      StopCriterion stopCriterion) {
    this.client = client;
    this.linkExtractor = linkExtractor;
    this.uriParametersProvider = uriParametersProvider;
    this.outputProcessor = outputProcessor;
    this.stopCriterion = stopCriterion;

  }

  /**
   * @param strategy The cacheStrategy to set.
   * @return This crawler
   */
  public HalCrawler setCacheStrategy(CacheStrategy strategy) {
    this.cacheStrategy = strategy;
    return this;
  }

  @Override
  public String getId() {
    return "HAL-CRAWLER-(" + linkExtractor.getId() + '-' + uriParametersProvider.getId() + '-' + outputProcessor.getId() + ')';
  }

  @Override
  public Observable<JsonPipelineOutput> execute(JsonPipelineOutput previousStepOutput, JsonPipelineContext pipelineContext) {

    String currentUrl = getCurrentUrl(previousStepOutput);
    processedUrls.add(currentUrl);
    JsonPipeline startingPipeline = pipelineContext.getFactory().createEmpty(pipelineContext.getProperties());

    HalResource currentHalResource = getCurrentHalResource(previousStepOutput, currentUrl);
    ListMultimap<String, Link> links = linkExtractor.extract(currentHalResource);


    return Observable.from(links.entries())
        // create pipeline action
        .map(entry -> {
          String relation = entry.getKey();
          Link link = entry.getValue();
          Map<String, Object> parameters = uriParametersProvider.getParameters(currentHalResource, relation, link);
          LoadLink action = client.load(link, parameters);
          action.setHttpHeaders(ImmutableMultimap.of(HEADER_CRAWLER_RELATION, relation));
          if (cacheStrategy != null) {
            action.setCacheStrategy(cacheStrategy);
          }
          return action;
        })
        // filter unique by URL
        .distinct(action -> action.getUrl())
        // filter already processed URLs
        .filter(action -> !startedUrls.contains(action.getUrl()) && !processedUrls.contains(action.getUrl()))
        // filter actions for stopped crawler
        .filter(action -> !stopCriterion.isStopRequested())
        // add URL to processed and create pipeline
        .map(action -> {
          startedUrls.add(action.getUrl());
          return startingPipeline.applyAction(action);
        })
        // add this action to the pipeline
        .map(pipeline -> pipeline.applyAction(this))
        // get pipeline outputs
        .flatMap(JsonPipeline::getOutput)
        // get pipeline outputs list
        .toList()
        // process output
        .map(linkOutputs -> outputProcessor.process(previousStepOutput, linkOutputs));

  }

  private String getCurrentUrl(JsonPipelineOutput output) {

    List<CaravanHttpRequest> requests = output.getRequests();
    if (requests.isEmpty()) {
      return "unknown";
    }
    CaravanHttpRequest request = requests.get(requests.size() - 1);
    return request.getUrl();

  }

  private HalResource getCurrentHalResource(JsonPipelineOutput previousStepOutput, String currentUrl) {

    JsonNode json = previousStepOutput.getPayload();
    if (!(json instanceof ObjectNode)) {
      HalResource hal = new HalResource(currentUrl);
      hal.getModel().set("content", json);
      return hal;
    }
    return new HalResource(json);

  }

  /**
   * @return true if crawling has been stopped by the {@link StopCriterion}
   */
  public boolean isStopRequested() {
    return stopCriterion.isStopRequested();
  }

}