PerformanceMetricsTransformer.java

/*
 * #%L
 * wcm.io
 * %%
 * Copyright (C) 2014 wcm.io
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package io.wcm.caravan.pipeline.impl.operators;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Observable.Transformer;
import rx.Observer;

/**
 * A {@link Transformer} decorator that measures the time spent by a specific {@link Transformer}, {@link Operator}
 * or {@link Observable}. Three durations are recorded:
 * <ul>
 * <li>subscription: from subscribing to the output of the measured subject until it subscribes to its input</li>
 * <li>observation: from subscribing to the data source until the data source emits</li>
 * <li>emission: from passing the emission to the measured subject until the measured subject emits its output</li>
 * </ul>
 * Usage of this class is limited to observables with a single emission. The stop watches are fields of this
 * instance and are not synchronized, so the measured times accumulate if the same instance is applied to more than
 * one observable.
 * @param <T> the type of the items emitted by the data source and by the measured subject
 */
public final class PerformanceMetricsTransformer<T> implements Transformer<T, T> {

  private static final Logger log = LoggerFactory.getLogger(PerformanceMetricsTransformer.class);

  private final Transformer<T, T> toMeasure;

  private final Stopwatch subscriptionStopwatch;
  private final Stopwatch observationStopwatch;
  private final Stopwatch emissionStopwatch;

  /**
   * constructor for unit-tests that allows you to specify a custom ticker. For general usage use
   * {@link #withSystemTicker(Transformer)}
   * @param toMeasure the transformer that should be measured
   * @param ticker the time source
   */
  PerformanceMetricsTransformer(Transformer<T, T> toMeasure, Ticker ticker) {
    this.toMeasure = toMeasure;

    subscriptionStopwatch = Stopwatch.createUnstarted(ticker);
    observationStopwatch = Stopwatch.createUnstarted(ticker);
    emissionStopwatch = Stopwatch.createUnstarted(ticker);
  }

  /**
   * constructor for unit-tests that allows you to specify a custom ticker. For general usage use
   * {@link #withSystemTicker(Operator)}
   * @param toMeasure the operator that should be measured
   * @param ticker the time source
   */
  PerformanceMetricsTransformer(Operator<T, T> toMeasure, Ticker ticker) {

    // if the subject to be measured is an operator we wrap it in a trivial transformer
    this(new Transformer<T, T>() {

      @Override
      public Observable<T> call(Observable<T> sourceObservable) {
        return Observable.create(subscriber -> {
          // apply the operator to measure to the source observable before subscribing
          sourceObservable.lift(toMeasure).subscribe(subscriber);
        });
      }

    }, ticker);
  }

  /**
   * constructor for unit-tests that allows you to specify a custom ticker. For general usage use
   * {@link #withSystemTicker(Observable)}
   * @param toMeasure the observable that should be measured. NOTE: this argument is not referenced by the
   *          implementation; the observable that is actually measured is the one passed to
   *          {@link #call(Observable)}
   * @param ticker the time source
   */
  PerformanceMetricsTransformer(Observable<T> toMeasure, Ticker ticker) {

    // if the subject to be measured is an observable there is nothing to apply to the source: we use an identity
    // transformer, so that only the source observable passed to #call(Observable) is measured
    this(new Transformer<T, T>() {

      @Override
      public Observable<T> call(Observable<T> sourceObservable) {
        return sourceObservable;
      }
    }, ticker);

  }

  /**
   * @param toMeasure the transformer that should be measured
   * @param <T> type
   * @return a PerformanceMetricsTransformer that uses the {@link Ticker#systemTicker()}
   */
  public static <T> PerformanceMetricsTransformer<T> withSystemTicker(Transformer<T, T> toMeasure) {
    return new PerformanceMetricsTransformer<>(toMeasure, Ticker.systemTicker());
  }

  /**
   * @param toMeasure the operator that should be measured
   * @param <T> type
   * @return a PerformanceMetricsTransformer that uses the {@link Ticker#systemTicker()}
   */
  public static <T> PerformanceMetricsTransformer<T> withSystemTicker(Operator<T, T> toMeasure) {
    return new PerformanceMetricsTransformer<>(toMeasure, Ticker.systemTicker());
  }

  /**
   * @param toMeasure the observable that should be measured
   * @param <T> type
   * @return a PerformanceMetricsTransformer that uses the {@link Ticker#systemTicker()}
   */
  public static <T> PerformanceMetricsTransformer<T> withSystemTicker(Observable<T> toMeasure) {
    return new PerformanceMetricsTransformer<>(toMeasure, Ticker.systemTicker());
  }

  /**
   * Wraps the given data source so that the subject to measure is placed between two instrumented observables: one
   * that feeds it its input and one that receives its output. The measured times are logged when the subject to
   * measure emits.
   * @param dataSource the observable that provides the input for the subject to measure
   * @return a cached observable that emits whatever the subject to measure emits
   */
  @Override
  public Observable<T> call(Observable<T> dataSource) {

    Observable<T> wrappedInputObservable = Observable.create(subscriberToMeasure -> {

      // 3. this is called after the subscriber to measure has finished its job and subscribes to this input observable
      stopIfRunning(subscriptionStopwatch);

      // 4. we now start the stop watch that measures how long the actual data source takes between subscription and emission
      // (guarded, because the subject to measure may subscribe again before anything was emitted)
      startIfStopped(observationStopwatch);

      // register the upstream subscription, so that unsubscribing from the measured subject also unsubscribes
      // from the data source
      subscriberToMeasure.add(dataSource.subscribe(new Observer<T>() {

        @Override
        public void onCompleted() {
          subscriberToMeasure.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
          subscriberToMeasure.onError(e);
        }

        @Override
        public void onNext(T emission) {

          // 5. the source data is available, we now stop the observation stop watch
          stopIfRunning(observationStopwatch);

          // 6. now start the stop watch to measure the time it takes for the transformer to handle the data
          // (guarded, because the subject to measure may not have emitted anything for the previous emission)
          startIfStopped(emissionStopwatch);
          subscriberToMeasure.onNext(emission);
        }
      }));
    });

    Observable<T> wrappedOutputObservable = Observable.create(actualOutputSubscriber -> {

      Observable<T> observableToMeasure = toMeasure.call(wrappedInputObservable);

      // 1. start the subscription stop watch
      startIfStopped(subscriptionStopwatch);

      // 2. subscribing here will call onSubscribe function of the transformer that we want to measure
      // (the subscription is registered with the output subscriber, so that unsubscribing is passed on)
      actualOutputSubscriber.add(observableToMeasure.subscribe(new Observer<T>() {

        @Override
        public void onCompleted() {
          actualOutputSubscriber.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
          actualOutputSubscriber.onError(e);
        }

        @Override
        public void onNext(T emission) {
          // 7. the transformer to measure has finished processing the data, so we stop the emission stop watch
          stopIfRunning(emissionStopwatch);

          log.info("subscription={}ms, observation={}ms, emission: {}ms",
              getSubscriptionMillis(), getObservationMillis(), getEmissionMillis());

          actualOutputSubscriber.onNext(emission);
        }
      }));
    });

    return wrappedOutputObservable.cache();
  }

  /**
   * Starts the given stop watch unless it is already running (starting a running stop watch would fail).
   * @param stopwatch the stop watch to start
   */
  private static void startIfStopped(Stopwatch stopwatch) {
    if (!stopwatch.isRunning()) {
      stopwatch.start();
    }
  }

  /**
   * Stops the given stop watch if it is currently running.
   * @param stopwatch the stop watch to stop
   */
  private static void stopIfRunning(Stopwatch stopwatch) {
    if (stopwatch.isRunning()) {
      stopwatch.stop();
    }
  }


  /**
   * Calculate the time spent during the {@link OnSubscribe} function of the {@link Transformer} to measure. Will be 0
   * if you are using this class to measure a {@link Operator} or {@link Observable}
   * @return the time in milliseconds spent subscribing onto the source observable
   */
  public long getSubscriptionMillis() {
    return this.subscriptionStopwatch.elapsed(TimeUnit.MILLISECONDS);
  }

  /**
   * Calculate the time spent waiting for the source observable to emit the result
   * @return the time in milliseconds spent waiting for the source observable
   */
  public long getObservationMillis() {
    return this.observationStopwatch.elapsed(TimeUnit.MILLISECONDS);
  }

  /**
   * Calculate the time spent processing the emission of the source observable
   * @return the time in milliseconds spent processing the emission
   */
  public long getEmissionMillis() {
    return this.emissionStopwatch.elapsed(TimeUnit.MILLISECONDS);
  }
}