CacheControlUtils.java
/*
* #%L
* wcm.io
* %%
* Copyright (C) 2014 wcm.io
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package io.wcm.caravan.pipeline.cache;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.osgi.annotation.versioning.ProviderType;
import io.wcm.caravan.pipeline.JsonPipeline;
import io.wcm.caravan.pipeline.JsonPipelineOutput;
import rx.Observable;
import rx.functions.Func1;
import rx.observers.SafeSubscriber;
/**
* Cache control utilities aid to manage pipeline output cache control meta data.
*/
@ProviderType
public final class CacheControlUtils {
private CacheControlUtils() {
// static methods only
}
/**
* @param pipelineOutputs Pipeline outputs with {@code max-age} values
* @return the lowest max-age value of all the given pipeline outputs
*/
public static int getLowestMaxAge(Iterable<JsonPipelineOutput> pipelineOutputs) {
return Observable
.from(pipelineOutputs)
.filter(output -> output != null)
.map(output -> output.getMaxAge())
.reduce(Math::min)
.toBlocking().single();
}
/**
* @param pipelineOutputs Pipeline outputs with {@code max-age} values
* @return the lowest max-age value of all the given pipeline outputs
*/
public static int getLowestMaxAge(JsonPipelineOutput... pipelineOutputs) {
return getLowestMaxAge(Arrays.asList(pipelineOutputs));
}
/**
* Aggregates multiple resources fetched with different {@link JsonPipeline} instances, into a single
* {@link JsonPipelineOutput} and ensures sure that the max-age Cache-Control-Header is set to the minimum value of
* all aggregated responses.
* @param pipelines an observable that emits MULTIPLE {@link JsonPipeline}s
* @param zipFunc a lambda that is given the list of all {@link JsonPipelineOutput}s when they have been retrieved
* @return a new observable that emits the aggregated JsonPipelineOutput with the correct max-age
*/
public static Observable<JsonPipelineOutput> zipWithLowestMaxAge(
Observable<JsonPipeline> pipelines,
Func1<List<JsonPipelineOutput>, JsonPipelineOutput> zipFunc) {
Observable<Observable<JsonPipelineOutput>> outputs = pipelines.map(pipeline -> pipeline.getOutput());
return zipWithLowestMaxAgeInternal(outputs, zipFunc);
}
/**
* Aggregates multiple resources fetched with different {@link JsonPipeline} instances, into a single
* {@link JsonPipelineOutput} and ensures sure that the max-age Cache-Control-Header is set to the minimum value of
* all aggregated responses.
* @param pipelines the {@link JsonPipeline}s to zip
* @param zipFunc a lambda that is given the list of all {@link JsonPipelineOutput}s when they have been retrieved
* @return a new observable that emits the aggregated JsonPipelineOutput with the correct max-age
*/
public static Observable<JsonPipelineOutput> zipWithLowestMaxAge(
Iterable<JsonPipeline> pipelines,
Func1<List<JsonPipelineOutput>, JsonPipelineOutput> zipFunc) {
Observable<Observable<JsonPipelineOutput>> outputs = Observable.from(pipelines).map(pipeline -> pipeline.getOutput());
return zipWithLowestMaxAgeInternal(outputs, zipFunc);
}
private static Observable<JsonPipelineOutput> zipWithLowestMaxAgeInternal(
Observable<Observable<JsonPipelineOutput>> multipleOutputs,
Func1<List<JsonPipelineOutput>, JsonPipelineOutput> zipFunc) {
// this method could just return the zippingObservable created below, but this lead to an odd bug when this
// method was called with an empty "multipleOutputs" observable. In that case, the OperatorZip called
// #onCompleted twice (is this an implementation bug in rx.java?), which was causing weird behaviour
// if #defaultIfEmpty was used on the zipped result (the default value was emitted twice)
// that behaviour can be avoided by adding a SafeSubscriber layer
return Observable.create(subscriber -> {
Observable<JsonPipelineOutput> zippingObservable = Observable.zip(multipleOutputs, (arrayOfOutputs) -> {
// collect the JsonPipelineOutput instances in a generic list (casting is required since FuncN only gives as a Object[])
List<JsonPipelineOutput> listOfOutputs = new ArrayList<>(arrayOfOutputs.length);
for (Object o : arrayOfOutputs) {
listOfOutputs.add((JsonPipelineOutput)o);
}
// calculate the zipping function to produce the aggregated resposne from all JsonPipelineOutputs
JsonPipelineOutput zippedOutput = zipFunc.call(listOfOutputs);
// then update the max age of the overall output with the lowest max-age values of all outputs in the list
int lowestMaxAge = getLowestMaxAge(listOfOutputs);
return zippedOutput.withMaxAge(lowestMaxAge);
});
zippingObservable.subscribe(new SafeSubscriber<JsonPipelineOutput>(subscriber));
});
}
}