1 /* 2 * Copyright 2017, OpenCensus Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.opencensus.implcore.stats; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static io.opencensus.implcore.stats.RecordUtils.createAggregationMap; 21 import static io.opencensus.implcore.stats.RecordUtils.createMutableAggregation; 22 import static io.opencensus.implcore.stats.RecordUtils.getTagMap; 23 import static io.opencensus.implcore.stats.RecordUtils.getTagValues; 24 25 import com.google.common.annotations.VisibleForTesting; 26 import com.google.common.collect.LinkedHashMultimap; 27 import com.google.common.collect.Maps; 28 import com.google.common.collect.Multimap; 29 import io.opencensus.common.Duration; 30 import io.opencensus.common.Function; 31 import io.opencensus.common.Functions; 32 import io.opencensus.common.Timestamp; 33 import io.opencensus.implcore.internal.CheckerFrameworkUtils; 34 import io.opencensus.implcore.internal.CurrentState.State; 35 import io.opencensus.metrics.LabelValue; 36 import io.opencensus.metrics.data.AttachmentValue; 37 import io.opencensus.metrics.export.Metric; 38 import io.opencensus.metrics.export.MetricDescriptor; 39 import io.opencensus.metrics.export.MetricDescriptor.Type; 40 import io.opencensus.metrics.export.Point; 41 import io.opencensus.metrics.export.TimeSeries; 42 import io.opencensus.stats.Aggregation; 43 import io.opencensus.stats.AggregationData; 44 import io.opencensus.stats.Measure; 45 import io.opencensus.stats.View; 46 import io.opencensus.stats.ViewData; 47 import io.opencensus.tags.TagContext; 48 import io.opencensus.tags.TagValue; 49 import java.util.ArrayDeque; 50 import java.util.ArrayList; 51 import java.util.Collections; 52 import java.util.List; 53 import java.util.Map; 54 import java.util.Map.Entry; 55 56 /*>>> 57 import org.checkerframework.checker.nullness.qual.Nullable; 58 */ 59 60 /** A mutable version of {@link ViewData}, used for recording stats and start/end time. */ 61 @SuppressWarnings("deprecation") 62 abstract class MutableViewData { 63 64 @VisibleForTesting static final Timestamp ZERO_TIMESTAMP = Timestamp.create(0, 0); 65 66 private final View view; 67 MutableViewData(View view)68 private MutableViewData(View view) { 69 this.view = view; 70 } 71 72 /** 73 * Constructs a new {@link MutableViewData}. 74 * 75 * @param view the {@code View} linked with this {@code MutableViewData}. 76 * @param start the start {@code Timestamp}. 77 * @return a {@code MutableViewData}. 78 */ create(final View view, final Timestamp start)79 static MutableViewData create(final View view, final Timestamp start) { 80 return view.getWindow() 81 .match( 82 new CreateCumulative(view, start), 83 new CreateInterval(view, start), 84 Functions.<MutableViewData>throwAssertionError()); 85 } 86 87 /** The {@link View} associated with this {@link ViewData}. */ getView()88 View getView() { 89 return view; 90 } 91 92 @javax.annotation.Nullable toMetric(Timestamp now, State state)93 abstract Metric toMetric(Timestamp now, State state); 94 95 /** Record stats with the given tags. */ record( TagContext context, double value, Timestamp timestamp, Map<String, AttachmentValue> attachments)96 abstract void record( 97 TagContext context, 98 double value, 99 Timestamp timestamp, 100 Map<String, AttachmentValue> attachments); 101 102 /** Convert this {@link MutableViewData} to {@link ViewData}. */ toViewData(Timestamp now, State state)103 abstract ViewData toViewData(Timestamp now, State state); 104 105 // Clear recorded stats. clearStats()106 abstract void clearStats(); 107 108 // Resume stats collection, and reset Start Timestamp (for CumulativeMutableViewData), or refresh 109 // bucket list (for InternalMutableViewData). resumeStatsCollection(Timestamp now)110 abstract void resumeStatsCollection(Timestamp now); 111 112 private static final class CumulativeMutableViewData extends MutableViewData { 113 114 private Timestamp start; 115 private final Map<List</*@Nullable*/ TagValue>, MutableAggregation> tagValueAggregationMap = 116 Maps.newHashMap(); 117 // Cache a MetricDescriptor to avoid converting View to MetricDescriptor in the future. 118 private final MetricDescriptor metricDescriptor; 119 CumulativeMutableViewData(View view, Timestamp start)120 private CumulativeMutableViewData(View view, Timestamp start) { 121 super(view); 122 this.start = start; 123 MetricDescriptor metricDescriptor = MetricUtils.viewToMetricDescriptor(view); 124 if (metricDescriptor == null) { 125 throw new AssertionError( 126 "Cumulative view should be converted to a non-null MetricDescriptor."); 127 } else { 128 this.metricDescriptor = metricDescriptor; 129 } 130 } 131 132 @javax.annotation.Nullable 133 @Override toMetric(Timestamp now, State state)134 Metric toMetric(Timestamp now, State state) { 135 handleTimeRewinds(now); 136 if (state == State.DISABLED) { 137 return null; 138 } 139 Type type = metricDescriptor.getType(); 140 @javax.annotation.Nullable 141 Timestamp startTime = type == Type.GAUGE_INT64 || type == Type.GAUGE_DOUBLE ? null : start; 142 List<TimeSeries> timeSeriesList = new ArrayList<TimeSeries>(); 143 for (Entry<List</*@Nullable*/ TagValue>, MutableAggregation> entry : 144 tagValueAggregationMap.entrySet()) { 145 List<LabelValue> labelValues = MetricUtils.tagValuesToLabelValues(entry.getKey()); 146 Point point = entry.getValue().toPoint(now); 147 timeSeriesList.add(TimeSeries.createWithOnePoint(labelValues, point, startTime)); 148 } 149 return Metric.create(metricDescriptor, timeSeriesList); 150 } 151 152 @Override record( TagContext context, double value, Timestamp timestamp, Map<String, AttachmentValue> attachments)153 void record( 154 TagContext context, 155 double value, 156 Timestamp timestamp, 157 Map<String, AttachmentValue> attachments) { 158 List</*@Nullable*/ TagValue> tagValues = 159 getTagValues(getTagMap(context), super.view.getColumns()); 160 if (!tagValueAggregationMap.containsKey(tagValues)) { 161 tagValueAggregationMap.put( 162 tagValues, 163 createMutableAggregation(super.view.getAggregation(), super.getView().getMeasure())); 164 } 165 tagValueAggregationMap.get(tagValues).add(value, attachments, timestamp); 166 } 167 168 @Override toViewData(Timestamp now, State state)169 ViewData toViewData(Timestamp now, State state) { 170 handleTimeRewinds(now); 171 if (state == State.ENABLED) { 172 return ViewData.create( 173 super.view, 174 createAggregationMap(tagValueAggregationMap, super.view.getMeasure()), 175 ViewData.AggregationWindowData.CumulativeData.create(start, now)); 176 } else { 177 // If Stats state is DISABLED, return an empty ViewData. 178 return ViewData.create( 179 super.view, 180 Collections.<List</*@Nullable*/ TagValue>, AggregationData>emptyMap(), 181 ViewData.AggregationWindowData.CumulativeData.create(ZERO_TIMESTAMP, ZERO_TIMESTAMP)); 182 } 183 } 184 185 /** 186 * This method attemps to migrate this view into a reasonable state in the event of time going 187 * backwards. 188 */ handleTimeRewinds(Timestamp now)189 private void handleTimeRewinds(Timestamp now) { 190 if (now.compareTo(start) < 0) { 191 // Time went backwards, physics is broken, forget what we know. 192 clearStats(); 193 start = now; 194 } 195 } 196 197 @Override clearStats()198 void clearStats() { 199 tagValueAggregationMap.clear(); 200 } 201 202 @Override resumeStatsCollection(Timestamp now)203 void resumeStatsCollection(Timestamp now) { 204 start = now; 205 } 206 } 207 208 /* 209 * For each IntervalView, we always keep a queue of N + 1 buckets (by default N is 4). 210 * Each bucket has a duration which is interval duration / N. 211 * Ideally: 212 * 1. the buckets should always be up-to-date, 213 * 2. current time should always be within the latest bucket, currently recorded stats should fall 214 * into the latest bucket, 215 * 3. there are always N buckets before the current one, which holds the stats in the past 216 * interval duration. 217 * 218 * When getView() is called, we will extract and combine the stats from the current and past 219 * buckets (part of the stats from the oldest bucket could have expired). 220 * 221 * However, in reality, we couldn't track the status of buckets all the time (keep monitoring and 222 * updating the bucket queue will be expensive). When we call record() or getView(), some or all 223 * of the buckets might be outdated, and we will need to "pad" new buckets to the queue and remove 224 * outdated ones. After refreshing buckets, the bucket queue will able to maintain the three 225 * invariants in the ideal situation. 226 * 227 * For example: 228 * 1. We have an IntervalView which has a duration of 8 seconds, we register this view at 10s. 229 * 2. Initially there will be 5 buckets: [2.0, 4.0), [4.0, 6.0), ..., [10.0, 12.0). 230 * 3. If users don't call record() or getView(), bucket queue will remain as it is, and some 231 * buckets could expire. 232 * 4. Suppose record() is called at 15s, now we need to refresh the bucket queue. We need to add 233 * two new buckets [12.0, 14.0) and [14.0, 16.0), and remove two expired buckets [2.0, 4.0) 234 * and [4.0, 6.0) 235 * 5. Suppose record() is called again at 30s, all the current buckets should have expired. We add 236 * 5 new buckets [22.0, 24.0) ... [30.0, 32.0) and remove all the previous buckets. 237 * 6. Suppose users call getView() at 35s, again we need to add two new buckets and remove two 238 * expired one, so that bucket queue is up-to-date. Now we combine stats from all buckets and 239 * return the combined IntervalViewData. 240 */ 241 private static final class IntervalMutableViewData extends MutableViewData { 242 243 // TODO(songya): allow customizable bucket size in the future. 244 private static final int N = 4; // IntervalView has N + 1 buckets 245 246 private final ArrayDeque<IntervalBucket> buckets = new ArrayDeque<IntervalBucket>(); 247 248 private final Duration totalDuration; // Duration of the whole interval. 249 private final Duration bucketDuration; // Duration of a single bucket (totalDuration / N) 250 IntervalMutableViewData(View view, Timestamp start)251 private IntervalMutableViewData(View view, Timestamp start) { 252 super(view); 253 Duration totalDuration = ((View.AggregationWindow.Interval) view.getWindow()).getDuration(); 254 this.totalDuration = totalDuration; 255 this.bucketDuration = Duration.fromMillis(totalDuration.toMillis() / N); 256 257 // When initializing. add N empty buckets prior to the start timestamp of this 258 // IntervalMutableViewData, so that the last bucket will be the current one in effect. 259 shiftBucketList(N + 1, start); 260 } 261 262 @javax.annotation.Nullable 263 @Override toMetric(Timestamp now, State state)264 Metric toMetric(Timestamp now, State state) { 265 return null; 266 } 267 268 @Override record( TagContext context, double value, Timestamp timestamp, Map<String, AttachmentValue> attachments)269 void record( 270 TagContext context, 271 double value, 272 Timestamp timestamp, 273 Map<String, AttachmentValue> attachments) { 274 List</*@Nullable*/ TagValue> tagValues = 275 getTagValues(getTagMap(context), super.view.getColumns()); 276 refreshBucketList(timestamp); 277 // It is always the last bucket that does the recording. 278 CheckerFrameworkUtils.castNonNull(buckets.peekLast()) 279 .record(tagValues, value, attachments, timestamp); 280 } 281 282 @Override toViewData(Timestamp now, State state)283 ViewData toViewData(Timestamp now, State state) { 284 refreshBucketList(now); 285 if (state == State.ENABLED) { 286 return ViewData.create( 287 super.view, 288 combineBucketsAndGetAggregationMap(now), 289 ViewData.AggregationWindowData.IntervalData.create(now)); 290 } else { 291 // If Stats state is DISABLED, return an empty ViewData. 292 return ViewData.create( 293 super.view, 294 Collections.<List</*@Nullable*/ TagValue>, AggregationData>emptyMap(), 295 ViewData.AggregationWindowData.IntervalData.create(ZERO_TIMESTAMP)); 296 } 297 } 298 299 @Override clearStats()300 void clearStats() { 301 for (IntervalBucket bucket : buckets) { 302 bucket.clearStats(); 303 } 304 } 305 306 @Override resumeStatsCollection(Timestamp now)307 void resumeStatsCollection(Timestamp now) { 308 // Refresh bucket list to be ready for stats recording, so that if record() is called right 309 // after stats state is turned back on, record() will be faster. 310 refreshBucketList(now); 311 } 312 313 // Add new buckets and remove expired buckets by comparing the current timestamp with 314 // timestamp of the last bucket. refreshBucketList(Timestamp now)315 private void refreshBucketList(Timestamp now) { 316 if (buckets.size() != N + 1) { 317 throw new AssertionError("Bucket list must have exactly " + (N + 1) + " buckets."); 318 } 319 Timestamp startOfLastBucket = 320 CheckerFrameworkUtils.castNonNull(buckets.peekLast()).getStart(); 321 // Time went backwards! Physics has failed us! drop everything we know and relearn. 322 // Prioritize: Report data we're confident is correct. 323 if (now.compareTo(startOfLastBucket) < 0) { 324 // TODO: configurable time-skew handling with options: 325 // - Drop events in the future, keep others within a duration. 326 // - Drop all events on skew 327 // - Guess at time-skew and "fix" events 328 // - Reset our "start" time to now if necessary. 329 buckets.clear(); 330 shiftBucketList(N + 1, now); 331 return; 332 } 333 long elapsedTimeMillis = now.subtractTimestamp(startOfLastBucket).toMillis(); 334 long numOfPadBuckets = elapsedTimeMillis / bucketDuration.toMillis(); 335 336 shiftBucketList(numOfPadBuckets, now); 337 } 338 339 // Add specified number of new buckets, and remove expired buckets shiftBucketList(long numOfPadBuckets, Timestamp now)340 private void shiftBucketList(long numOfPadBuckets, Timestamp now) { 341 Timestamp startOfNewBucket; 342 343 if (!buckets.isEmpty()) { 344 startOfNewBucket = 345 CheckerFrameworkUtils.castNonNull(buckets.peekLast()) 346 .getStart() 347 .addDuration(bucketDuration); 348 } else { 349 // Initialize bucket list. Should only enter this block once. 350 startOfNewBucket = subtractDuration(now, totalDuration); 351 } 352 353 if (numOfPadBuckets > N + 1) { 354 // All current buckets expired, need to add N + 1 new buckets. The start time of the latest 355 // bucket will be current time. 356 startOfNewBucket = subtractDuration(now, totalDuration); 357 numOfPadBuckets = N + 1; 358 } 359 360 for (int i = 0; i < numOfPadBuckets; i++) { 361 buckets.add( 362 new IntervalBucket( 363 startOfNewBucket, 364 bucketDuration, 365 super.view.getAggregation(), 366 super.view.getMeasure())); 367 startOfNewBucket = startOfNewBucket.addDuration(bucketDuration); 368 } 369 370 // removed expired buckets 371 while (buckets.size() > N + 1) { 372 buckets.pollFirst(); 373 } 374 } 375 376 // Combine stats within each bucket, aggregate stats by tag values, and return the mapping from 377 // tag values to aggregation data. combineBucketsAndGetAggregationMap( Timestamp now)378 private Map<List</*@Nullable*/ TagValue>, AggregationData> combineBucketsAndGetAggregationMap( 379 Timestamp now) { 380 // Need to maintain the order of inserted MutableAggregations (inserted based on time order). 381 Multimap<List</*@Nullable*/ TagValue>, MutableAggregation> multimap = 382 LinkedHashMultimap.create(); 383 384 ArrayDeque<IntervalBucket> shallowCopy = new ArrayDeque<IntervalBucket>(buckets); 385 386 Aggregation aggregation = super.view.getAggregation(); 387 Measure measure = super.view.getMeasure(); 388 putBucketsIntoMultiMap(shallowCopy, multimap, aggregation, measure, now); 389 Map<List</*@Nullable*/ TagValue>, MutableAggregation> singleMap = 390 aggregateOnEachTagValueList(multimap, aggregation, measure); 391 return createAggregationMap(singleMap, super.getView().getMeasure()); 392 } 393 394 // Put stats within each bucket to a multimap. Each tag value list (map key) could have multiple 395 // mutable aggregations (map value) from different buckets. putBucketsIntoMultiMap( ArrayDeque<IntervalBucket> buckets, Multimap<List< TagValue>, MutableAggregation> multimap, Aggregation aggregation, Measure measure, Timestamp now)396 private static void putBucketsIntoMultiMap( 397 ArrayDeque<IntervalBucket> buckets, 398 Multimap<List</*@Nullable*/ TagValue>, MutableAggregation> multimap, 399 Aggregation aggregation, 400 Measure measure, 401 Timestamp now) { 402 // Put fractional stats of the head (oldest) bucket. 403 IntervalBucket head = CheckerFrameworkUtils.castNonNull(buckets.peekFirst()); 404 IntervalBucket tail = CheckerFrameworkUtils.castNonNull(buckets.peekLast()); 405 double fractionTail = tail.getFraction(now); 406 // TODO(songya): decide what to do when time goes backwards 407 checkArgument( 408 0.0 <= fractionTail && fractionTail <= 1.0, 409 "Fraction " + fractionTail + " should be within [0.0, 1.0]."); 410 double fractionHead = 1.0 - fractionTail; 411 putFractionalMutableAggregationsToMultiMap( 412 head.getTagValueAggregationMap(), multimap, aggregation, measure, fractionHead); 413 414 // Put whole data of other buckets. 415 boolean shouldSkipFirst = true; 416 for (IntervalBucket bucket : buckets) { 417 if (shouldSkipFirst) { 418 shouldSkipFirst = false; 419 continue; // skip the first bucket 420 } 421 for (Entry<List</*@Nullable*/ TagValue>, MutableAggregation> entry : 422 bucket.getTagValueAggregationMap().entrySet()) { 423 multimap.put(entry.getKey(), entry.getValue()); 424 } 425 } 426 } 427 428 // Put stats within one bucket into multimap, multiplied by a given fraction. putFractionalMutableAggregationsToMultiMap( Map<T, MutableAggregation> mutableAggrMap, Multimap<T, MutableAggregation> multimap, Aggregation aggregation, Measure measure, double fraction)429 private static <T> void putFractionalMutableAggregationsToMultiMap( 430 Map<T, MutableAggregation> mutableAggrMap, 431 Multimap<T, MutableAggregation> multimap, 432 Aggregation aggregation, 433 Measure measure, 434 double fraction) { 435 for (Entry<T, MutableAggregation> entry : mutableAggrMap.entrySet()) { 436 // Initially empty MutableAggregations. 437 MutableAggregation fractionalMutableAgg = createMutableAggregation(aggregation, measure); 438 fractionalMutableAgg.combine(entry.getValue(), fraction); 439 multimap.put(entry.getKey(), fractionalMutableAgg); 440 } 441 } 442 443 // For each tag value list (key of AggregationMap), combine mutable aggregations into one 444 // mutable aggregation, thus convert the multimap into a single map. aggregateOnEachTagValueList( Multimap<T, MutableAggregation> multimap, Aggregation aggregation, Measure measure)445 private static <T> Map<T, MutableAggregation> aggregateOnEachTagValueList( 446 Multimap<T, MutableAggregation> multimap, Aggregation aggregation, Measure measure) { 447 Map<T, MutableAggregation> map = Maps.newHashMap(); 448 for (T tagValues : multimap.keySet()) { 449 // Initially empty MutableAggregations. 450 MutableAggregation combinedAggregation = createMutableAggregation(aggregation, measure); 451 for (MutableAggregation mutableAggregation : multimap.get(tagValues)) { 452 combinedAggregation.combine(mutableAggregation, 1.0); 453 } 454 map.put(tagValues, combinedAggregation); 455 } 456 return map; 457 } 458 459 // Subtract a Duration from a Timestamp, and return a new Timestamp. subtractDuration(Timestamp timestamp, Duration duration)460 private static Timestamp subtractDuration(Timestamp timestamp, Duration duration) { 461 return timestamp.addDuration(Duration.create(-duration.getSeconds(), -duration.getNanos())); 462 } 463 } 464 465 private static final class CreateCumulative 466 implements Function<View.AggregationWindow.Cumulative, MutableViewData> { 467 @Override apply(View.AggregationWindow.Cumulative arg)468 public MutableViewData apply(View.AggregationWindow.Cumulative arg) { 469 return new CumulativeMutableViewData(view, start); 470 } 471 472 private final View view; 473 private final Timestamp start; 474 CreateCumulative(View view, Timestamp start)475 private CreateCumulative(View view, Timestamp start) { 476 this.view = view; 477 this.start = start; 478 } 479 } 480 481 private static final class CreateInterval 482 implements Function<View.AggregationWindow.Interval, MutableViewData> { 483 @Override apply(View.AggregationWindow.Interval arg)484 public MutableViewData apply(View.AggregationWindow.Interval arg) { 485 return new IntervalMutableViewData(view, start); 486 } 487 488 private final View view; 489 private final Timestamp start; 490 CreateInterval(View view, Timestamp start)491 private CreateInterval(View view, Timestamp start) { 492 this.view = view; 493 this.start = start; 494 } 495 } 496 } 497