MergeTransformer.java
/*
* #%L
* wcm.io
* %%
* Copyright (C) 2014 wcm.io
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package io.wcm.caravan.pipeline.impl.operators;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import io.wcm.caravan.pipeline.JsonPipeline;
import io.wcm.caravan.pipeline.JsonPipelineOutput;
import io.wcm.caravan.pipeline.JsonPipelineOutputException;
import java.util.Iterator;
import java.util.Map.Entry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observable.Transformer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* a transformer that merges the output of a secondary {@link JsonPipeline} into the output of the primary pipeline.
*/
public class MergeTransformer implements Transformer<JsonPipelineOutput, JsonPipelineOutput> {
private static final Logger log = LoggerFactory.getLogger(MergeTransformer.class);
private final String primaryDescriptor;
private final Observable<JsonPipelineOutput> secondaryOutput;
private final String targetProperty;
/**
* @param primaryDescriptor the descriptor of the primary pipeline
* @param secondaryOutput the observable that emits the output of the secondary pipeline
* @param targetProperty the property that will be added to the primary pipeline'S output
*/
public MergeTransformer(String primaryDescriptor, Observable<JsonPipelineOutput> secondaryOutput, String targetProperty) {
this.primaryDescriptor = primaryDescriptor;
this.secondaryOutput = secondaryOutput;
this.targetProperty = targetProperty;
}
@Override
public Observable<JsonPipelineOutput> call(Observable<JsonPipelineOutput> primaryOutput) {
return primaryOutput.zipWith(secondaryOutput, (primaryModel, secondaryModel) -> {
log.debug("zipping object from secondary source into target property " + targetProperty);
JsonNode jsonFromPrimary = primaryModel.getPayload();
JsonNode jsonFromSecondary = secondaryModel.getPayload();
if (!(jsonFromPrimary.isObject())) {
throw new JsonPipelineOutputException("Only pipelines with JSON *Objects* can be used as a target for a merge operation, but response data for "
+ primaryDescriptor + " contained " + jsonFromPrimary.getClass().getSimpleName());
}
// start with cloning the the response of the primary pipeline
ObjectNode mergedObject = jsonFromPrimary.deepCopy();
// if a target property is specified, the JSON to be merged is inserted into this property
if (isNotBlank(targetProperty)) {
if (!mergedObject.has(targetProperty)) {
// the target property does not exist yet, so we just can set the property
mergedObject.set(targetProperty, jsonFromSecondary);
}
else {
// the target property already exists - let's hope we can merge!
JsonNode targetNode = mergedObject.get(targetProperty);
if (!targetNode.isObject()) {
throw new JsonPipelineOutputException("When merging two pipelines into the same target property, both most contain JSON *Object* responses");
}
if (!(jsonFromSecondary.isObject())) {
throw new JsonPipelineOutputException("Only pipelines with JSON *Object* responses can be merged into an existing target property");
}
mergeAllPropertiesInto((ObjectNode)jsonFromSecondary, (ObjectNode)targetNode);
}
}
else {
// if no target property is specified, all properties of the secondary pipeline are copied into the merged object
if (!(jsonFromSecondary.isObject())) {
throw new JsonPipelineOutputException("Only pipelines with JSON *Object* responses can be merged without specify a target property");
}
mergeAllPropertiesInto((ObjectNode)jsonFromSecondary, mergedObject);
}
return primaryModel.withPayload(mergedObject).withMaxAge(Math.min(primaryModel.getMaxAge(), secondaryModel.getMaxAge()));
});
}
private void mergeAllPropertiesInto(ObjectNode nodeToMerge, ObjectNode targetNode) {
// iterate over all properties of the given node
Iterator<Entry<String, JsonNode>> propertyIterator = nodeToMerge.fields();
while (propertyIterator.hasNext()) {
Entry<String, JsonNode> nextProperty = propertyIterator.next();
String propertyName = nextProperty.getKey();
if (targetNode.has(propertyName)) {
// what to do if the property already exists? for now, just throw an exception,
throw new JsonPipelineOutputException("Target pipeline " + primaryDescriptor + " already has a property named " + propertyName);
}
targetNode.set(propertyName, nextProperty.getValue());
}
}
}