1 /* 2 * Copyright 2017, OpenCensus Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.opencensus.implcore.stats; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static io.opencensus.implcore.stats.RecordUtils.createAggregationMap; 21 import static io.opencensus.implcore.stats.RecordUtils.createMutableAggregation; 22 import static io.opencensus.implcore.stats.RecordUtils.getTagMap; 23 import static io.opencensus.implcore.stats.RecordUtils.getTagValues; 24 25 import com.google.common.annotations.VisibleForTesting; 26 import com.google.common.collect.LinkedHashMultimap; 27 import com.google.common.collect.Maps; 28 import com.google.common.collect.Multimap; 29 import io.opencensus.common.Duration; 30 import io.opencensus.common.Function; 31 import io.opencensus.common.Functions; 32 import io.opencensus.common.Timestamp; 33 import io.opencensus.implcore.internal.CheckerFrameworkUtils; 34 import io.opencensus.implcore.internal.CurrentState.State; 35 import io.opencensus.metrics.LabelValue; 36 import io.opencensus.metrics.export.Metric; 37 import io.opencensus.metrics.export.MetricDescriptor; 38 import io.opencensus.metrics.export.MetricDescriptor.Type; 39 import io.opencensus.metrics.export.Point; 40 import io.opencensus.metrics.export.TimeSeries; 41 import io.opencensus.stats.Aggregation; 42 import io.opencensus.stats.AggregationData; 43 import io.opencensus.stats.Measure; 44 import io.opencensus.stats.View; 45 import io.opencensus.stats.ViewData; 46 import io.opencensus.tags.TagContext; 47 import io.opencensus.tags.TagValue; 48 import java.util.ArrayDeque; 49 import java.util.ArrayList; 50 import java.util.Collections; 51 import java.util.List; 52 import java.util.Map; 53 import java.util.Map.Entry; 54 55 /*>>> 56 import org.checkerframework.checker.nullness.qual.Nullable; 57 */ 58 59 /** A mutable version of {@link ViewData}, used for recording stats and start/end time. */ 60 @SuppressWarnings("deprecation") 61 abstract class MutableViewData { 62 63 @VisibleForTesting static final Timestamp ZERO_TIMESTAMP = Timestamp.create(0, 0); 64 65 private final View view; 66 MutableViewData(View view)67 private MutableViewData(View view) { 68 this.view = view; 69 } 70 71 /** 72 * Constructs a new {@link MutableViewData}. 73 * 74 * @param view the {@code View} linked with this {@code MutableViewData}. 75 * @param start the start {@code Timestamp}. 76 * @return a {@code MutableViewData}. 77 */ create(final View view, final Timestamp start)78 static MutableViewData create(final View view, final Timestamp start) { 79 return view.getWindow() 80 .match( 81 new CreateCumulative(view, start), 82 new CreateInterval(view, start), 83 Functions.<MutableViewData>throwAssertionError()); 84 } 85 86 /** The {@link View} associated with this {@link ViewData}. */ getView()87 View getView() { 88 return view; 89 } 90 91 @javax.annotation.Nullable toMetric(Timestamp now, State state)92 abstract Metric toMetric(Timestamp now, State state); 93 94 /** Record stats with the given tags. */ record( TagContext context, double value, Timestamp timestamp, Map<String, String> attachments)95 abstract void record( 96 TagContext context, double value, Timestamp timestamp, Map<String, String> attachments); 97 98 /** Convert this {@link MutableViewData} to {@link ViewData}. */ toViewData(Timestamp now, State state)99 abstract ViewData toViewData(Timestamp now, State state); 100 101 // Clear recorded stats. clearStats()102 abstract void clearStats(); 103 104 // Resume stats collection, and reset Start Timestamp (for CumulativeMutableViewData), or refresh 105 // bucket list (for InternalMutableViewData). resumeStatsCollection(Timestamp now)106 abstract void resumeStatsCollection(Timestamp now); 107 108 private static final class CumulativeMutableViewData extends MutableViewData { 109 110 private Timestamp start; 111 private final Map<List</*@Nullable*/ TagValue>, MutableAggregation> tagValueAggregationMap = 112 Maps.newHashMap(); 113 // Cache a MetricDescriptor to avoid converting View to MetricDescriptor in the future. 114 private final MetricDescriptor metricDescriptor; 115 CumulativeMutableViewData(View view, Timestamp start)116 private CumulativeMutableViewData(View view, Timestamp start) { 117 super(view); 118 this.start = start; 119 MetricDescriptor metricDescriptor = MetricUtils.viewToMetricDescriptor(view); 120 if (metricDescriptor == null) { 121 throw new AssertionError( 122 "Cumulative view should be converted to a non-null MetricDescriptor."); 123 } else { 124 this.metricDescriptor = metricDescriptor; 125 } 126 } 127 128 @javax.annotation.Nullable 129 @Override toMetric(Timestamp now, State state)130 Metric toMetric(Timestamp now, State state) { 131 if (state == State.DISABLED) { 132 return null; 133 } 134 Type type = metricDescriptor.getType(); 135 @javax.annotation.Nullable 136 Timestamp startTime = type == Type.GAUGE_INT64 || type == Type.GAUGE_DOUBLE ? null : start; 137 List<TimeSeries> timeSeriesList = new ArrayList<TimeSeries>(); 138 for (Entry<List</*@Nullable*/ TagValue>, MutableAggregation> entry : 139 tagValueAggregationMap.entrySet()) { 140 List<LabelValue> labelValues = MetricUtils.tagValuesToLabelValues(entry.getKey()); 141 Point point = entry.getValue().toPoint(now); 142 timeSeriesList.add(TimeSeries.createWithOnePoint(labelValues, point, startTime)); 143 } 144 return Metric.create(metricDescriptor, timeSeriesList); 145 } 146 147 @Override record( TagContext context, double value, Timestamp timestamp, Map<String, String> attachments)148 void record( 149 TagContext context, double value, Timestamp timestamp, Map<String, String> attachments) { 150 List</*@Nullable*/ TagValue> tagValues = 151 getTagValues(getTagMap(context), super.view.getColumns()); 152 if (!tagValueAggregationMap.containsKey(tagValues)) { 153 tagValueAggregationMap.put( 154 tagValues, 155 createMutableAggregation(super.view.getAggregation(), super.getView().getMeasure())); 156 } 157 tagValueAggregationMap.get(tagValues).add(value, attachments, timestamp); 158 } 159 160 @Override toViewData(Timestamp now, State state)161 ViewData toViewData(Timestamp now, State state) { 162 if (state == State.ENABLED) { 163 return ViewData.create( 164 super.view, 165 createAggregationMap(tagValueAggregationMap, super.view.getMeasure()), 166 ViewData.AggregationWindowData.CumulativeData.create(start, now)); 167 } else { 168 // If Stats state is DISABLED, return an empty ViewData. 169 return ViewData.create( 170 super.view, 171 Collections.<List</*@Nullable*/ TagValue>, AggregationData>emptyMap(), 172 ViewData.AggregationWindowData.CumulativeData.create(ZERO_TIMESTAMP, ZERO_TIMESTAMP)); 173 } 174 } 175 176 @Override clearStats()177 void clearStats() { 178 tagValueAggregationMap.clear(); 179 } 180 181 @Override resumeStatsCollection(Timestamp now)182 void resumeStatsCollection(Timestamp now) { 183 start = now; 184 } 185 } 186 187 /* 188 * For each IntervalView, we always keep a queue of N + 1 buckets (by default N is 4). 189 * Each bucket has a duration which is interval duration / N. 190 * Ideally: 191 * 1. the buckets should always be up-to-date, 192 * 2. current time should always be within the latest bucket, currently recorded stats should fall 193 * into the latest bucket, 194 * 3. there are always N buckets before the current one, which holds the stats in the past 195 * interval duration. 196 * 197 * When getView() is called, we will extract and combine the stats from the current and past 198 * buckets (part of the stats from the oldest bucket could have expired). 199 * 200 * However, in reality, we couldn't track the status of buckets all the time (keep monitoring and 201 * updating the bucket queue will be expensive). When we call record() or getView(), some or all 202 * of the buckets might be outdated, and we will need to "pad" new buckets to the queue and remove 203 * outdated ones. After refreshing buckets, the bucket queue will able to maintain the three 204 * invariants in the ideal situation. 205 * 206 * For example: 207 * 1. We have an IntervalView which has a duration of 8 seconds, we register this view at 10s. 208 * 2. Initially there will be 5 buckets: [2.0, 4.0), [4.0, 6.0), ..., [10.0, 12.0). 209 * 3. If users don't call record() or getView(), bucket queue will remain as it is, and some 210 * buckets could expire. 211 * 4. Suppose record() is called at 15s, now we need to refresh the bucket queue. We need to add 212 * two new buckets [12.0, 14.0) and [14.0, 16.0), and remove two expired buckets [2.0, 4.0) 213 * and [4.0, 6.0) 214 * 5. Suppose record() is called again at 30s, all the current buckets should have expired. We add 215 * 5 new buckets [22.0, 24.0) ... [30.0, 32.0) and remove all the previous buckets. 216 * 6. Suppose users call getView() at 35s, again we need to add two new buckets and remove two 217 * expired one, so that bucket queue is up-to-date. Now we combine stats from all buckets and 218 * return the combined IntervalViewData. 219 */ 220 private static final class IntervalMutableViewData extends MutableViewData { 221 222 // TODO(songya): allow customizable bucket size in the future. 223 private static final int N = 4; // IntervalView has N + 1 buckets 224 225 private final ArrayDeque<IntervalBucket> buckets = new ArrayDeque<IntervalBucket>(); 226 227 private final Duration totalDuration; // Duration of the whole interval. 228 private final Duration bucketDuration; // Duration of a single bucket (totalDuration / N) 229 IntervalMutableViewData(View view, Timestamp start)230 private IntervalMutableViewData(View view, Timestamp start) { 231 super(view); 232 Duration totalDuration = ((View.AggregationWindow.Interval) view.getWindow()).getDuration(); 233 this.totalDuration = totalDuration; 234 this.bucketDuration = Duration.fromMillis(totalDuration.toMillis() / N); 235 236 // When initializing. add N empty buckets prior to the start timestamp of this 237 // IntervalMutableViewData, so that the last bucket will be the current one in effect. 238 shiftBucketList(N + 1, start); 239 } 240 241 @javax.annotation.Nullable 242 @Override toMetric(Timestamp now, State state)243 Metric toMetric(Timestamp now, State state) { 244 return null; 245 } 246 247 @Override record( TagContext context, double value, Timestamp timestamp, Map<String, String> attachments)248 void record( 249 TagContext context, double value, Timestamp timestamp, Map<String, String> attachments) { 250 List</*@Nullable*/ TagValue> tagValues = 251 getTagValues(getTagMap(context), super.view.getColumns()); 252 refreshBucketList(timestamp); 253 // It is always the last bucket that does the recording. 254 CheckerFrameworkUtils.castNonNull(buckets.peekLast()) 255 .record(tagValues, value, attachments, timestamp); 256 } 257 258 @Override toViewData(Timestamp now, State state)259 ViewData toViewData(Timestamp now, State state) { 260 refreshBucketList(now); 261 if (state == State.ENABLED) { 262 return ViewData.create( 263 super.view, 264 combineBucketsAndGetAggregationMap(now), 265 ViewData.AggregationWindowData.IntervalData.create(now)); 266 } else { 267 // If Stats state is DISABLED, return an empty ViewData. 268 return ViewData.create( 269 super.view, 270 Collections.<List</*@Nullable*/ TagValue>, AggregationData>emptyMap(), 271 ViewData.AggregationWindowData.IntervalData.create(ZERO_TIMESTAMP)); 272 } 273 } 274 275 @Override clearStats()276 void clearStats() { 277 for (IntervalBucket bucket : buckets) { 278 bucket.clearStats(); 279 } 280 } 281 282 @Override resumeStatsCollection(Timestamp now)283 void resumeStatsCollection(Timestamp now) { 284 // Refresh bucket list to be ready for stats recording, so that if record() is called right 285 // after stats state is turned back on, record() will be faster. 286 refreshBucketList(now); 287 } 288 289 // Add new buckets and remove expired buckets by comparing the current timestamp with 290 // timestamp of the last bucket. refreshBucketList(Timestamp now)291 private void refreshBucketList(Timestamp now) { 292 if (buckets.size() != N + 1) { 293 throw new AssertionError("Bucket list must have exactly " + (N + 1) + " buckets."); 294 } 295 Timestamp startOfLastBucket = 296 CheckerFrameworkUtils.castNonNull(buckets.peekLast()).getStart(); 297 // TODO(songya): decide what to do when time goes backwards 298 checkArgument( 299 now.compareTo(startOfLastBucket) >= 0, 300 "Current time must be within or after the last bucket."); 301 long elapsedTimeMillis = now.subtractTimestamp(startOfLastBucket).toMillis(); 302 long numOfPadBuckets = elapsedTimeMillis / bucketDuration.toMillis(); 303 304 shiftBucketList(numOfPadBuckets, now); 305 } 306 307 // Add specified number of new buckets, and remove expired buckets shiftBucketList(long numOfPadBuckets, Timestamp now)308 private void shiftBucketList(long numOfPadBuckets, Timestamp now) { 309 Timestamp startOfNewBucket; 310 311 if (!buckets.isEmpty()) { 312 startOfNewBucket = 313 CheckerFrameworkUtils.castNonNull(buckets.peekLast()) 314 .getStart() 315 .addDuration(bucketDuration); 316 } else { 317 // Initialize bucket list. Should only enter this block once. 318 startOfNewBucket = subtractDuration(now, totalDuration); 319 } 320 321 if (numOfPadBuckets > N + 1) { 322 // All current buckets expired, need to add N + 1 new buckets. The start time of the latest 323 // bucket will be current time. 324 startOfNewBucket = subtractDuration(now, totalDuration); 325 numOfPadBuckets = N + 1; 326 } 327 328 for (int i = 0; i < numOfPadBuckets; i++) { 329 buckets.add( 330 new IntervalBucket( 331 startOfNewBucket, 332 bucketDuration, 333 super.view.getAggregation(), 334 super.view.getMeasure())); 335 startOfNewBucket = startOfNewBucket.addDuration(bucketDuration); 336 } 337 338 // removed expired buckets 339 while (buckets.size() > N + 1) { 340 buckets.pollFirst(); 341 } 342 } 343 344 // Combine stats within each bucket, aggregate stats by tag values, and return the mapping from 345 // tag values to aggregation data. combineBucketsAndGetAggregationMap( Timestamp now)346 private Map<List</*@Nullable*/ TagValue>, AggregationData> combineBucketsAndGetAggregationMap( 347 Timestamp now) { 348 // Need to maintain the order of inserted MutableAggregations (inserted based on time order). 349 Multimap<List</*@Nullable*/ TagValue>, MutableAggregation> multimap = 350 LinkedHashMultimap.create(); 351 352 ArrayDeque<IntervalBucket> shallowCopy = new ArrayDeque<IntervalBucket>(buckets); 353 354 Aggregation aggregation = super.view.getAggregation(); 355 Measure measure = super.view.getMeasure(); 356 putBucketsIntoMultiMap(shallowCopy, multimap, aggregation, measure, now); 357 Map<List</*@Nullable*/ TagValue>, MutableAggregation> singleMap = 358 aggregateOnEachTagValueList(multimap, aggregation, measure); 359 return createAggregationMap(singleMap, super.getView().getMeasure()); 360 } 361 362 // Put stats within each bucket to a multimap. Each tag value list (map key) could have multiple 363 // mutable aggregations (map value) from different buckets. putBucketsIntoMultiMap( ArrayDeque<IntervalBucket> buckets, Multimap<List< TagValue>, MutableAggregation> multimap, Aggregation aggregation, Measure measure, Timestamp now)364 private static void putBucketsIntoMultiMap( 365 ArrayDeque<IntervalBucket> buckets, 366 Multimap<List</*@Nullable*/ TagValue>, MutableAggregation> multimap, 367 Aggregation aggregation, 368 Measure measure, 369 Timestamp now) { 370 // Put fractional stats of the head (oldest) bucket. 371 IntervalBucket head = CheckerFrameworkUtils.castNonNull(buckets.peekFirst()); 372 IntervalBucket tail = CheckerFrameworkUtils.castNonNull(buckets.peekLast()); 373 double fractionTail = tail.getFraction(now); 374 // TODO(songya): decide what to do when time goes backwards 375 checkArgument( 376 0.0 <= fractionTail && fractionTail <= 1.0, 377 "Fraction " + fractionTail + " should be within [0.0, 1.0]."); 378 double fractionHead = 1.0 - fractionTail; 379 putFractionalMutableAggregationsToMultiMap( 380 head.getTagValueAggregationMap(), multimap, aggregation, measure, fractionHead); 381 382 // Put whole data of other buckets. 383 boolean shouldSkipFirst = true; 384 for (IntervalBucket bucket : buckets) { 385 if (shouldSkipFirst) { 386 shouldSkipFirst = false; 387 continue; // skip the first bucket 388 } 389 for (Entry<List</*@Nullable*/ TagValue>, MutableAggregation> entry : 390 bucket.getTagValueAggregationMap().entrySet()) { 391 multimap.put(entry.getKey(), entry.getValue()); 392 } 393 } 394 } 395 396 // Put stats within one bucket into multimap, multiplied by a given fraction. putFractionalMutableAggregationsToMultiMap( Map<T, MutableAggregation> mutableAggrMap, Multimap<T, MutableAggregation> multimap, Aggregation aggregation, Measure measure, double fraction)397 private static <T> void putFractionalMutableAggregationsToMultiMap( 398 Map<T, MutableAggregation> mutableAggrMap, 399 Multimap<T, MutableAggregation> multimap, 400 Aggregation aggregation, 401 Measure measure, 402 double fraction) { 403 for (Entry<T, MutableAggregation> entry : mutableAggrMap.entrySet()) { 404 // Initially empty MutableAggregations. 405 MutableAggregation fractionalMutableAgg = createMutableAggregation(aggregation, measure); 406 fractionalMutableAgg.combine(entry.getValue(), fraction); 407 multimap.put(entry.getKey(), fractionalMutableAgg); 408 } 409 } 410 411 // For each tag value list (key of AggregationMap), combine mutable aggregations into one 412 // mutable aggregation, thus convert the multimap into a single map. aggregateOnEachTagValueList( Multimap<T, MutableAggregation> multimap, Aggregation aggregation, Measure measure)413 private static <T> Map<T, MutableAggregation> aggregateOnEachTagValueList( 414 Multimap<T, MutableAggregation> multimap, Aggregation aggregation, Measure measure) { 415 Map<T, MutableAggregation> map = Maps.newHashMap(); 416 for (T tagValues : multimap.keySet()) { 417 // Initially empty MutableAggregations. 418 MutableAggregation combinedAggregation = createMutableAggregation(aggregation, measure); 419 for (MutableAggregation mutableAggregation : multimap.get(tagValues)) { 420 combinedAggregation.combine(mutableAggregation, 1.0); 421 } 422 map.put(tagValues, combinedAggregation); 423 } 424 return map; 425 } 426 427 // Subtract a Duration from a Timestamp, and return a new Timestamp. subtractDuration(Timestamp timestamp, Duration duration)428 private static Timestamp subtractDuration(Timestamp timestamp, Duration duration) { 429 return timestamp.addDuration(Duration.create(-duration.getSeconds(), -duration.getNanos())); 430 } 431 } 432 433 private static final class CreateCumulative 434 implements Function<View.AggregationWindow.Cumulative, MutableViewData> { 435 @Override apply(View.AggregationWindow.Cumulative arg)436 public MutableViewData apply(View.AggregationWindow.Cumulative arg) { 437 return new CumulativeMutableViewData(view, start); 438 } 439 440 private final View view; 441 private final Timestamp start; 442 CreateCumulative(View view, Timestamp start)443 private CreateCumulative(View view, Timestamp start) { 444 this.view = view; 445 this.start = start; 446 } 447 } 448 449 private static final class CreateInterval 450 implements Function<View.AggregationWindow.Interval, MutableViewData> { 451 @Override apply(View.AggregationWindow.Interval arg)452 public MutableViewData apply(View.AggregationWindow.Interval arg) { 453 return new IntervalMutableViewData(view, start); 454 } 455 456 private final View view; 457 private final Timestamp start; 458 CreateInterval(View view, Timestamp start)459 private CreateInterval(View view, Timestamp start) { 460 this.view = view; 461 this.start = start; 462 } 463 } 464 } 465