1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package io.wcm.caravan.pipeline.impl.operators;
21
22 import io.wcm.caravan.io.http.IllegalResponseRuntimeException;
23 import io.wcm.caravan.io.http.request.CaravanHttpRequest;
24 import io.wcm.caravan.pipeline.JsonPipelineExceptionHandler;
25 import io.wcm.caravan.pipeline.JsonPipelineInputException;
26 import io.wcm.caravan.pipeline.JsonPipelineOutput;
27 import io.wcm.caravan.pipeline.impl.JacksonFunctions;
28 import io.wcm.caravan.pipeline.impl.JsonPipelineOutputImpl;
29
30 import java.util.List;
31
32 import rx.Observable;
33 import rx.Observable.Operator;
34 import rx.Subscriber;
35 import rx.exceptions.Exceptions;
36
37
38
39
40
41 public class HandleExceptionOperator implements Operator<JsonPipelineOutput, JsonPipelineOutput> {
42
43 private final List<CaravanHttpRequest> requests;
44 private final JsonPipelineExceptionHandler handler;
45
46
47
48
49
50 public HandleExceptionOperator(List<CaravanHttpRequest> requests, JsonPipelineExceptionHandler handler) {
51 this.requests = requests;
52 this.handler = handler;
53 }
54
55 @Override
56 public Subscriber<? super JsonPipelineOutput> call(Subscriber<? super JsonPipelineOutput> subscriber) {
57 return new Subscriber<JsonPipelineOutput>() {
58
59 @Override
60 public void onCompleted() {
61 subscriber.onCompleted();
62 }
63
64 @Override
65 public void onError(Throwable e) {
66
67 Exceptions.throwIfFatal(e);
68
69
70 int statusCode = 500;
71 if (e instanceof JsonPipelineInputException) {
72 statusCode = ((JsonPipelineInputException)e).getStatusCode();
73 }
74 if (e instanceof IllegalResponseRuntimeException) {
75 statusCode = ((IllegalResponseRuntimeException)e).getResponseStatusCode();
76 }
77
78 JsonPipelineOutput defaultFallbackContent = new JsonPipelineOutputImpl(JacksonFunctions.emptyObject(), requests)
79 .withStatusCode(statusCode)
80 .withMaxAge(0);
81
82 try {
83 Observable<JsonPipelineOutput> fallbackResponse = handler.call(defaultFallbackContent, (RuntimeException)e);
84
85 fallbackResponse.subscribe(subscriber);
86 }
87 catch (Throwable rethrown) {
88 subscriber.onError(rethrown);
89 }
90 }
91
92 @Override
93 public void onNext(JsonPipelineOutput output) {
94 subscriber.onNext(output);
95 }
96 };
97 }
98 }