// Copyright (c) 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

'use strict';

/**
 * Namespace for async utility functions.
 */
var AsyncUtil = {};

/**
 * Asynchronous version of Array.forEach.
 * This executes a provided function callback once per array element, then
 * runs completionCallback to notify the completion.
 * The callback can be an asynchronous function, but the iterations are
 * executed sequentially: an iteration starts only after the previous one has
 * called its completion callback.
 *
 * @param {Array.<T>} array The array to be iterated.
 * @param {function(function(), T, number, Array.<T>)} callback The iteration
 *     callback. The first argument is a callback to notify the completion of
 *     the iteration.
 * @param {function()} completionCallback Called when all iterations are
 *     completed.
 * @param {Object=} opt_thisObject Bound to callback if given.
 * @template T
 */
AsyncUtil.forEach = function(
    array, callback, completionCallback, opt_thisObject) {
  if (opt_thisObject)
    callback = callback.bind(opt_thisObject);

  var queue = new AsyncUtil.Queue();
  for (var i = 0; i < array.length; i++) {
    // Element and index are bound per iteration, so each task sees its own
    // values rather than the final value of |i|.
    queue.run(function(element, index, iterationCompletionCallback) {
      callback(iterationCompletionCallback, element, index, array);
    }.bind(null, array[i], i));
  }
  // The last task in the sequential queue reports overall completion.
  queue.run(function(iterationCompletionCallback) {
    completionCallback();  // Don't pass iteration completion callback.
  });
};

/**
 * Creates a class for executing several asynchronous closures in a fifo queue.
 * Added tasks will be started in order they were added. Tasks are run
 * concurrently. At most, |limit| jobs will be run at the same time.
 *
 * @param {number} limit The number of jobs to run at the same time.
 * @constructor
 */
AsyncUtil.ConcurrentQueue = function(limit) {
  console.assert(limit > 0, '|limit| must be larger than 0');

  this.limit_ = limit;
  // Tasks waiting to be started, in insertion order.
  this.addedTasks_ = [];
  // Tasks that have been started and have not reported completion yet.
  this.pendingTasks_ = [];
  this.isCancelled_ = false;

  Object.seal(this);
};

/**
 * @return {boolean} True when a task is running, otherwise false.
 */
AsyncUtil.ConcurrentQueue.prototype.isRunning = function() {
  return this.pendingTasks_.length !== 0;
};

/**
 * @return {number} Number of waiting tasks.
 */
AsyncUtil.ConcurrentQueue.prototype.getWaitingTasksCount = function() {
  return this.addedTasks_.length;
};

/**
 * @return {number} Number of running tasks.
 */
AsyncUtil.ConcurrentQueue.prototype.getRunningTasksCount = function() {
  return this.pendingTasks_.length;
};

/**
 * Enqueues a closure to be executed. The closure is started immediately if
 * fewer than |limit| tasks are running. Logs an error and ignores the closure
 * if the queue has been cancelled.
 *
 * @param {function(function())} closure Closure with a completion
 *     callback to be executed.
 */
AsyncUtil.ConcurrentQueue.prototype.run = function(closure) {
  if (this.isCancelled_) {
    console.error('Queue is cancelled. Cannot add a new task.');
    return;
  }

  this.addedTasks_.push(closure);
  this.continue_();
};

/**
 * Cancels the queue. It removes all the not-run (yet) tasks. Note that this
 * does NOT stop tasks currently running.
 */
AsyncUtil.ConcurrentQueue.prototype.cancel = function() {
  this.isCancelled_ = true;
  this.addedTasks_ = [];
};

/**
 * @return {boolean} True when the queue has been requested to cancel or is
 *     already cancelled. Otherwise false.
 */
AsyncUtil.ConcurrentQueue.prototype.isCancelled = function() {
  return this.isCancelled_;
};

/**
 * Starts waiting tasks, in order, until either no task is waiting or |limit_|
 * tasks are running.
 * @private
 */
AsyncUtil.ConcurrentQueue.prototype.continue_ = function() {
  while (this.addedTasks_.length !== 0) {
    console.assert(
        this.pendingTasks_.length <= this.limit_,
        'Too many jobs are running (' + this.pendingTasks_.length + ')');

    if (this.pendingTasks_.length >= this.limit_)
      return;

    // Run the next closure. If it completes synchronously, onTaskFinished_
    // re-enters continue_ before this call returns; the loop condition is
    // re-evaluated afterwards, so that is safe.
    var closure = this.addedTasks_.shift();
    this.pendingTasks_.push(closure);
    closure(this.onTaskFinished_.bind(this, closure));
  }
};

/**
 * Called when a task is finished. Removes the task from the pending task list
 * and starts the next waiting tasks. A completion notification for a task
 * that is not pending (e.g. a completion callback invoked twice) is reported
 * through console.assert and otherwise ignored.
 *
 * @param {function()} closure Finished task, which has been bound in
 *     |continue_|.
 * @private
 */
AsyncUtil.ConcurrentQueue.prototype.onTaskFinished_ = function(closure) {
  var index = this.pendingTasks_.indexOf(closure);
  console.assert(index >= 0, 'Invalid task is finished');
  // splice(-1, 1) would remove the last pending task, which is an unrelated
  // task that is still running, so bail out instead.
  if (index < 0)
    return;
  this.pendingTasks_.splice(index, 1);

  this.continue_();
};

/**
 * Creates a class for executing several asynchronous closures in a fifo queue.
 * Added tasks will be executed sequentially in order they were added.
 *
 * @constructor
 * @extends {AsyncUtil.ConcurrentQueue}
 */
AsyncUtil.Queue = function() {
  AsyncUtil.ConcurrentQueue.call(this, 1);
};

AsyncUtil.Queue.prototype.__proto__ = AsyncUtil.ConcurrentQueue.prototype;

/**
 * Creates a class for executing several asynchronous closures in a group in
 * a dependency order.
 *
 * @constructor
 */
AsyncUtil.Group = function() {
  // The dictionaries are keyed by caller-supplied task names. They are created
  // without a prototype so that names such as 'constructor' or 'toString' are
  // not mistaken for existing entries.
  this.addedTasks_ = Object.create(null);
  this.pendingTasks_ = Object.create(null);
  this.finishedTasks_ = Object.create(null);
  this.completionCallbacks_ = [];
};

/**
 * Enqueues a closure to be executed after dependencies are completed.
 * The closure is not started until |run| is called.
 *
 * @param {function(function())} closure Closure with a completion callback to
 *     be executed.
 * @param {Array.<string>=} opt_dependencies Array of dependencies. If no
 *     dependencies, then the closure will be executed as soon as the group
 *     is run.
 * @param {string=} opt_name Task identifier. Specify to use in dependencies.
 */
AsyncUtil.Group.prototype.add = function(closure, opt_dependencies, opt_name) {
  var length = Object.keys(this.addedTasks_).length;
  var name = opt_name || ('(unnamed#' + (length + 1) + ')');

  var task = {
    closure: closure,
    dependencies: opt_dependencies || [],
    name: name
  };

  this.addedTasks_[name] = task;
  this.pendingTasks_[name] = task;
};

/**
 * Runs the enqueued closures in order of dependencies.
 *
 * @param {function()=} opt_onCompletion Completion callback.
 */
AsyncUtil.Group.prototype.run = function(opt_onCompletion) {
  if (opt_onCompletion)
    this.completionCallbacks_.push(opt_onCompletion);
  this.continue_();
};

/**
 * Runs enqueued pending tasks whose dependencies are completed. If every
 * added task has finished, calls the registered completion callbacks instead.
 * @private
 */
AsyncUtil.Group.prototype.continue_ = function() {
  // If all of the added tasks have finished, then call completion callbacks.
  if (Object.keys(this.addedTasks_).length ===
      Object.keys(this.finishedTasks_).length) {
    // Detach the list before invoking it, so that a callback which calls
    // run() re-entrantly does not cause earlier callbacks to fire again.
    var callbacks = this.completionCallbacks_;
    this.completionCallbacks_ = [];
    for (var i = 0; i < callbacks.length; i++)
      callbacks[i]();
    return;
  }

  for (var name in this.pendingTasks_) {
    var task = this.pendingTasks_[name];
    var dependencyMissing = false;
    for (var j = 0; j < task.dependencies.length; j++) {
      // Check if the dependency has finished.
      if (!this.finishedTasks_[task.dependencies[j]]) {
        dependencyMissing = true;
        break;
      }
    }
    // All dependencies finished, therefore start the task.
    if (!dependencyMissing) {
      // Remove it from the pending list first: the closure may complete
      // synchronously and re-enter continue_ through finish_.
      delete this.pendingTasks_[task.name];
      task.closure(this.finish_.bind(this, task));
    }
  }
};

/**
 * Finishes the passed task and continues executing enqueued closures.
 *
 * @param {Object} task Task object.
 * @private
 */
AsyncUtil.Group.prototype.finish_ = function(task) {
  this.finishedTasks_[task.name] = task;
  this.continue_();
};

/**
 * Samples calls so that they are not called too frequently.
 * The first call is always called immediately, and the following calls may
 * be skipped or delayed to keep each interval no less than |minInterval_|.
 *
 * @param {function()} closure Closure to be called.
 * @param {number=} opt_minInterval Minimum interval between each call in
 *     milliseconds. Default is 200 milliseconds.
 * @constructor
 */
AsyncUtil.RateLimiter = function(closure, opt_minInterval) {
  /**
   * @type {function()}
   * @private
   */
  this.closure_ = closure;

  /**
   * @type {number}
   * @private
   */
  this.minInterval_ = opt_minInterval || 200;

  /**
   * Timer id of the delayed run, or 0 when no run is scheduled.
   * @type {number}
   * @private
   */
  this.scheduledRunsTimer_ = 0;

  /**
   * This variable remembers the last time the closure is called.
   * @type {number}
   * @private
   */
  this.lastRunTime_ = 0;

  Object.seal(this);
};

/**
 * Requests to run the closure.
 * Skips or delays calls so that the intervals between calls are no less than
 * |minInterval_| milliseconds.
 */
AsyncUtil.RateLimiter.prototype.run = function() {
  var now = Date.now();
  // If |minInterval_| has not passed since the closure is run, skips or delays
  // this run.
  if (now - this.lastRunTime_ < this.minInterval_) {
    // Delays this run only when there is no scheduled run.
    // Otherwise, simply skip this run.
    if (!this.scheduledRunsTimer_) {
      this.scheduledRunsTimer_ = setTimeout(
          this.runImmediately.bind(this),
          this.lastRunTime_ + this.minInterval_ - now);
    }
    return;
  }

  // Otherwise, run immediately.
  this.runImmediately();
};

/**
 * Calls the closure immediately and cancels any scheduled calls.
 */
AsyncUtil.RateLimiter.prototype.runImmediately = function() {
  this.cancelScheduledRuns_();
  this.closure_();
  this.lastRunTime_ = Date.now();
};

/**
 * Cancels all scheduled runs (if any).
 * @private
 */
AsyncUtil.RateLimiter.prototype.cancelScheduledRuns_ = function() {
  if (this.scheduledRunsTimer_) {
    clearTimeout(this.scheduledRunsTimer_);
    this.scheduledRunsTimer_ = 0;
  }
};