JsonPipelineFactoryImpl.java
/*
* #%L
* wcm.io
* %%
* Copyright (C) 2014 wcm.io
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package io.wcm.caravan.pipeline.impl;
import io.wcm.caravan.io.http.CaravanHttpClient;
import io.wcm.caravan.io.http.request.CaravanHttpRequest;
import io.wcm.caravan.io.http.request.CaravanHttpRequestBuilder;
import io.wcm.caravan.io.http.response.CaravanHttpResponse;
import io.wcm.caravan.io.http.response.CaravanHttpResponseBuilder;
import io.wcm.caravan.pipeline.JsonPipeline;
import io.wcm.caravan.pipeline.JsonPipelineFactory;
import io.wcm.caravan.pipeline.cache.spi.CacheAdapter;
import io.wcm.caravan.pipeline.impl.cache.MultiLayerCacheAdapter;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.ReferencePolicy;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.commons.osgi.RankedServices;
import rx.Observable;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableListMultimap;
/**
* Default implementation of {@link JsonPipelineFactory}.
*/
@Component
@Service(JsonPipelineFactory.class)
public final class JsonPipelineFactoryImpl implements JsonPipelineFactory {
@Reference
private CaravanHttpClient transport;
@Reference(name = "cacheAdapter", referenceInterface = CacheAdapter.class,
cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE, policy = ReferencePolicy.DYNAMIC)
private final RankedServices<CacheAdapter> cacheAdapters = new RankedServices<>();
@Reference
private MetricRegistry metricRegistry;
/** constructor used in a OSGi context */
public JsonPipelineFactoryImpl() {
// empty constructor
}
/**
* explicit dependency injection (to be used in unit-tests)
* @param transport the implementation to use to fetch responses
* @param metricRegistry for gathering monitoring statistics
*/
public JsonPipelineFactoryImpl(CaravanHttpClient transport, MetricRegistry metricRegistry) {
this.transport = transport;
this.metricRegistry = metricRegistry;
}
@Override
public JsonPipeline create(final CaravanHttpRequest request) {
return create(request, Collections.emptyMap());
}
@Override
public JsonPipeline create(final CaravanHttpRequest request, Map<String, String> contextProperties) {
// note that #execute will *not* actually start the request, but just create an observable that will initiate
// the request when #subscribe is called on the pipeline's output observable
Observable<CaravanHttpResponse> response = transport.execute(request);
return new JsonPipelineImpl(request, response,
new JsonPipelineContextImpl(this, createMultiLayerCacheAdapter(), metricRegistry, contextProperties));
}
@Override
public JsonPipeline createEmpty() {
return createEmpty(Collections.emptyMap());
}
@Override
public JsonPipeline createEmpty(Map<String, String> contextProperties) {
CaravanHttpRequest dummyRequest = new CaravanHttpRequestBuilder("").build();
// make sure to set a Cache-Control header to mark the empty response as indefinitely cacheable,
// otherwise the default max-age value of zero would become effective
ImmutableListMultimap<String, String> headers = ImmutableListMultimap.of("Cache-Control", "max-age=" + Long.toString(TimeUnit.DAYS.toSeconds(365)));
CaravanHttpResponse emptyJsonResponse = new CaravanHttpResponseBuilder()
.status(200)
.reason("OK")
.headers(headers)
.body("{}", Charset.forName("UTF-8"))
.build();
return new JsonPipelineImpl(dummyRequest, Observable.just(emptyJsonResponse),
new JsonPipelineContextImpl(this, createMultiLayerCacheAdapter(), metricRegistry, contextProperties));
}
MultiLayerCacheAdapter createMultiLayerCacheAdapter() {
return new MultiLayerCacheAdapter(new ArrayList<CacheAdapter>(cacheAdapters.get()));
}
void bindCacheAdapter(CacheAdapter service, Map<String, Object> props) {
cacheAdapters.bind(service, props);
}
void unbindCacheAdapter(CacheAdapter service, Map<String, Object> props) {
cacheAdapters.unbind(service, props);
}
}