HandleExceptionOperator.java

/*
 * #%L
 * wcm.io
 * %%
 * Copyright (C) 2014 wcm.io
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package io.wcm.caravan.pipeline.impl.operators;

import io.wcm.caravan.io.http.IllegalResponseRuntimeException;
import io.wcm.caravan.io.http.request.CaravanHttpRequest;
import io.wcm.caravan.pipeline.JsonPipelineExceptionHandler;
import io.wcm.caravan.pipeline.JsonPipelineInputException;
import io.wcm.caravan.pipeline.JsonPipelineOutput;
import io.wcm.caravan.pipeline.impl.JacksonFunctions;
import io.wcm.caravan.pipeline.impl.JsonPipelineOutputImpl;

import java.util.List;

import rx.Observable;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.exceptions.Exceptions;

/**
 * An operator that delegates all non-fatal exception-handling to the given function, allowing the user of the
 * pipeline to specify fallback content for certain expected exception scenarios.
 */
public class HandleExceptionOperator implements Operator<JsonPipelineOutput, JsonPipelineOutput> {

  private final List<CaravanHttpRequest> requests;
  private final JsonPipelineExceptionHandler handler;

  /**
   * @param requests the outgoing REST request(s)
   * @param handler the function to call when an exception is caught
   */
  public HandleExceptionOperator(List<CaravanHttpRequest> requests, JsonPipelineExceptionHandler handler) {
    this.requests = requests;
    this.handler = handler;
  }

  @Override
  public Subscriber<? super JsonPipelineOutput> call(Subscriber<? super JsonPipelineOutput> subscriber) {
    return new Subscriber<JsonPipelineOutput>() {

      @Override
      public void onCompleted() {
        subscriber.onCompleted();
      }

      @Override
      public void onError(Throwable e) {

        Exceptions.throwIfFatal(e);

        // extract the HTTP status code from the exceptions known to contain such information
        int statusCode = 500;
        if (e instanceof JsonPipelineInputException) {
          statusCode = ((JsonPipelineInputException)e).getStatusCode();
        }
        if (e instanceof IllegalResponseRuntimeException) {
          statusCode = ((IllegalResponseRuntimeException)e).getResponseStatusCode();
        }

        JsonPipelineOutput defaultFallbackContent = new JsonPipelineOutputImpl(JacksonFunctions.emptyObject(), requests)
            .withStatusCode(statusCode)
            .withMaxAge(0);

        try {
          Observable<JsonPipelineOutput> fallbackResponse = handler.call(defaultFallbackContent, (RuntimeException)e);

          fallbackResponse.subscribe(subscriber);
        }
        catch (Throwable rethrown) {
          subscriber.onError(rethrown);
        }
      }

      @Override
      public void onNext(JsonPipelineOutput output) {
        subscriber.onNext(output);
      }
    };
  }
}