1// Copyright 2015 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5// Package timeseries implements a time series structure for stats collection. 6package timeseries 7 8import ( 9 "fmt" 10 "log" 11 "time" 12) 13 14const ( 15 timeSeriesNumBuckets = 64 16 minuteHourSeriesNumBuckets = 60 17) 18 19var timeSeriesResolutions = []time.Duration{ 20 1 * time.Second, 21 10 * time.Second, 22 1 * time.Minute, 23 10 * time.Minute, 24 1 * time.Hour, 25 6 * time.Hour, 26 24 * time.Hour, // 1 day 27 7 * 24 * time.Hour, // 1 week 28 4 * 7 * 24 * time.Hour, // 4 weeks 29 16 * 7 * 24 * time.Hour, // 16 weeks 30} 31 32var minuteHourSeriesResolutions = []time.Duration{ 33 1 * time.Second, 34 1 * time.Minute, 35} 36 37// An Observable is a kind of data that can be aggregated in a time series. 38type Observable interface { 39 Multiply(ratio float64) // Multiplies the data in self by a given ratio 40 Add(other Observable) // Adds the data from a different observation to self 41 Clear() // Clears the observation so it can be reused. 42 CopyFrom(other Observable) // Copies the contents of a given observation to self 43} 44 45// Float attaches the methods of Observable to a float64. 46type Float float64 47 48// NewFloat returns a Float. 49func NewFloat() Observable { 50 f := Float(0) 51 return &f 52} 53 54// String returns the float as a string. 55func (f *Float) String() string { return fmt.Sprintf("%g", f.Value()) } 56 57// Value returns the float's value. 58func (f *Float) Value() float64 { return float64(*f) } 59 60func (f *Float) Multiply(ratio float64) { *f *= Float(ratio) } 61 62func (f *Float) Add(other Observable) { 63 o := other.(*Float) 64 *f += *o 65} 66 67func (f *Float) Clear() { *f = 0 } 68 69func (f *Float) CopyFrom(other Observable) { 70 o := other.(*Float) 71 *f = *o 72} 73 74// A Clock tells the current time. 75type Clock interface { 76 Time() time.Time 77} 78 79type defaultClock int 80 81var defaultClockInstance defaultClock 82 83func (defaultClock) Time() time.Time { return time.Now() } 84 85// Information kept per level. Each level consists of a circular list of 86// observations. The start of the level may be derived from end and the 87// len(buckets) * sizeInMillis. 88type tsLevel struct { 89 oldest int // index to oldest bucketed Observable 90 newest int // index to newest bucketed Observable 91 end time.Time // end timestamp for this level 92 size time.Duration // duration of the bucketed Observable 93 buckets []Observable // collections of observations 94 provider func() Observable // used for creating new Observable 95} 96 97func (l *tsLevel) Clear() { 98 l.oldest = 0 99 l.newest = len(l.buckets) - 1 100 l.end = time.Time{} 101 for i := range l.buckets { 102 if l.buckets[i] != nil { 103 l.buckets[i].Clear() 104 l.buckets[i] = nil 105 } 106 } 107} 108 109func (l *tsLevel) InitLevel(size time.Duration, numBuckets int, f func() Observable) { 110 l.size = size 111 l.provider = f 112 l.buckets = make([]Observable, numBuckets) 113} 114 115// Keeps a sequence of levels. Each level is responsible for storing data at 116// a given resolution. For example, the first level stores data at a one 117// minute resolution while the second level stores data at a one hour 118// resolution. 119 120// Each level is represented by a sequence of buckets. Each bucket spans an 121// interval equal to the resolution of the level. New observations are added 122// to the last bucket. 123type timeSeries struct { 124 provider func() Observable // make more Observable 125 numBuckets int // number of buckets in each level 126 levels []*tsLevel // levels of bucketed Observable 127 lastAdd time.Time // time of last Observable tracked 128 total Observable // convenient aggregation of all Observable 129 clock Clock // Clock for getting current time 130 pending Observable // observations not yet bucketed 131 pendingTime time.Time // what time are we keeping in pending 132 dirty bool // if there are pending observations 133} 134 135// init initializes a level according to the supplied criteria. 136func (ts *timeSeries) init(resolutions []time.Duration, f func() Observable, numBuckets int, clock Clock) { 137 ts.provider = f 138 ts.numBuckets = numBuckets 139 ts.clock = clock 140 ts.levels = make([]*tsLevel, len(resolutions)) 141 142 for i := range resolutions { 143 if i > 0 && resolutions[i-1] >= resolutions[i] { 144 log.Print("timeseries: resolutions must be monotonically increasing") 145 break 146 } 147 newLevel := new(tsLevel) 148 newLevel.InitLevel(resolutions[i], ts.numBuckets, ts.provider) 149 ts.levels[i] = newLevel 150 } 151 152 ts.Clear() 153} 154 155// Clear removes all observations from the time series. 156func (ts *timeSeries) Clear() { 157 ts.lastAdd = time.Time{} 158 ts.total = ts.resetObservation(ts.total) 159 ts.pending = ts.resetObservation(ts.pending) 160 ts.pendingTime = time.Time{} 161 ts.dirty = false 162 163 for i := range ts.levels { 164 ts.levels[i].Clear() 165 } 166} 167 168// Add records an observation at the current time. 169func (ts *timeSeries) Add(observation Observable) { 170 ts.AddWithTime(observation, ts.clock.Time()) 171} 172 173// AddWithTime records an observation at the specified time. 174func (ts *timeSeries) AddWithTime(observation Observable, t time.Time) { 175 176 smallBucketDuration := ts.levels[0].size 177 178 if t.After(ts.lastAdd) { 179 ts.lastAdd = t 180 } 181 182 if t.After(ts.pendingTime) { 183 ts.advance(t) 184 ts.mergePendingUpdates() 185 ts.pendingTime = ts.levels[0].end 186 ts.pending.CopyFrom(observation) 187 ts.dirty = true 188 } else if t.After(ts.pendingTime.Add(-1 * smallBucketDuration)) { 189 // The observation is close enough to go into the pending bucket. 190 // This compensates for clock skewing and small scheduling delays 191 // by letting the update stay in the fast path. 192 ts.pending.Add(observation) 193 ts.dirty = true 194 } else { 195 ts.mergeValue(observation, t) 196 } 197} 198 199// mergeValue inserts the observation at the specified time in the past into all levels. 200func (ts *timeSeries) mergeValue(observation Observable, t time.Time) { 201 for _, level := range ts.levels { 202 index := (ts.numBuckets - 1) - int(level.end.Sub(t)/level.size) 203 if 0 <= index && index < ts.numBuckets { 204 bucketNumber := (level.oldest + index) % ts.numBuckets 205 if level.buckets[bucketNumber] == nil { 206 level.buckets[bucketNumber] = level.provider() 207 } 208 level.buckets[bucketNumber].Add(observation) 209 } 210 } 211 ts.total.Add(observation) 212} 213 214// mergePendingUpdates applies the pending updates into all levels. 215func (ts *timeSeries) mergePendingUpdates() { 216 if ts.dirty { 217 ts.mergeValue(ts.pending, ts.pendingTime) 218 ts.pending = ts.resetObservation(ts.pending) 219 ts.dirty = false 220 } 221} 222 223// advance cycles the buckets at each level until the latest bucket in 224// each level can hold the time specified. 225func (ts *timeSeries) advance(t time.Time) { 226 if !t.After(ts.levels[0].end) { 227 return 228 } 229 for i := 0; i < len(ts.levels); i++ { 230 level := ts.levels[i] 231 if !level.end.Before(t) { 232 break 233 } 234 235 // If the time is sufficiently far, just clear the level and advance 236 // directly. 237 if !t.Before(level.end.Add(level.size * time.Duration(ts.numBuckets))) { 238 for _, b := range level.buckets { 239 ts.resetObservation(b) 240 } 241 level.end = time.Unix(0, (t.UnixNano()/level.size.Nanoseconds())*level.size.Nanoseconds()) 242 } 243 244 for t.After(level.end) { 245 level.end = level.end.Add(level.size) 246 level.newest = level.oldest 247 level.oldest = (level.oldest + 1) % ts.numBuckets 248 ts.resetObservation(level.buckets[level.newest]) 249 } 250 251 t = level.end 252 } 253} 254 255// Latest returns the sum of the num latest buckets from the level. 256func (ts *timeSeries) Latest(level, num int) Observable { 257 now := ts.clock.Time() 258 if ts.levels[0].end.Before(now) { 259 ts.advance(now) 260 } 261 262 ts.mergePendingUpdates() 263 264 result := ts.provider() 265 l := ts.levels[level] 266 index := l.newest 267 268 for i := 0; i < num; i++ { 269 if l.buckets[index] != nil { 270 result.Add(l.buckets[index]) 271 } 272 if index == 0 { 273 index = ts.numBuckets 274 } 275 index-- 276 } 277 278 return result 279} 280 281// LatestBuckets returns a copy of the num latest buckets from level. 282func (ts *timeSeries) LatestBuckets(level, num int) []Observable { 283 if level < 0 || level > len(ts.levels) { 284 log.Print("timeseries: bad level argument: ", level) 285 return nil 286 } 287 if num < 0 || num >= ts.numBuckets { 288 log.Print("timeseries: bad num argument: ", num) 289 return nil 290 } 291 292 results := make([]Observable, num) 293 now := ts.clock.Time() 294 if ts.levels[0].end.Before(now) { 295 ts.advance(now) 296 } 297 298 ts.mergePendingUpdates() 299 300 l := ts.levels[level] 301 index := l.newest 302 303 for i := 0; i < num; i++ { 304 result := ts.provider() 305 results[i] = result 306 if l.buckets[index] != nil { 307 result.CopyFrom(l.buckets[index]) 308 } 309 310 if index == 0 { 311 index = ts.numBuckets 312 } 313 index -= 1 314 } 315 return results 316} 317 318// ScaleBy updates observations by scaling by factor. 319func (ts *timeSeries) ScaleBy(factor float64) { 320 for _, l := range ts.levels { 321 for i := 0; i < ts.numBuckets; i++ { 322 l.buckets[i].Multiply(factor) 323 } 324 } 325 326 ts.total.Multiply(factor) 327 ts.pending.Multiply(factor) 328} 329 330// Range returns the sum of observations added over the specified time range. 331// If start or finish times don't fall on bucket boundaries of the same 332// level, then return values are approximate answers. 333func (ts *timeSeries) Range(start, finish time.Time) Observable { 334 return ts.ComputeRange(start, finish, 1)[0] 335} 336 337// Recent returns the sum of observations from the last delta. 338func (ts *timeSeries) Recent(delta time.Duration) Observable { 339 now := ts.clock.Time() 340 return ts.Range(now.Add(-delta), now) 341} 342 343// Total returns the total of all observations. 344func (ts *timeSeries) Total() Observable { 345 ts.mergePendingUpdates() 346 return ts.total 347} 348 349// ComputeRange computes a specified number of values into a slice using 350// the observations recorded over the specified time period. The return 351// values are approximate if the start or finish times don't fall on the 352// bucket boundaries at the same level or if the number of buckets spanning 353// the range is not an integral multiple of num. 354func (ts *timeSeries) ComputeRange(start, finish time.Time, num int) []Observable { 355 if start.After(finish) { 356 log.Printf("timeseries: start > finish, %v>%v", start, finish) 357 return nil 358 } 359 360 if num < 0 { 361 log.Printf("timeseries: num < 0, %v", num) 362 return nil 363 } 364 365 results := make([]Observable, num) 366 367 for _, l := range ts.levels { 368 if !start.Before(l.end.Add(-l.size * time.Duration(ts.numBuckets))) { 369 ts.extract(l, start, finish, num, results) 370 return results 371 } 372 } 373 374 // Failed to find a level that covers the desired range. So just 375 // extract from the last level, even if it doesn't cover the entire 376 // desired range. 377 ts.extract(ts.levels[len(ts.levels)-1], start, finish, num, results) 378 379 return results 380} 381 382// RecentList returns the specified number of values in slice over the most 383// recent time period of the specified range. 384func (ts *timeSeries) RecentList(delta time.Duration, num int) []Observable { 385 if delta < 0 { 386 return nil 387 } 388 now := ts.clock.Time() 389 return ts.ComputeRange(now.Add(-delta), now, num) 390} 391 392// extract returns a slice of specified number of observations from a given 393// level over a given range. 394func (ts *timeSeries) extract(l *tsLevel, start, finish time.Time, num int, results []Observable) { 395 ts.mergePendingUpdates() 396 397 srcInterval := l.size 398 dstInterval := finish.Sub(start) / time.Duration(num) 399 dstStart := start 400 srcStart := l.end.Add(-srcInterval * time.Duration(ts.numBuckets)) 401 402 srcIndex := 0 403 404 // Where should scanning start? 405 if dstStart.After(srcStart) { 406 advance := dstStart.Sub(srcStart) / srcInterval 407 srcIndex += int(advance) 408 srcStart = srcStart.Add(advance * srcInterval) 409 } 410 411 // The i'th value is computed as show below. 412 // interval = (finish/start)/num 413 // i'th value = sum of observation in range 414 // [ start + i * interval, 415 // start + (i + 1) * interval ) 416 for i := 0; i < num; i++ { 417 results[i] = ts.resetObservation(results[i]) 418 dstEnd := dstStart.Add(dstInterval) 419 for srcIndex < ts.numBuckets && srcStart.Before(dstEnd) { 420 srcEnd := srcStart.Add(srcInterval) 421 if srcEnd.After(ts.lastAdd) { 422 srcEnd = ts.lastAdd 423 } 424 425 if !srcEnd.Before(dstStart) { 426 srcValue := l.buckets[(srcIndex+l.oldest)%ts.numBuckets] 427 if !srcStart.Before(dstStart) && !srcEnd.After(dstEnd) { 428 // dst completely contains src. 429 if srcValue != nil { 430 results[i].Add(srcValue) 431 } 432 } else { 433 // dst partially overlaps src. 434 overlapStart := maxTime(srcStart, dstStart) 435 overlapEnd := minTime(srcEnd, dstEnd) 436 base := srcEnd.Sub(srcStart) 437 fraction := overlapEnd.Sub(overlapStart).Seconds() / base.Seconds() 438 439 used := ts.provider() 440 if srcValue != nil { 441 used.CopyFrom(srcValue) 442 } 443 used.Multiply(fraction) 444 results[i].Add(used) 445 } 446 447 if srcEnd.After(dstEnd) { 448 break 449 } 450 } 451 srcIndex++ 452 srcStart = srcStart.Add(srcInterval) 453 } 454 dstStart = dstStart.Add(dstInterval) 455 } 456} 457 458// resetObservation clears the content so the struct may be reused. 459func (ts *timeSeries) resetObservation(observation Observable) Observable { 460 if observation == nil { 461 observation = ts.provider() 462 } else { 463 observation.Clear() 464 } 465 return observation 466} 467 468// TimeSeries tracks data at granularities from 1 second to 16 weeks. 469type TimeSeries struct { 470 timeSeries 471} 472 473// NewTimeSeries creates a new TimeSeries using the function provided for creating new Observable. 474func NewTimeSeries(f func() Observable) *TimeSeries { 475 return NewTimeSeriesWithClock(f, defaultClockInstance) 476} 477 478// NewTimeSeriesWithClock creates a new TimeSeries using the function provided for creating new Observable and the clock for 479// assigning timestamps. 480func NewTimeSeriesWithClock(f func() Observable, clock Clock) *TimeSeries { 481 ts := new(TimeSeries) 482 ts.timeSeries.init(timeSeriesResolutions, f, timeSeriesNumBuckets, clock) 483 return ts 484} 485 486// MinuteHourSeries tracks data at granularities of 1 minute and 1 hour. 487type MinuteHourSeries struct { 488 timeSeries 489} 490 491// NewMinuteHourSeries creates a new MinuteHourSeries using the function provided for creating new Observable. 492func NewMinuteHourSeries(f func() Observable) *MinuteHourSeries { 493 return NewMinuteHourSeriesWithClock(f, defaultClockInstance) 494} 495 496// NewMinuteHourSeriesWithClock creates a new MinuteHourSeries using the function provided for creating new Observable and the clock for 497// assigning timestamps. 498func NewMinuteHourSeriesWithClock(f func() Observable, clock Clock) *MinuteHourSeries { 499 ts := new(MinuteHourSeries) 500 ts.timeSeries.init(minuteHourSeriesResolutions, f, 501 minuteHourSeriesNumBuckets, clock) 502 return ts 503} 504 505func (ts *MinuteHourSeries) Minute() Observable { 506 return ts.timeSeries.Latest(0, 60) 507} 508 509func (ts *MinuteHourSeries) Hour() Observable { 510 return ts.timeSeries.Latest(1, 60) 511} 512 513func minTime(a, b time.Time) time.Time { 514 if a.Before(b) { 515 return a 516 } 517 return b 518} 519 520func maxTime(a, b time.Time) time.Time { 521 if a.After(b) { 522 return a 523 } 524 return b 525} 526