CouchbaseCacheAdapter.java
/*
* #%L
* wcm.io
* %%
* Copyright (C) 2014 wcm.io
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package io.wcm.caravan.pipeline.cache.couchbase.impl;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.commons.osgi.PropertiesUtil;
import org.apache.sling.nosql.couchbase.client.CouchbaseClient;
import org.apache.sling.nosql.couchbase.client.CouchbaseKey;
import org.osgi.framework.Constants;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.codahale.metrics.health.HealthCheck;
import com.codahale.metrics.health.HealthCheckRegistry;
import com.couchbase.client.java.AsyncBucket;
import com.couchbase.client.java.document.RawJsonDocument;
import io.wcm.caravan.commons.metrics.rx.HitsAndMissesCountingMetricsOperator;
import io.wcm.caravan.commons.metrics.rx.TimerMetricsOperator;
import io.wcm.caravan.pipeline.cache.CachePersistencyOptions;
import io.wcm.caravan.pipeline.cache.spi.CacheAdapter;
import rx.Observable;
import rx.Observer;
/**
 * {@link CacheAdapter} implementation for Couchbase.
 * <p>
 * JSON strings are stored as {@link RawJsonDocument}s in the async bucket provided by the
 * {@link CouchbaseClient} that is registered with the client ID {@link #COUCHBASE_CLIENT_ID}.
 * Every cache key is passed through {@link CouchbaseKey#build} together with the configured key prefix.
 * Read and write latencies as well as cache hits and misses are reported to the {@link MetricRegistry},
 * and a health check reflecting the state of the Couchbase client is registered while the component is active.
 * </p>
 */
@Component(immediate = true, metatype = true,
    label = "wcm.io Caravan Pipeline Cache Adapter for Couchbase",
    description = "Configure pipeline caching in couchbase.")
@Service(CacheAdapter.class)
public class CouchbaseCacheAdapter implements CacheAdapter {

  /** Client ID used to select the matching {@link CouchbaseClient} service. */
  static final String COUCHBASE_CLIENT_ID = "caravan-pipeline-cacheadapter-couchbase";

  @Property(label = "Service Ranking",
      description = "Used to determine the order of caching layers if you are using multiple Cache Adapters. "
          + "Fast system-internal caches should have a lower service ranking than slower network caches, so that they are queried first.",
      intValue = CouchbaseCacheAdapter.DEFAULT_RANKING,
      propertyPrivate = false)
  static final String PROPERTY_RANKING = Constants.SERVICE_RANKING;
  static final int DEFAULT_RANKING = 2000;

  @Property(label = "Cache Key Prefix", description = "Prefix for caching keys.",
      value = CouchbaseCacheAdapter.CACHE_KEY_PREFIX_DEFAULT)
  static final String CACHE_KEY_PREFIX_PROPERTY = "cacheKeyPrefix";
  private static final String CACHE_KEY_PREFIX_DEFAULT = "json-pipeline:";

  @Property(label = "Cache Timeout", description = "Timeout in ms for Couchbase cache operations.",
      intValue = CouchbaseCacheAdapter.CACHE_TIMEOUT_DEFAULT)
  static final String CACHE_TIMEOUT_PROPERTY = "cacheTimeout";
  private static final int CACHE_TIMEOUT_DEFAULT = 1000;

  @Property(label = "Cache Writable",
      description = "Determines if this system should be allowed to write into the couchbase cache. If disabled it will only be able to read existing entries.",
      boolValue = CouchbaseCacheAdapter.CACHE_WRITABLE_DEFAULT)
  static final String CACHE_WRITABLE_PROPERTY = "cacheWritable";
  private static final boolean CACHE_WRITABLE_DEFAULT = true;

  @Property(label = "Cache Isolated",
      description = "If enabled, this system's hostname will be appended to all cache keys. " +
          "This can be used in development or test environments to share the same couchbase bucket without actually sharing any cached data",
      boolValue = CouchbaseCacheAdapter.CACHE_ISOLATED_DEFAULT)
  static final String CACHE_ISOLATED_PROPERTY = "cacheIsolated";
  private static final boolean CACHE_ISOLATED_DEFAULT = false;

  @Property(label = "Enabled",
      description = "Enables or disables the whole cache adapter and all operations.",
      boolValue = CouchbaseCacheAdapter.CACHE_ENABLED_DEFAULT)
  static final String CACHE_ENABLED_PROPERTY = "enabled";
  private static final boolean CACHE_ENABLED_DEFAULT = true;

  private static final Logger log = LoggerFactory.getLogger(CouchbaseCacheAdapter.class);

  @Reference(target = "(" + CouchbaseClient.CLIENT_ID_PROPERTY + "=" + COUCHBASE_CLIENT_ID + ")")
  private CouchbaseClient couchbaseClient;

  @Reference
  private MetricRegistry metricRegistry;
  private Timer getLatencyTimer;
  private Timer putLatencyTimer;
  private Counter hitsCounter;
  private Counter missesCounter;

  @Reference
  private HealthCheckRegistry healthCheckRegistry;

  private String keyPrefix;
  private int timeout;
  private boolean writable;
  private boolean isolated;
  private boolean enabled;

  /** Host name appended to cache keys while {@link #isolated} is true; resolved once on activation. */
  private String isolationHostName;

  /**
   * Reads the component configuration, registers the metrics and the health check.
   * @param componentContext Component context (not used)
   * @param config Component configuration; missing entries fall back to the declared defaults
   */
  @Activate
  private void activate(ComponentContext componentContext, Map<String, Object> config) {
    keyPrefix = PropertiesUtil.toString(config.get(CACHE_KEY_PREFIX_PROPERTY), CACHE_KEY_PREFIX_DEFAULT);
    timeout = PropertiesUtil.toInteger(config.get(CACHE_TIMEOUT_PROPERTY), CACHE_TIMEOUT_DEFAULT);
    writable = PropertiesUtil.toBoolean(config.get(CACHE_WRITABLE_PROPERTY), CACHE_WRITABLE_DEFAULT);
    isolated = PropertiesUtil.toBoolean(config.get(CACHE_ISOLATED_PROPERTY), CACHE_ISOLATED_DEFAULT);
    enabled = PropertiesUtil.toBoolean(config.get(CACHE_ENABLED_PROPERTY), CACHE_ENABLED_DEFAULT);

    // Resolve the host name once here instead of on every cache access. This keeps name lookups out of
    // the request path and avoids modifying the configuration state from caller threads later on.
    isolationHostName = isolated ? resolveLocalHostName() : null;
    if (isolationHostName == null) {
      // either isolation is not configured, or the lookup failed (already logged): use plain keys
      isolated = false;
    }

    getLatencyTimer = metricRegistry.timer(MetricRegistry.name(getClass(), "latency", "get"));
    putLatencyTimer = metricRegistry.timer(MetricRegistry.name(getClass(), "latency", "put"));
    hitsCounter = metricRegistry.counter(MetricRegistry.name(getClass(), "hits"));
    missesCounter = metricRegistry.counter(MetricRegistry.name(getClass(), "misses"));

    healthCheckRegistry.register(MetricRegistry.name(getClass()), new HealthCheck() {
      @Override
      protected Result check() throws Exception {
        return couchbaseClient != null && couchbaseClient.isEnabled() ? Result.healthy() : Result.unhealthy("No cache bucket");
      }
    });
  }

  /**
   * Removes all metrics and the health check registered in
   * {@link #activate(ComponentContext, Map)}.
   * @param componentContext Component context (not used)
   */
  @Deactivate
  private void deactivate(ComponentContext componentContext) {
    metricRegistry.remove(MetricRegistry.name(getClass(), "latency", "get"));
    metricRegistry.remove(MetricRegistry.name(getClass(), "latency", "put"));
    metricRegistry.remove(MetricRegistry.name(getClass(), "hits"));
    metricRegistry.remove(MetricRegistry.name(getClass(), "misses"));
    healthCheckRegistry.unregister(MetricRegistry.name(getClass()));
  }

  /**
   * Looks up a cached JSON string in the Couchbase bucket.
   * <p>
   * An empty observable is returned without accessing Couchbase if the adapter is disabled, if
   * {@code options} is null or does not allow persistent caches, or if no bucket is available.
   * If the lookup takes longer than the configured timeout a warning is logged and the returned
   * observable completes without emitting. Other errors signalled by the bucket are not handled here.
   * </p>
   * @param cacheKey Cache key (without prefix)
   * @param options Persistency options; if {@code isExtendStorageTimeOnGet()} is set, the document is
   *          fetched with {@code getAndTouch} using {@code getStorageTime()}
   * @return Observable of the content of the document emitted by the bucket
   */
  @Override
  public Observable<String> get(String cacheKey, CachePersistencyOptions options) {
    if (!enabled || options == null || !options.shouldUsePersistentCaches()) {
      return Observable.empty();
    }

    AsyncBucket bucket = getBucket();
    if (bucket == null) {
      return Observable.empty();
    }

    String fullCacheKey = getCacheKey(cacheKey);
    Observable<RawJsonDocument> fromCache;
    if (options.isExtendStorageTimeOnGet()) {
      fromCache = bucket.getAndTouch(fullCacheKey, options.getStorageTime(), RawJsonDocument.class);
    }
    else {
      fromCache = bucket.get(fullCacheKey, RawJsonDocument.class);
    }

    return fromCache
        // on timeout switch to a fallback that only logs and completes, so a slow cache looks like a miss
        .timeout(timeout, TimeUnit.MILLISECONDS, Observable.create(f -> {
          log.warn("Timeout accessing Couchbase cache");
          f.onCompleted();
        }))
        .lift(new TimerMetricsOperator<RawJsonDocument>(getLatencyTimer))
        .lift(new HitsAndMissesCountingMetricsOperator<RawJsonDocument>(hitsCounter, missesCounter))
        .map(doc -> {
          String content = doc.content();
          log.trace("Successfully retrieved document with id {}: {}", doc.id(), content);
          return content;
        });
  }

  /**
   * Writes a JSON string into the Couchbase bucket using an upsert.
   * <p>
   * Nothing is written if the adapter is disabled or not writable, if {@code options} is null or does
   * not allow persistent caches, or if no bucket is available. The method subscribes to the upsert
   * itself and only logs its outcome: a timeout is logged as warning, any other failure as error.
   * </p>
   * @param cacheKey Cache key (without prefix)
   * @param jsonString Content to store
   * @param options Persistency options; {@code getStorageTime()} is passed to the created document
   */
  @Override
  public void put(String cacheKey, String jsonString, CachePersistencyOptions options) {
    if (!enabled || !writable || options == null || !options.shouldUsePersistentCaches()) {
      return;
    }

    AsyncBucket bucket = getBucket();
    if (bucket == null) {
      return;
    }

    // computed once and reused for the error log below
    String fullCacheKey = getCacheKey(cacheKey);
    RawJsonDocument doc = RawJsonDocument.create(fullCacheKey, options.getStorageTime(), jsonString);
    Observable<RawJsonDocument> insertionObservable = bucket.upsert(doc);

    insertionObservable
        .timeout(timeout, TimeUnit.MILLISECONDS, Observable.create(f -> {
          log.warn("Timeout writing into Couchbase cache");
          f.onCompleted();
        }))
        .lift(new TimerMetricsOperator<RawJsonDocument>(putLatencyTimer))
        .subscribe(new Observer<RawJsonDocument>() {

          @Override
          public void onNext(RawJsonDocument insertedDoc) {
            log.trace("Successfully put into Couchbase cache document with id {}:\n{}", insertedDoc.id(), insertedDoc.content());
          }

          @Override
          public void onCompleted() {
            // nothing
          }

          @Override
          public void onError(Throwable e) {
            log.error("Failed to put document " + fullCacheKey + " into the Couchbase cache", e);
          }

        });
  }

  /**
   * Obtains the async bucket from the Couchbase client.
   * The reason is logged if the client is disabled or does not provide a bucket.
   * @return Async bucket, or null if none is available
   */
  private AsyncBucket getBucket() {
    if (!couchbaseClient.isEnabled()) {
      log.warn("Couchbase client '{}' is disabled, please check the configuration.", COUCHBASE_CLIENT_ID);
      return null;
    }
    AsyncBucket bucket = couchbaseClient.getAsyncBucket();
    if (bucket == null) {
      log.error("Failed to obtain couchbase bucket from {}, couchbase client {}",
          couchbaseClient.getBucketName(), couchbaseClient.getClientId());
    }
    return bucket;
  }

  /**
   * Builds the key that is actually used in the bucket: the given key, followed by "_" and the local
   * host name if isolation is active, combined with the configured key prefix.
   * @param cacheKey Cache key as passed in by the caller
   * @return Key built by {@link CouchbaseKey#build}
   */
  private String getCacheKey(String cacheKey) {
    String fullCacheKey = isolated ? cacheKey + "_" + isolationHostName : cacheKey;
    return CouchbaseKey.build(fullCacheKey, keyPrefix);
  }

  /**
   * Looks up the host name of this system.
   * @return Host name, or null if it cannot be resolved (the failure is logged)
   */
  private static String resolveLocalHostName() {
    try {
      return InetAddress.getLocalHost().getHostName();
    }
    catch (UnknownHostException ex) {
      log.error("Failed to obtain this system's own host name to append it to the cache key", ex);
      return null;
    }
  }

}