• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright 2016 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package main
16
17import (
18	"runtime"
19)
20
21type RateLimit struct {
22	requests chan struct{}
23	finished chan int
24	released chan int
25	stop     chan struct{}
26}
27
28// NewRateLimit starts a new rate limiter with maxExecs number of executions
29// allowed to happen at a time. If maxExecs is <= 0, it will default to the
30// number of logical CPUs on the system.
31//
32// With Finish and Release, we'll keep track of outstanding buffer sizes to be
33// written. If that size goes above maxMem, we'll prevent starting new
34// executions.
35//
36// The total memory use may be higher due to current executions. This just
37// prevents runaway memory use due to slower writes.
38func NewRateLimit(maxExecs int, maxMem int64) *RateLimit {
39	if maxExecs <= 0 {
40		maxExecs = runtime.NumCPU()
41	}
42	if maxMem <= 0 {
43		// Default to 512MB
44		maxMem = 512 * 1024 * 1024
45	}
46
47	ret := &RateLimit{
48		requests: make(chan struct{}),
49
50		// Let all of the pending executions to mark themselves as finished,
51		// even if our goroutine isn't processing input.
52		finished: make(chan int, maxExecs),
53
54		released: make(chan int),
55		stop:     make(chan struct{}),
56	}
57
58	go ret.goFunc(maxExecs, maxMem)
59
60	return ret
61}
62
63// RequestExecution blocks until another execution can be allowed to run.
64func (r *RateLimit) RequestExecution() Execution {
65	<-r.requests
66	return r.finished
67}
68
69type Execution chan<- int
70
71// Finish will mark your execution as finished, and allow another request to be
72// approved.
73//
74// bufferSize may be specified to count memory buffer sizes, and must be
75// matched with calls to RateLimit.Release to mark the buffers as released.
76func (e Execution) Finish(bufferSize int) {
77	e <- bufferSize
78}
79
80// Call Release when finished with a buffer recorded with Finish.
81func (r *RateLimit) Release(bufferSize int) {
82	r.released <- bufferSize
83}
84
85// Stop the background goroutine
86func (r *RateLimit) Stop() {
87	close(r.stop)
88}
89
90func (r *RateLimit) goFunc(maxExecs int, maxMem int64) {
91	var curExecs int
92	var curMemory int64
93
94	for {
95		var requests chan struct{}
96		if curExecs < maxExecs && curMemory < maxMem {
97			requests = r.requests
98		}
99
100		select {
101		case requests <- struct{}{}:
102			curExecs++
103		case amount := <-r.finished:
104			curExecs--
105			curMemory += int64(amount)
106			if curExecs < 0 {
107				panic("curExecs < 0")
108			}
109		case amount := <-r.released:
110			curMemory -= int64(amount)
111		case <-r.stop:
112			return
113		}
114	}
115}
116