View Javadoc
1   /*
2    * #%L
3    * wcm.io
4    * %%
5    * Copyright (C) 2014 wcm.io
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  package io.wcm.caravan.pipeline.impl.operators;
21  
22  import io.wcm.caravan.io.http.IllegalResponseRuntimeException;
23  import io.wcm.caravan.io.http.request.CaravanHttpRequest;
24  import io.wcm.caravan.pipeline.JsonPipelineExceptionHandler;
25  import io.wcm.caravan.pipeline.JsonPipelineInputException;
26  import io.wcm.caravan.pipeline.JsonPipelineOutput;
27  import io.wcm.caravan.pipeline.impl.JacksonFunctions;
28  import io.wcm.caravan.pipeline.impl.JsonPipelineOutputImpl;
29  
30  import java.util.List;
31  
32  import rx.Observable;
33  import rx.Observable.Operator;
34  import rx.Subscriber;
35  import rx.exceptions.Exceptions;
36  
37  /**
38   * An operator that delegates all non-fatal exception-handling to the given function, allowing the user of the
39   * pipeline to specify fallback content for certain expected exception scenarios.
40   */
41  public class HandleExceptionOperator implements Operator<JsonPipelineOutput, JsonPipelineOutput> {
42  
43    private final List<CaravanHttpRequest> requests;
44    private final JsonPipelineExceptionHandler handler;
45  
46    /**
47     * @param requests the outgoing REST request(s)
48     * @param handler the function to call when an exception is caught
49     */
50    public HandleExceptionOperator(List<CaravanHttpRequest> requests, JsonPipelineExceptionHandler handler) {
51      this.requests = requests;
52      this.handler = handler;
53    }
54  
55    @Override
56    public Subscriber<? super JsonPipelineOutput> call(Subscriber<? super JsonPipelineOutput> subscriber) {
57      return new Subscriber<JsonPipelineOutput>() {
58  
59        @Override
60        public void onCompleted() {
61          subscriber.onCompleted();
62        }
63  
64        @Override
65        public void onError(Throwable e) {
66  
67          Exceptions.throwIfFatal(e);
68  
69          // extract the HTTP status code from the exceptions known to contain such information
70          int statusCode = 500;
71          if (e instanceof JsonPipelineInputException) {
72            statusCode = ((JsonPipelineInputException)e).getStatusCode();
73          }
74          if (e instanceof IllegalResponseRuntimeException) {
75            statusCode = ((IllegalResponseRuntimeException)e).getResponseStatusCode();
76          }
77  
78          JsonPipelineOutput defaultFallbackContent = new JsonPipelineOutputImpl(JacksonFunctions.emptyObject(), requests)
79              .withStatusCode(statusCode)
80              .withMaxAge(0);
81  
82          try {
83            Observable<JsonPipelineOutput> fallbackResponse = handler.call(defaultFallbackContent, (RuntimeException)e);
84  
85            fallbackResponse.subscribe(subscriber);
86          }
87          catch (Throwable rethrown) {
88            subscriber.onError(rethrown);
89          }
90        }
91  
92        @Override
93        public void onNext(JsonPipelineOutput output) {
94          subscriber.onNext(output);
95        }
96      };
97    }
98  }