• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright (c) 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5'use strict';
6
/**
 * Namespace for async utility functions.
 *
 * The helpers defined on this object in this file are AsyncUtil.forEach,
 * AsyncUtil.ConcurrentQueue, AsyncUtil.Queue, AsyncUtil.Group and
 * AsyncUtil.RateLimiter.
 */
var AsyncUtil = {};
11
/**
 * Asynchronous counterpart of Array.forEach.
 *
 * Invokes |callback| once for every index of |array|, strictly one after
 * another: the next element is not visited until the current invocation has
 * called the completion function it received as its first argument. Once the
 * last element has been processed, |completionCallback| is called.
 *
 * @param {Array.<T>} array The array to be iterated.
 * @param {function(function(), T, number, Array.<T>)} callback The iteration
 *     callback. Its first argument must be called to signal that the
 *     iteration step is done; the remaining arguments are the element, its
 *     index and the array itself.
 * @param {function()} completionCallback Called with no arguments when all
 *     iterations are completed.
 * @param {Object=} opt_thisObject Bound to callback if given.
 * @template T
 */
AsyncUtil.forEach = function(
    array, callback, completionCallback, opt_thisObject) {
  var step = opt_thisObject ? callback.bind(opt_thisObject) : callback;
  var queue = new AsyncUtil.Queue();

  // Captures the element and index at enqueue time, so later changes to
  // |array| do not affect steps that are already queued.
  var enqueueStep = function(element, index) {
    queue.run(function(onStepDone) {
      step(onStepDone, element, index, array);
    });
  };

  for (var position = 0; position < array.length; position++)
    enqueueStep(array[position], position);

  // Final task: the queue's own completion callback is deliberately neither
  // forwarded nor invoked.
  queue.run(function(unusedOnStepDone) {
    completionCallback();
  });
};
43
/**
 * Creates a class for executing several asynchronous closures in a fifo queue.
 * Added tasks will be started in order they were added. Tasks are run
 * concurrently. At most, |limit| jobs will be run at the same time.
 *
 * Each task is a closure receiving a completion callback; the task is
 * considered running until that callback is invoked.
 *
 * @param {number} limit The number of jobs to run at the same time.
 * @constructor
 */
AsyncUtil.ConcurrentQueue = function(limit) {
  console.assert(limit > 0, '|limit| must be larger than 0');

  /**
   * Maximum number of tasks running at the same time.
   * @type {number}
   * @private
   */
  this.limit_ = limit;

  /**
   * Tasks that were added but have not been started yet, in fifo order.
   * @type {Array.<function(function())>}
   * @private
   */
  this.addedTasks_ = [];

  /**
   * Tasks that were started and have not reported completion yet.
   * @type {Array.<function(function())>}
   * @private
   */
  this.pendingTasks_ = [];

  /**
   * @type {boolean}
   * @private
   */
  this.isCancelled_ = false;

  Object.seal(this);
};

/**
 * @return {boolean} True when a task is running, otherwise false.
 */
AsyncUtil.ConcurrentQueue.prototype.isRunning = function() {
  return this.pendingTasks_.length !== 0;
};

/**
 * @return {number} Number of waiting tasks.
 */
AsyncUtil.ConcurrentQueue.prototype.getWaitingTasksCount = function() {
  return this.addedTasks_.length;
};

/**
 * @return {number} Number of running tasks.
 */
AsyncUtil.ConcurrentQueue.prototype.getRunningTasksCount = function() {
  return this.pendingTasks_.length;
};

/**
 * Enqueues a closure to be executed. The closure is started right away when
 * fewer than |limit| tasks are running. If the queue has been cancelled, an
 * error is logged and the closure is dropped.
 *
 * @param {function(function())} closure Closure with a completion
 *     callback to be executed.
 */
AsyncUtil.ConcurrentQueue.prototype.run = function(closure) {
  if (this.isCancelled_) {
    console.error('Queue is cancelled. Cannot add a new task.');
    return;
  }

  this.addedTasks_.push(closure);
  this.continue_();
};

/**
 * Cancels the queue. It removes all the not-run (yet) tasks. Note that this
 * does NOT stop tasks currently running.
 */
AsyncUtil.ConcurrentQueue.prototype.cancel = function() {
  this.isCancelled_ = true;
  this.addedTasks_ = [];
};

/**
 * @return {boolean} True when the queue has been requested to cancel or is
 *     already cancelled. Otherwise false.
 */
AsyncUtil.ConcurrentQueue.prototype.isCancelled = function() {
  return this.isCancelled_;
};

/**
 * Starts waiting tasks, in the order they were added, until either no task
 * is waiting or |limit| tasks are running.
 * @private
 */
AsyncUtil.ConcurrentQueue.prototype.continue_ = function() {
  while (this.addedTasks_.length !== 0) {
    console.assert(
        this.pendingTasks_.length <= this.limit_,
        'Too many jobs are running (' + this.pendingTasks_.length + ')');

    if (this.pendingTasks_.length >= this.limit_)
      return;

    // Run the next closure. It is registered as pending before being invoked
    // because it may call its completion callback synchronously.
    var closure = this.addedTasks_.shift();
    this.pendingTasks_.push(closure);
    closure(this.onTaskFinished_.bind(this, closure));
  }
};

/**
 * Called when a task is finished. Removes the task from the pending task list
 * and starts the next waiting tasks.
 *
 * A completion for a task that is not pending (for example a completion
 * callback invoked twice) is reported through console.assert and otherwise
 * ignored, so that it can never remove a different, still running task.
 *
 * @param {function(function())} closure Finished task, which has been bound
 *     in |continue_|.
 * @private
 */
AsyncUtil.ConcurrentQueue.prototype.onTaskFinished_ = function(closure) {
  var index = this.pendingTasks_.indexOf(closure);
  console.assert(index >= 0, 'Invalid task is finished');
  // splice(-1, 1) would drop the last pending task, so bail out instead.
  if (index < 0)
    return;
  this.pendingTasks_.splice(index, 1);

  this.continue_();
};
152
/**
 * Creates a class for executing several asynchronous closures in a fifo queue.
 * Added tasks will be executed sequentially in order they were added.
 *
 * This is a ConcurrentQueue whose concurrency limit is fixed to one.
 *
 * @constructor
 * @extends {AsyncUtil.ConcurrentQueue}
 */
AsyncUtil.Queue = function() {
  // A limit of one running task is what makes the queue sequential.
  AsyncUtil.ConcurrentQueue.apply(this, [1]);
};

// Inherit every ConcurrentQueue method through the prototype chain.
Object.setPrototypeOf(
    AsyncUtil.Queue.prototype, AsyncUtil.ConcurrentQueue.prototype);
165
/**
 * Creates a class for executing several asynchronous closures in a group in
 * a dependency order.
 *
 * @constructor
 */
AsyncUtil.Group = function() {
  // The dictionaries are keyed by task name. They are created without a
  // prototype so that names such as 'constructor' or 'toString' cannot be
  // confused with members inherited from Object.prototype.

  /**
   * Every task ever added, keyed by name.
   * @type {Object}
   * @private
   */
  this.addedTasks_ = Object.create(null);

  /**
   * Tasks that have not been started yet, keyed by name.
   * @type {Object}
   * @private
   */
  this.pendingTasks_ = Object.create(null);

  /**
   * Tasks that reported completion, keyed by name.
   * @type {Object}
   * @private
   */
  this.finishedTasks_ = Object.create(null);

  /**
   * Callbacks to invoke once all added tasks have finished.
   * @type {Array.<function()>}
   * @private
   */
  this.completionCallbacks_ = [];
};

/**
 * Enqueues a closure to be executed after dependencies are completed. The
 * closure is not started by this call; starting happens in |run| and whenever
 * a task finishes.
 *
 * @param {function(function())} closure Closure with a completion callback to
 *     be executed.
 * @param {Array.<string>=} opt_dependencies Array of dependencies (task
 *     names). If no dependencies, then the closure is eligible to start as
 *     soon as the group runs.
 * @param {string=} opt_name Task identifier. Specify to use in dependencies.
 *     When omitted, a name of the form '(unnamed#N)' is generated.
 */
AsyncUtil.Group.prototype.add = function(closure, opt_dependencies, opt_name) {
  var length = Object.keys(this.addedTasks_).length;
  var name = opt_name || ('(unnamed#' + (length + 1) + ')');

  var task = {
    closure: closure,
    dependencies: opt_dependencies || [],
    name: name
  };

  this.addedTasks_[name] = task;
  this.pendingTasks_[name] = task;
};

/**
 * Runs the enqueued closures in order of dependencies.
 *
 * @param {function()=} opt_onCompletion Completion callback, called once all
 *     added tasks have finished. It is called immediately if that is already
 *     the case.
 */
AsyncUtil.Group.prototype.run = function(opt_onCompletion) {
  if (opt_onCompletion)
    this.completionCallbacks_.push(opt_onCompletion);
  this.continue_();
};

/**
 * Runs enqueued pending tasks whose dependencies are completed, or fires the
 * completion callbacks when every added task has finished.
 *
 * A task depending on a name that never finishes is never started, and in
 * that case the completion callbacks are never called.
 * @private
 */
AsyncUtil.Group.prototype.continue_ = function() {
  // If all of the added tasks have finished, then call completion callbacks.
  if (Object.keys(this.addedTasks_).length ===
      Object.keys(this.finishedTasks_).length) {
    // Detach the list before invoking it: a callback may call |run| again,
    // and must not cause the callbacks already being fired to fire twice.
    var callbacks = this.completionCallbacks_;
    this.completionCallbacks_ = [];
    for (var i = 0; i < callbacks.length; i++)
      callbacks[i]();
    return;
  }

  // Iterate over a snapshot of the names. A task that finishes synchronously
  // re-enters this method and may start (and thus remove) other pending
  // tasks while this loop is still running.
  var names = Object.keys(this.pendingTasks_);
  for (var index = 0; index < names.length; index++) {
    var task = this.pendingTasks_[names[index]];
    if (!task)
      continue;  // Already started by a nested call.

    var isReady = task.dependencies.every(function(dependency) {
      return dependency in this.finishedTasks_;
    }, this);

    // All dependencies finished, therefore start the task.
    if (isReady) {
      delete this.pendingTasks_[task.name];
      task.closure(this.finish_.bind(this, task));
    }
  }
};

/**
 * Finishes the passed task and continues executing enqueued closures.
 *
 * @param {Object} task Task object.
 * @private
 */
AsyncUtil.Group.prototype.finish_ = function(task) {
  this.finishedTasks_[task.name] = task;
  this.continue_();
};
256
/**
 * Samples calls so that they are not called too frequently.
 * The first call is always called immediately, and the following calls may
 * be skipped or delayed to keep each interval no less than |minInterval_|.
 *
 * @param {function()} closure Closure to be called.
 * @param {number=} opt_minInterval Minimum interval between each call in
 *     milliseconds. Default is 200 milliseconds. An explicit 0 is honoured
 *     and means that every request runs immediately.
 * @constructor
 */
AsyncUtil.RateLimiter = function(closure, opt_minInterval) {
  /**
   * @type {function()}
   * @private
   */
  this.closure_ = closure;

  /**
   * An explicit 0 is kept as is rather than replaced by the default.
   * @type {number}
   * @private
   */
  this.minInterval_ = opt_minInterval === 0 ? 0 : (opt_minInterval || 200);

  /**
   * Handle of the timer for the delayed run, or 0 when none is scheduled.
   * @type {number}
   * @private
   */
  this.scheduledRunsTimer_ = 0;

  /**
   * This variable remembers the last time the closure is called.
   * @type {number}
   * @private
   */
  this.lastRunTime_ = 0;

  Object.seal(this);
};

/**
 * Requests to run the closure.
 * Skips or delays calls so that the intervals between calls are no less than
 * |minInterval_| milliseconds.
 */
AsyncUtil.RateLimiter.prototype.run = function() {
  var now = Date.now();
  var elapsed = now - this.lastRunTime_;

  // Enough time has passed since the last call: run right away.
  if (elapsed >= this.minInterval_) {
    this.runImmediately();
    return;
  }

  // Too early. Delays this run only when there is no scheduled run;
  // otherwise the already scheduled run covers this request as well.
  if (!this.scheduledRunsTimer_) {
    this.scheduledRunsTimer_ = setTimeout(
        this.runImmediately.bind(this),
        this.lastRunTime_ + this.minInterval_ - now);
  }
};

/**
 * Calls the closure immediately and cancels any scheduled calls. The time of
 * the call is recorded after the closure returns.
 */
AsyncUtil.RateLimiter.prototype.runImmediately = function() {
  this.cancelScheduledRuns_();
  this.closure_();
  this.lastRunTime_ = Date.now();
};

/**
 * Cancels all scheduled runs (if any).
 * @private
 */
AsyncUtil.RateLimiter.prototype.cancelScheduledRuns_ = function() {
  if (this.scheduledRunsTimer_) {
    clearTimeout(this.scheduledRunsTimer_);
    this.scheduledRunsTimer_ = 0;
  }
};
339