• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2017, OpenCensus Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.opencensus.implcore.stats;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static io.opencensus.implcore.stats.RecordUtils.createAggregationMap;
21 import static io.opencensus.implcore.stats.RecordUtils.createMutableAggregation;
22 import static io.opencensus.implcore.stats.RecordUtils.getTagMap;
23 import static io.opencensus.implcore.stats.RecordUtils.getTagValues;
24 
25 import com.google.common.annotations.VisibleForTesting;
26 import com.google.common.collect.LinkedHashMultimap;
27 import com.google.common.collect.Maps;
28 import com.google.common.collect.Multimap;
29 import io.opencensus.common.Duration;
30 import io.opencensus.common.Function;
31 import io.opencensus.common.Functions;
32 import io.opencensus.common.Timestamp;
33 import io.opencensus.implcore.internal.CheckerFrameworkUtils;
34 import io.opencensus.implcore.internal.CurrentState.State;
35 import io.opencensus.metrics.LabelValue;
36 import io.opencensus.metrics.data.AttachmentValue;
37 import io.opencensus.metrics.export.Metric;
38 import io.opencensus.metrics.export.MetricDescriptor;
39 import io.opencensus.metrics.export.MetricDescriptor.Type;
40 import io.opencensus.metrics.export.Point;
41 import io.opencensus.metrics.export.TimeSeries;
42 import io.opencensus.stats.Aggregation;
43 import io.opencensus.stats.AggregationData;
44 import io.opencensus.stats.Measure;
45 import io.opencensus.stats.View;
46 import io.opencensus.stats.ViewData;
47 import io.opencensus.tags.TagContext;
48 import io.opencensus.tags.TagValue;
49 import java.util.ArrayDeque;
50 import java.util.ArrayList;
51 import java.util.Collections;
52 import java.util.List;
53 import java.util.Map;
54 import java.util.Map.Entry;
55 
56 /*>>>
57 import org.checkerframework.checker.nullness.qual.Nullable;
58 */
59 
60 /** A mutable version of {@link ViewData}, used for recording stats and start/end time. */
61 @SuppressWarnings("deprecation")
62 abstract class MutableViewData {
63 
64   @VisibleForTesting static final Timestamp ZERO_TIMESTAMP = Timestamp.create(0, 0);
65 
66   private final View view;
67 
MutableViewData(View view)68   private MutableViewData(View view) {
69     this.view = view;
70   }
71 
72   /**
73    * Constructs a new {@link MutableViewData}.
74    *
75    * @param view the {@code View} linked with this {@code MutableViewData}.
76    * @param start the start {@code Timestamp}.
77    * @return a {@code MutableViewData}.
78    */
create(final View view, final Timestamp start)79   static MutableViewData create(final View view, final Timestamp start) {
80     return view.getWindow()
81         .match(
82             new CreateCumulative(view, start),
83             new CreateInterval(view, start),
84             Functions.<MutableViewData>throwAssertionError());
85   }
86 
87   /** The {@link View} associated with this {@link ViewData}. */
getView()88   View getView() {
89     return view;
90   }
91 
92   @javax.annotation.Nullable
toMetric(Timestamp now, State state)93   abstract Metric toMetric(Timestamp now, State state);
94 
95   /** Record stats with the given tags. */
record( TagContext context, double value, Timestamp timestamp, Map<String, AttachmentValue> attachments)96   abstract void record(
97       TagContext context,
98       double value,
99       Timestamp timestamp,
100       Map<String, AttachmentValue> attachments);
101 
102   /** Convert this {@link MutableViewData} to {@link ViewData}. */
toViewData(Timestamp now, State state)103   abstract ViewData toViewData(Timestamp now, State state);
104 
105   // Clear recorded stats.
clearStats()106   abstract void clearStats();
107 
108   // Resume stats collection, and reset Start Timestamp (for CumulativeMutableViewData), or refresh
109   // bucket list (for InternalMutableViewData).
resumeStatsCollection(Timestamp now)110   abstract void resumeStatsCollection(Timestamp now);
111 
112   private static final class CumulativeMutableViewData extends MutableViewData {
113 
114     private Timestamp start;
115     private final Map<List</*@Nullable*/ TagValue>, MutableAggregation> tagValueAggregationMap =
116         Maps.newHashMap();
117     // Cache a MetricDescriptor to avoid converting View to MetricDescriptor in the future.
118     private final MetricDescriptor metricDescriptor;
119 
CumulativeMutableViewData(View view, Timestamp start)120     private CumulativeMutableViewData(View view, Timestamp start) {
121       super(view);
122       this.start = start;
123       MetricDescriptor metricDescriptor = MetricUtils.viewToMetricDescriptor(view);
124       if (metricDescriptor == null) {
125         throw new AssertionError(
126             "Cumulative view should be converted to a non-null MetricDescriptor.");
127       } else {
128         this.metricDescriptor = metricDescriptor;
129       }
130     }
131 
132     @javax.annotation.Nullable
133     @Override
toMetric(Timestamp now, State state)134     Metric toMetric(Timestamp now, State state) {
135       handleTimeRewinds(now);
136       if (state == State.DISABLED) {
137         return null;
138       }
139       Type type = metricDescriptor.getType();
140       @javax.annotation.Nullable
141       Timestamp startTime = type == Type.GAUGE_INT64 || type == Type.GAUGE_DOUBLE ? null : start;
142       List<TimeSeries> timeSeriesList = new ArrayList<TimeSeries>();
143       for (Entry<List</*@Nullable*/ TagValue>, MutableAggregation> entry :
144           tagValueAggregationMap.entrySet()) {
145         List<LabelValue> labelValues = MetricUtils.tagValuesToLabelValues(entry.getKey());
146         Point point = entry.getValue().toPoint(now);
147         timeSeriesList.add(TimeSeries.createWithOnePoint(labelValues, point, startTime));
148       }
149       return Metric.create(metricDescriptor, timeSeriesList);
150     }
151 
152     @Override
record( TagContext context, double value, Timestamp timestamp, Map<String, AttachmentValue> attachments)153     void record(
154         TagContext context,
155         double value,
156         Timestamp timestamp,
157         Map<String, AttachmentValue> attachments) {
158       List</*@Nullable*/ TagValue> tagValues =
159           getTagValues(getTagMap(context), super.view.getColumns());
160       if (!tagValueAggregationMap.containsKey(tagValues)) {
161         tagValueAggregationMap.put(
162             tagValues,
163             createMutableAggregation(super.view.getAggregation(), super.getView().getMeasure()));
164       }
165       tagValueAggregationMap.get(tagValues).add(value, attachments, timestamp);
166     }
167 
168     @Override
toViewData(Timestamp now, State state)169     ViewData toViewData(Timestamp now, State state) {
170       handleTimeRewinds(now);
171       if (state == State.ENABLED) {
172         return ViewData.create(
173             super.view,
174             createAggregationMap(tagValueAggregationMap, super.view.getMeasure()),
175             ViewData.AggregationWindowData.CumulativeData.create(start, now));
176       } else {
177         // If Stats state is DISABLED, return an empty ViewData.
178         return ViewData.create(
179             super.view,
180             Collections.<List</*@Nullable*/ TagValue>, AggregationData>emptyMap(),
181             ViewData.AggregationWindowData.CumulativeData.create(ZERO_TIMESTAMP, ZERO_TIMESTAMP));
182       }
183     }
184 
185     /**
186      * This method attemps to migrate this view into a reasonable state in the event of time going
187      * backwards.
188      */
handleTimeRewinds(Timestamp now)189     private void handleTimeRewinds(Timestamp now) {
190       if (now.compareTo(start) < 0) {
191         // Time went backwards, physics is broken, forget what we know.
192         clearStats();
193         start = now;
194       }
195     }
196 
197     @Override
clearStats()198     void clearStats() {
199       tagValueAggregationMap.clear();
200     }
201 
202     @Override
resumeStatsCollection(Timestamp now)203     void resumeStatsCollection(Timestamp now) {
204       start = now;
205     }
206   }
207 
208   /*
209    * For each IntervalView, we always keep a queue of N + 1 buckets (by default N is 4).
210    * Each bucket has a duration which is interval duration / N.
211    * Ideally:
212    * 1. the buckets should always be up-to-date,
213    * 2. current time should always be within the latest bucket, currently recorded stats should fall
214    *    into the latest bucket,
215    * 3. there are always N buckets before the current one, which holds the stats in the past
216    *    interval duration.
217    *
218    * When getView() is called, we will extract and combine the stats from the current and past
219    * buckets (part of the stats from the oldest bucket could have expired).
220    *
221    * However, in reality, we couldn't track the status of buckets all the time (keep monitoring and
222    * updating the bucket queue will be expensive). When we call record() or getView(), some or all
223    * of the buckets might be outdated, and we will need to "pad" new buckets to the queue and remove
224    * outdated ones. After refreshing buckets, the bucket queue will able to maintain the three
225    * invariants in the ideal situation.
226    *
227    * For example:
228    * 1. We have an IntervalView which has a duration of 8 seconds, we register this view at 10s.
229    * 2. Initially there will be 5 buckets: [2.0, 4.0), [4.0, 6.0), ..., [10.0, 12.0).
230    * 3. If users don't call record() or getView(), bucket queue will remain as it is, and some
231    *    buckets could expire.
232    * 4. Suppose record() is called at 15s, now we need to refresh the bucket queue. We need to add
233    *    two new buckets [12.0, 14.0) and [14.0, 16.0), and remove two expired buckets [2.0, 4.0)
234    *    and [4.0, 6.0)
235    * 5. Suppose record() is called again at 30s, all the current buckets should have expired. We add
236    *    5 new buckets [22.0, 24.0) ... [30.0, 32.0) and remove all the previous buckets.
237    * 6. Suppose users call getView() at 35s, again we need to add two new buckets and remove two
238    *    expired one, so that bucket queue is up-to-date. Now we combine stats from all buckets and
239    *    return the combined IntervalViewData.
240    */
241   private static final class IntervalMutableViewData extends MutableViewData {
242 
243     // TODO(songya): allow customizable bucket size in the future.
244     private static final int N = 4; // IntervalView has N + 1 buckets
245 
246     private final ArrayDeque<IntervalBucket> buckets = new ArrayDeque<IntervalBucket>();
247 
248     private final Duration totalDuration; // Duration of the whole interval.
249     private final Duration bucketDuration; // Duration of a single bucket (totalDuration / N)
250 
IntervalMutableViewData(View view, Timestamp start)251     private IntervalMutableViewData(View view, Timestamp start) {
252       super(view);
253       Duration totalDuration = ((View.AggregationWindow.Interval) view.getWindow()).getDuration();
254       this.totalDuration = totalDuration;
255       this.bucketDuration = Duration.fromMillis(totalDuration.toMillis() / N);
256 
257       // When initializing. add N empty buckets prior to the start timestamp of this
258       // IntervalMutableViewData, so that the last bucket will be the current one in effect.
259       shiftBucketList(N + 1, start);
260     }
261 
262     @javax.annotation.Nullable
263     @Override
toMetric(Timestamp now, State state)264     Metric toMetric(Timestamp now, State state) {
265       return null;
266     }
267 
268     @Override
record( TagContext context, double value, Timestamp timestamp, Map<String, AttachmentValue> attachments)269     void record(
270         TagContext context,
271         double value,
272         Timestamp timestamp,
273         Map<String, AttachmentValue> attachments) {
274       List</*@Nullable*/ TagValue> tagValues =
275           getTagValues(getTagMap(context), super.view.getColumns());
276       refreshBucketList(timestamp);
277       // It is always the last bucket that does the recording.
278       CheckerFrameworkUtils.castNonNull(buckets.peekLast())
279           .record(tagValues, value, attachments, timestamp);
280     }
281 
282     @Override
toViewData(Timestamp now, State state)283     ViewData toViewData(Timestamp now, State state) {
284       refreshBucketList(now);
285       if (state == State.ENABLED) {
286         return ViewData.create(
287             super.view,
288             combineBucketsAndGetAggregationMap(now),
289             ViewData.AggregationWindowData.IntervalData.create(now));
290       } else {
291         // If Stats state is DISABLED, return an empty ViewData.
292         return ViewData.create(
293             super.view,
294             Collections.<List</*@Nullable*/ TagValue>, AggregationData>emptyMap(),
295             ViewData.AggregationWindowData.IntervalData.create(ZERO_TIMESTAMP));
296       }
297     }
298 
299     @Override
clearStats()300     void clearStats() {
301       for (IntervalBucket bucket : buckets) {
302         bucket.clearStats();
303       }
304     }
305 
306     @Override
resumeStatsCollection(Timestamp now)307     void resumeStatsCollection(Timestamp now) {
308       // Refresh bucket list to be ready for stats recording, so that if record() is called right
309       // after stats state is turned back on, record() will be faster.
310       refreshBucketList(now);
311     }
312 
313     // Add new buckets and remove expired buckets by comparing the current timestamp with
314     // timestamp of the last bucket.
refreshBucketList(Timestamp now)315     private void refreshBucketList(Timestamp now) {
316       if (buckets.size() != N + 1) {
317         throw new AssertionError("Bucket list must have exactly " + (N + 1) + " buckets.");
318       }
319       Timestamp startOfLastBucket =
320           CheckerFrameworkUtils.castNonNull(buckets.peekLast()).getStart();
321       // Time went backwards!  Physics has failed us!  drop everything we know and relearn.
322       // Prioritize:  Report data we're confident is correct.
323       if (now.compareTo(startOfLastBucket) < 0) {
324         // TODO: configurable time-skew handling with options:
325         // - Drop events in the future, keep others within a duration.
326         // - Drop all events on skew
327         // - Guess at time-skew and "fix" events
328         // - Reset our "start" time to now if necessary.
329         buckets.clear();
330         shiftBucketList(N + 1, now);
331         return;
332       }
333       long elapsedTimeMillis = now.subtractTimestamp(startOfLastBucket).toMillis();
334       long numOfPadBuckets = elapsedTimeMillis / bucketDuration.toMillis();
335 
336       shiftBucketList(numOfPadBuckets, now);
337     }
338 
339     // Add specified number of new buckets, and remove expired buckets
shiftBucketList(long numOfPadBuckets, Timestamp now)340     private void shiftBucketList(long numOfPadBuckets, Timestamp now) {
341       Timestamp startOfNewBucket;
342 
343       if (!buckets.isEmpty()) {
344         startOfNewBucket =
345             CheckerFrameworkUtils.castNonNull(buckets.peekLast())
346                 .getStart()
347                 .addDuration(bucketDuration);
348       } else {
349         // Initialize bucket list. Should only enter this block once.
350         startOfNewBucket = subtractDuration(now, totalDuration);
351       }
352 
353       if (numOfPadBuckets > N + 1) {
354         // All current buckets expired, need to add N + 1 new buckets. The start time of the latest
355         // bucket will be current time.
356         startOfNewBucket = subtractDuration(now, totalDuration);
357         numOfPadBuckets = N + 1;
358       }
359 
360       for (int i = 0; i < numOfPadBuckets; i++) {
361         buckets.add(
362             new IntervalBucket(
363                 startOfNewBucket,
364                 bucketDuration,
365                 super.view.getAggregation(),
366                 super.view.getMeasure()));
367         startOfNewBucket = startOfNewBucket.addDuration(bucketDuration);
368       }
369 
370       // removed expired buckets
371       while (buckets.size() > N + 1) {
372         buckets.pollFirst();
373       }
374     }
375 
376     // Combine stats within each bucket, aggregate stats by tag values, and return the mapping from
377     // tag values to aggregation data.
combineBucketsAndGetAggregationMap( Timestamp now)378     private Map<List</*@Nullable*/ TagValue>, AggregationData> combineBucketsAndGetAggregationMap(
379         Timestamp now) {
380       // Need to maintain the order of inserted MutableAggregations (inserted based on time order).
381       Multimap<List</*@Nullable*/ TagValue>, MutableAggregation> multimap =
382           LinkedHashMultimap.create();
383 
384       ArrayDeque<IntervalBucket> shallowCopy = new ArrayDeque<IntervalBucket>(buckets);
385 
386       Aggregation aggregation = super.view.getAggregation();
387       Measure measure = super.view.getMeasure();
388       putBucketsIntoMultiMap(shallowCopy, multimap, aggregation, measure, now);
389       Map<List</*@Nullable*/ TagValue>, MutableAggregation> singleMap =
390           aggregateOnEachTagValueList(multimap, aggregation, measure);
391       return createAggregationMap(singleMap, super.getView().getMeasure());
392     }
393 
394     // Put stats within each bucket to a multimap. Each tag value list (map key) could have multiple
395     // mutable aggregations (map value) from different buckets.
putBucketsIntoMultiMap( ArrayDeque<IntervalBucket> buckets, Multimap<List< TagValue>, MutableAggregation> multimap, Aggregation aggregation, Measure measure, Timestamp now)396     private static void putBucketsIntoMultiMap(
397         ArrayDeque<IntervalBucket> buckets,
398         Multimap<List</*@Nullable*/ TagValue>, MutableAggregation> multimap,
399         Aggregation aggregation,
400         Measure measure,
401         Timestamp now) {
402       // Put fractional stats of the head (oldest) bucket.
403       IntervalBucket head = CheckerFrameworkUtils.castNonNull(buckets.peekFirst());
404       IntervalBucket tail = CheckerFrameworkUtils.castNonNull(buckets.peekLast());
405       double fractionTail = tail.getFraction(now);
406       // TODO(songya): decide what to do when time goes backwards
407       checkArgument(
408           0.0 <= fractionTail && fractionTail <= 1.0,
409           "Fraction " + fractionTail + " should be within [0.0, 1.0].");
410       double fractionHead = 1.0 - fractionTail;
411       putFractionalMutableAggregationsToMultiMap(
412           head.getTagValueAggregationMap(), multimap, aggregation, measure, fractionHead);
413 
414       // Put whole data of other buckets.
415       boolean shouldSkipFirst = true;
416       for (IntervalBucket bucket : buckets) {
417         if (shouldSkipFirst) {
418           shouldSkipFirst = false;
419           continue; // skip the first bucket
420         }
421         for (Entry<List</*@Nullable*/ TagValue>, MutableAggregation> entry :
422             bucket.getTagValueAggregationMap().entrySet()) {
423           multimap.put(entry.getKey(), entry.getValue());
424         }
425       }
426     }
427 
428     // Put stats within one bucket into multimap, multiplied by a given fraction.
putFractionalMutableAggregationsToMultiMap( Map<T, MutableAggregation> mutableAggrMap, Multimap<T, MutableAggregation> multimap, Aggregation aggregation, Measure measure, double fraction)429     private static <T> void putFractionalMutableAggregationsToMultiMap(
430         Map<T, MutableAggregation> mutableAggrMap,
431         Multimap<T, MutableAggregation> multimap,
432         Aggregation aggregation,
433         Measure measure,
434         double fraction) {
435       for (Entry<T, MutableAggregation> entry : mutableAggrMap.entrySet()) {
436         // Initially empty MutableAggregations.
437         MutableAggregation fractionalMutableAgg = createMutableAggregation(aggregation, measure);
438         fractionalMutableAgg.combine(entry.getValue(), fraction);
439         multimap.put(entry.getKey(), fractionalMutableAgg);
440       }
441     }
442 
443     // For each tag value list (key of AggregationMap), combine mutable aggregations into one
444     // mutable aggregation, thus convert the multimap into a single map.
aggregateOnEachTagValueList( Multimap<T, MutableAggregation> multimap, Aggregation aggregation, Measure measure)445     private static <T> Map<T, MutableAggregation> aggregateOnEachTagValueList(
446         Multimap<T, MutableAggregation> multimap, Aggregation aggregation, Measure measure) {
447       Map<T, MutableAggregation> map = Maps.newHashMap();
448       for (T tagValues : multimap.keySet()) {
449         // Initially empty MutableAggregations.
450         MutableAggregation combinedAggregation = createMutableAggregation(aggregation, measure);
451         for (MutableAggregation mutableAggregation : multimap.get(tagValues)) {
452           combinedAggregation.combine(mutableAggregation, 1.0);
453         }
454         map.put(tagValues, combinedAggregation);
455       }
456       return map;
457     }
458 
459     // Subtract a Duration from a Timestamp, and return a new Timestamp.
subtractDuration(Timestamp timestamp, Duration duration)460     private static Timestamp subtractDuration(Timestamp timestamp, Duration duration) {
461       return timestamp.addDuration(Duration.create(-duration.getSeconds(), -duration.getNanos()));
462     }
463   }
464 
465   private static final class CreateCumulative
466       implements Function<View.AggregationWindow.Cumulative, MutableViewData> {
467     @Override
apply(View.AggregationWindow.Cumulative arg)468     public MutableViewData apply(View.AggregationWindow.Cumulative arg) {
469       return new CumulativeMutableViewData(view, start);
470     }
471 
472     private final View view;
473     private final Timestamp start;
474 
CreateCumulative(View view, Timestamp start)475     private CreateCumulative(View view, Timestamp start) {
476       this.view = view;
477       this.start = start;
478     }
479   }
480 
481   private static final class CreateInterval
482       implements Function<View.AggregationWindow.Interval, MutableViewData> {
483     @Override
apply(View.AggregationWindow.Interval arg)484     public MutableViewData apply(View.AggregationWindow.Interval arg) {
485       return new IntervalMutableViewData(view, start);
486     }
487 
488     private final View view;
489     private final Timestamp start;
490 
CreateInterval(View view, Timestamp start)491     private CreateInterval(View view, Timestamp start) {
492       this.view = view;
493       this.start = start;
494     }
495   }
496 }
497