• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright (c) 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5'use strict';
6
7/**
8 * Namespace for async utility functions.
9 */
10var AsyncUtil = {};
11
12/**
13 * Asynchronous version of Array.forEach.
14 * This executes a provided function callback once per array element, then
15 * run completionCallback to notify the completion.
16 * The callback can be an asynchronous function, but the execution is
17 * sequentially done.
18 *
19 * @param {Array.<T>} array The array to be iterated.
20 * @param {function(function(), T, number, Array.<T>} callback The iteration
21 *     callback. The first argument is a callback to notify the completion of
22 *     the iteration.
23 * @param {function()} completionCallback Called when all iterations are
24 *     completed.
25 * @param {Object=} opt_thisObject Bound to callback if given.
26 * @template T
27 */
28AsyncUtil.forEach = function(
29    array, callback, completionCallback, opt_thisObject) {
30  if (opt_thisObject)
31    callback = callback.bind(opt_thisObject);
32
33  var queue = new AsyncUtil.Queue();
34  for (var i = 0; i < array.length; i++) {
35    queue.run(function(element, index, iterationCompletionCallback) {
36      callback(iterationCompletionCallback, element, index, array);
37    }.bind(null, array[i], i));
38  }
39  queue.run(function(iterationCompletionCallback) {
40    completionCallback();  // Don't pass iteration completion callback.
41  });
42};
43
44/**
45 * Creates a class for executing several asynchronous closures in a fifo queue.
46 * Added tasks will be started in order they were added. Tasks are run
47 * concurrently. At most, |limit| jobs will be run at the same time.
48 *
49 * @param {number} limit The number of jobs to run at the same time.
50 * @constructor
51 */
52AsyncUtil.ConcurrentQueue = function(limit) {
53  console.assert(limit > 0, '|limit| must be larger than 0');
54
55  this.limit_ = limit;
56  this.addedTasks_ = [];
57  this.pendingTasks_ = [];
58  this.isCancelled_ = false;
59
60  Object.seal(this);
61};
62
63/**
64 * @return {boolean} True when a task is running, otherwise false.
65 */
66AsyncUtil.ConcurrentQueue.prototype.isRunning = function() {
67  return this.pendingTasks_.length !== 0;
68};
69
70/**
71 * @return {number} Number of waiting tasks.
72 */
73AsyncUtil.ConcurrentQueue.prototype.getWaitingTasksCount = function() {
74  return this.addedTasks_.length;
75};
76
77/**
78 * @return {boolean} Number of running tasks.
79 */
80AsyncUtil.ConcurrentQueue.prototype.getRunningTasksCount = function() {
81  return this.pendingTasks_.length;
82};
83
84/**
85 * Enqueues a closure to be executed.
86 * @param {function(function())} closure Closure with a completion
87 *     callback to be executed.
88 */
89AsyncUtil.ConcurrentQueue.prototype.run = function(closure) {
90  if (this.isCancelled_) {
91    console.error('Queue is calcelled. Cannot add a new task.');
92    return;
93  }
94
95  this.addedTasks_.push(closure);
96  this.continue_();
97};
98
99/**
100 * Cancels the queue. It removes all the not-run (yet) tasks. Note that this
101 * does NOT stop tasks currently running.
102 */
103AsyncUtil.ConcurrentQueue.prototype.cancel = function() {
104  this.isCancelled_ = true;
105  this.addedTasks_ = [];
106};
107
108/**
109 * @return {boolean} True when the queue have been requested to cancel or is
110 *      already cancelled. Otherwise false.
111 */
112AsyncUtil.ConcurrentQueue.prototype.isCancelled = function() {
113  return this.isCancelled_;
114};
115
116/**
117 * Runs the next tasks if available.
118 * @private
119 */
120AsyncUtil.ConcurrentQueue.prototype.continue_ = function() {
121  if (this.addedTasks_.length === 0)
122    return;
123
124  console.assert(
125      this.pendingTasks_.length <= this.limit_,
126      'Too many jobs are running (' + this.pendingTasks_.length + ')');
127
128  if (this.pendingTasks_.length >= this.limit_)
129    return;
130
131  // Run the next closure.
132  var closure = this.addedTasks_.shift();
133  this.pendingTasks_.push(closure);
134  closure(this.onTaskFinished_.bind(this, closure));
135
136  this.continue_();
137};
138
139/**
140 * Called when a task is finished. Removes the tasks from pending task list.
141 * @param {function()} closure Finished task, which has been bound in
142 *     |continue_|.
143 * @private
144 */
145AsyncUtil.ConcurrentQueue.prototype.onTaskFinished_ = function(closure) {
146  var index = this.pendingTasks_.indexOf(closure);
147  console.assert(index >= 0, 'Invalid task is finished');
148  this.pendingTasks_.splice(index, 1);
149
150  this.continue_();
151};
152
153/**
154 * Creates a class for executing several asynchronous closures in a fifo queue.
155 * Added tasks will be executed sequentially in order they were added.
156 *
157 * @constructor
158 * @extends {AsyncUtil.ConcurrentQueue}
159 */
160AsyncUtil.Queue = function() {
161  AsyncUtil.ConcurrentQueue.call(this, 1);
162};
163
164AsyncUtil.Queue.prototype = {
165  __proto__: AsyncUtil.ConcurrentQueue.prototype
166};
167
168/**
169 * Creates a class for executing several asynchronous closures in a group in
170 * a dependency order.
171 *
172 * @constructor
173 */
174AsyncUtil.Group = function() {
175  this.addedTasks_ = {};
176  this.pendingTasks_ = {};
177  this.finishedTasks_ = {};
178  this.completionCallbacks_ = [];
179};
180
181/**
182 * Enqueues a closure to be executed after dependencies are completed.
183 *
184 * @param {function(function())} closure Closure with a completion callback to
185 *     be executed.
186 * @param {Array.<string>=} opt_dependencies Array of dependencies. If no
187 *     dependencies, then the the closure will be executed immediately.
188 * @param {string=} opt_name Task identifier. Specify to use in dependencies.
189 */
190AsyncUtil.Group.prototype.add = function(closure, opt_dependencies, opt_name) {
191  var length = Object.keys(this.addedTasks_).length;
192  var name = opt_name || ('(unnamed#' + (length + 1) + ')');
193
194  var task = {
195    closure: closure,
196    dependencies: opt_dependencies || [],
197    name: name
198  };
199
200  this.addedTasks_[name] = task;
201  this.pendingTasks_[name] = task;
202};
203
204/**
205 * Runs the enqueued closured in order of dependencies.
206 *
207 * @param {function()=} opt_onCompletion Completion callback.
208 */
209AsyncUtil.Group.prototype.run = function(opt_onCompletion) {
210  if (opt_onCompletion)
211    this.completionCallbacks_.push(opt_onCompletion);
212  this.continue_();
213};
214
215/**
216 * Runs enqueued pending tasks whose dependencies are completed.
217 * @private
218 */
219AsyncUtil.Group.prototype.continue_ = function() {
220  // If all of the added tasks have finished, then call completion callbacks.
221  if (Object.keys(this.addedTasks_).length ==
222      Object.keys(this.finishedTasks_).length) {
223    for (var index = 0; index < this.completionCallbacks_.length; index++) {
224      var callback = this.completionCallbacks_[index];
225      callback();
226    }
227    this.completionCallbacks_ = [];
228    return;
229  }
230
231  for (var name in this.pendingTasks_) {
232    var task = this.pendingTasks_[name];
233    var dependencyMissing = false;
234    for (var index = 0; index < task.dependencies.length; index++) {
235      var dependency = task.dependencies[index];
236      // Check if the dependency has finished.
237      if (!this.finishedTasks_[dependency])
238        dependencyMissing = true;
239    }
240    // All dependences finished, therefore start the task.
241    if (!dependencyMissing) {
242      delete this.pendingTasks_[task.name];
243      task.closure(this.finish_.bind(this, task));
244    }
245  }
246};
247
248/**
249 * Finishes the passed task and continues executing enqueued closures.
250 *
251 * @param {Object} task Task object.
252 * @private
253 */
254AsyncUtil.Group.prototype.finish_ = function(task) {
255  this.finishedTasks_[task.name] = task;
256  this.continue_();
257};
258
259/**
260 * Aggregates consecutive calls and executes the closure only once instead of
261 * several times. The first call is always called immediately, and the next
262 * consecutive ones are aggregated and the closure is called only once once
263 * |delay| amount of time passes after the last call to run().
264 *
265 * @param {function()} closure Closure to be aggregated.
266 * @param {number=} opt_delay Minimum aggregation time in milliseconds. Default
267 *     is 50 milliseconds.
268 * @constructor
269 */
270AsyncUtil.Aggregator = function(closure, opt_delay) {
271  /**
272   * @type {number}
273   * @private
274   */
275  this.delay_ = opt_delay || 50;
276
277  /**
278   * @type {function()}
279   * @private
280   */
281  this.closure_ = closure;
282
283  /**
284   * @type {number?}
285   * @private
286   */
287  this.scheduledRunsTimer_ = null;
288
289  /**
290   * @type {number}
291   * @private
292   */
293  this.lastRunTime_ = 0;
294};
295
296/**
297 * Runs a closure. Skips consecutive calls. The first call is called
298 * immediately.
299 */
300AsyncUtil.Aggregator.prototype.run = function() {
301  // If recently called, then schedule the consecutive call with a delay.
302  if (Date.now() - this.lastRunTime_ < this.delay_) {
303    this.cancelScheduledRuns_();
304    this.scheduledRunsTimer_ = setTimeout(this.runImmediately_.bind(this),
305                                          this.delay_ + 1);
306    this.lastRunTime_ = Date.now();
307    return;
308  }
309
310  // Otherwise, run immediately.
311  this.runImmediately_();
312};
313
314/**
315 * Calls the schedule immediately and cancels any scheduled calls.
316 * @private
317 */
318AsyncUtil.Aggregator.prototype.runImmediately_ = function() {
319  this.cancelScheduledRuns_();
320  this.closure_();
321  this.lastRunTime_ = Date.now();
322};
323
324/**
325 * Cancels all scheduled runs (if any).
326 * @private
327 */
328AsyncUtil.Aggregator.prototype.cancelScheduledRuns_ = function() {
329  if (this.scheduledRunsTimer_) {
330    clearTimeout(this.scheduledRunsTimer_);
331    this.scheduledRunsTimer_ = null;
332  }
333};
334
335/**
336 * Samples calls so that they are not called too frequently.
337 * The first call is always called immediately, and the following calls may
338 * be skipped or delayed to keep each interval no less than |minInterval_|.
339 *
340 * @param {function()} closure Closure to be called.
341 * @param {number=} opt_minInterval Minimum interval between each call in
342 *     milliseconds. Default is 200 milliseconds.
343 * @constructor
344 */
345AsyncUtil.RateLimiter = function(closure, opt_minInterval) {
346  /**
347   * @type {function()}
348   * @private
349   */
350  this.closure_ = closure;
351
352  /**
353   * @type {number}
354   * @private
355   */
356  this.minInterval_ = opt_minInterval || 200;
357
358  /**
359   * @type {number}
360   * @private
361   */
362  this.scheduledRunsTimer_ = 0;
363
364  /**
365   * This variable remembers the last time the closure is called.
366   * @type {number}
367   * @private
368   */
369  this.lastRunTime_ = 0;
370
371  Object.seal(this);
372};
373
374/**
375 * Requests to run the closure.
376 * Skips or delays calls so that the intervals between calls are no less than
377 * |minInteval_| milliseconds.
378 */
379AsyncUtil.RateLimiter.prototype.run = function() {
380  var now = Date.now();
381  // If |minInterval| has not passed since the closure is run, skips or delays
382  // this run.
383  if (now - this.lastRunTime_ < this.minInterval_) {
384    // Delays this run only when there is no scheduled run.
385    // Otherwise, simply skip this run.
386    if (!this.scheduledRunsTimer_) {
387      this.scheduledRunsTimer_ = setTimeout(
388          this.runImmediately.bind(this),
389          this.lastRunTime_ + this.minInterval_ - now);
390    }
391    return;
392  }
393
394  // Otherwise, run immediately
395  this.runImmediately();
396};
397
398/**
399 * Calls the scheduled run immediately and cancels any scheduled calls.
400 */
401AsyncUtil.RateLimiter.prototype.runImmediately = function() {
402  this.cancelScheduledRuns_();
403  this.closure_();
404  this.lastRunTime_ = Date.now();
405};
406
407/**
408 * Cancels all scheduled runs (if any).
409 * @private
410 */
411AsyncUtil.RateLimiter.prototype.cancelScheduledRuns_ = function() {
412  if (this.scheduledRunsTimer_) {
413    clearTimeout(this.scheduledRunsTimer_);
414    this.scheduledRunsTimer_ = 0;
415  }
416};
417