HandleExceptionOperator.java
/*
* #%L
* wcm.io
* %%
* Copyright (C) 2014 wcm.io
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package io.wcm.caravan.pipeline.impl.operators;
import io.wcm.caravan.io.http.IllegalResponseRuntimeException;
import io.wcm.caravan.io.http.request.CaravanHttpRequest;
import io.wcm.caravan.pipeline.JsonPipelineExceptionHandler;
import io.wcm.caravan.pipeline.JsonPipelineInputException;
import io.wcm.caravan.pipeline.JsonPipelineOutput;
import io.wcm.caravan.pipeline.impl.JacksonFunctions;
import io.wcm.caravan.pipeline.impl.JsonPipelineOutputImpl;
import java.util.List;
import rx.Observable;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.exceptions.Exceptions;
/**
* An operator that delegates all non-fatal exception-handling to the given function, allowing the user of the
* pipeline to specify fallback content for certain expected exception scenarios.
*/
public class HandleExceptionOperator implements Operator<JsonPipelineOutput, JsonPipelineOutput> {
private final List<CaravanHttpRequest> requests;
private final JsonPipelineExceptionHandler handler;
/**
* @param requests the outgoing REST request(s)
* @param handler the function to call when an exception is caught
*/
public HandleExceptionOperator(List<CaravanHttpRequest> requests, JsonPipelineExceptionHandler handler) {
this.requests = requests;
this.handler = handler;
}
@Override
public Subscriber<? super JsonPipelineOutput> call(Subscriber<? super JsonPipelineOutput> subscriber) {
return new Subscriber<JsonPipelineOutput>() {
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
Exceptions.throwIfFatal(e);
// extract the HTTP status code from the exceptions known to contain such information
int statusCode = 500;
if (e instanceof JsonPipelineInputException) {
statusCode = ((JsonPipelineInputException)e).getStatusCode();
}
if (e instanceof IllegalResponseRuntimeException) {
statusCode = ((IllegalResponseRuntimeException)e).getResponseStatusCode();
}
JsonPipelineOutput defaultFallbackContent = new JsonPipelineOutputImpl(JacksonFunctions.emptyObject(), requests)
.withStatusCode(statusCode)
.withMaxAge(0);
try {
Observable<JsonPipelineOutput> fallbackResponse = handler.call(defaultFallbackContent, (RuntimeException)e);
fallbackResponse.subscribe(subscriber);
}
catch (Throwable rethrown) {
subscriber.onError(rethrown);
}
}
@Override
public void onNext(JsonPipelineOutput output) {
subscriber.onNext(output);
}
};
}
}