// Copyright (c) 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

'use strict';

/**
 * Namespace for async utility functions.
 */
var AsyncUtil = {};

/**
 * Asynchronous version of Array.forEach.
 * This executes a provided function callback once per array element, then
 * runs completionCallback to notify the completion.
 * The callback can be an asynchronous function, but the iterations are
 * executed sequentially: the next iteration starts only after the previous
 * one has invoked its completion callback.
 *
 * @param {Array.<T>} array The array to be iterated.
 * @param {function(function(), T, number, Array.<T>)} callback The iteration
 *     callback. The first argument is a callback to notify the completion of
 *     the iteration.
 * @param {function()} completionCallback Called when all iterations are
 *     completed.
 * @param {Object=} opt_thisObject Bound to callback if given.
 * @template T
 */
AsyncUtil.forEach = function(
    array, callback, completionCallback, opt_thisObject) {
  if (opt_thisObject)
    callback = callback.bind(opt_thisObject);

  // A sequential queue guarantees that the iterations run one at a time and
  // that the completion task runs after the last iteration has finished.
  var queue = new AsyncUtil.Queue();
  for (var i = 0; i < array.length; i++) {
    queue.run(function(element, index, iterationCompletionCallback) {
      callback(iterationCompletionCallback, element, index, array);
    }.bind(null, array[i], i));
  }
  queue.run(function(iterationCompletionCallback) {
    completionCallback();  // Don't pass iteration completion callback.
  });
};

/**
 * Creates a class for executing several asynchronous closures in a fifo queue.
 * Added tasks will be started in order they were added. Tasks are run
 * concurrently. At most, |limit| jobs will be run at the same time.
 *
 * @param {number} limit The number of jobs to run at the same time.
 * @constructor
 */
AsyncUtil.ConcurrentQueue = function(limit) {
  console.assert(limit > 0, '|limit| must be larger than 0');

  this.limit_ = limit;
  this.addedTasks_ = [];    // Tasks waiting to be started.
  this.pendingTasks_ = [];  // Tasks started but not yet finished.
  this.isCancelled_ = false;

  Object.seal(this);
};

/**
 * @return {boolean} True when a task is running, otherwise false.
 */
AsyncUtil.ConcurrentQueue.prototype.isRunning = function() {
  return this.pendingTasks_.length !== 0;
};

/**
 * @return {number} Number of waiting tasks.
 */
AsyncUtil.ConcurrentQueue.prototype.getWaitingTasksCount = function() {
  return this.addedTasks_.length;
};

/**
 * @return {number} Number of running tasks.
 */
AsyncUtil.ConcurrentQueue.prototype.getRunningTasksCount = function() {
  return this.pendingTasks_.length;
};

/**
 * Enqueues a closure to be executed.
 * @param {function(function())} closure Closure with a completion
 *     callback to be executed.
 */
AsyncUtil.ConcurrentQueue.prototype.run = function(closure) {
  if (this.isCancelled_) {
    console.error('Queue is cancelled. Cannot add a new task.');
    return;
  }

  this.addedTasks_.push(closure);
  this.continue_();
};

/**
 * Cancels the queue. It removes all the not-run (yet) tasks. Note that this
 * does NOT stop tasks currently running.
 */
AsyncUtil.ConcurrentQueue.prototype.cancel = function() {
  this.isCancelled_ = true;
  this.addedTasks_ = [];
};

/**
 * @return {boolean} True when the queue has been requested to cancel or is
 *     already cancelled. Otherwise false.
 */
AsyncUtil.ConcurrentQueue.prototype.isCancelled = function() {
  return this.isCancelled_;
};

/**
 * Starts waiting tasks, in the order they were added, until either no task
 * is waiting or |limit_| tasks are running.
 * @private
 */
AsyncUtil.ConcurrentQueue.prototype.continue_ = function() {
  // |addedTasks_| is re-read on every iteration because a task may finish
  // synchronously (re-entering this method) or cancel the queue.
  while (this.addedTasks_.length !== 0) {
    console.assert(
        this.pendingTasks_.length <= this.limit_,
        'Too many jobs are running (' + this.pendingTasks_.length + ')');

    if (this.pendingTasks_.length >= this.limit_)
      return;

    // Run the next closure.
    var closure = this.addedTasks_.shift();
    this.pendingTasks_.push(closure);
    closure(this.onTaskFinished_.bind(this, closure));
  }
};

/**
 * Called when a task is finished. Removes the task from the pending task
 * list and starts the next waiting tasks, if any.
 * @param {function()} closure Finished task, which has been bound in
 *     |continue_|.
 * @private
 */
AsyncUtil.ConcurrentQueue.prototype.onTaskFinished_ = function(closure) {
  var index = this.pendingTasks_.indexOf(closure);
  console.assert(index >= 0, 'Invalid task is finished');
  // console.assert does not throw. Without this guard, splice(-1, 1) would
  // remove the last pending task, i.e. an unrelated task that is still
  // running (e.g. when a task invokes its completion callback twice).
  if (index < 0)
    return;
  this.pendingTasks_.splice(index, 1);

  this.continue_();
};

/**
 * Creates a class for executing several asynchronous closures in a fifo queue.
 * Added tasks will be executed sequentially in order they were added.
 *
 * @constructor
 * @extends {AsyncUtil.ConcurrentQueue}
 */
AsyncUtil.Queue = function() {
  AsyncUtil.ConcurrentQueue.call(this, 1);
};

AsyncUtil.Queue.prototype = {
  __proto__: AsyncUtil.ConcurrentQueue.prototype
};

/**
 * Creates a class for executing several asynchronous closures in a group in
 * a dependency order.
 *
 * @constructor
 */
AsyncUtil.Group = function() {
  this.addedTasks_ = {};     // Every task ever added, keyed by name.
  this.pendingTasks_ = {};   // Tasks that have not been started yet.
  this.finishedTasks_ = {};  // Tasks whose completion callback was invoked.
  this.completionCallbacks_ = [];
};

/**
 * Enqueues a closure to be executed after dependencies are completed.
 * The closure is not started until run() is called.
 *
 * @param {function(function())} closure Closure with a completion callback to
 *     be executed.
 * @param {Array.<string>=} opt_dependencies Array of dependencies. If no
 *     dependencies, then the closure will be executed as soon as the group
 *     is run.
 * @param {string=} opt_name Task identifier. Specify to use in dependencies.
 */
AsyncUtil.Group.prototype.add = function(closure, opt_dependencies, opt_name) {
  var length = Object.keys(this.addedTasks_).length;
  var name = opt_name || ('(unnamed#' + (length + 1) + ')');

  var task = {
    closure: closure,
    dependencies: opt_dependencies || [],
    name: name
  };

  this.addedTasks_[name] = task;
  this.pendingTasks_[name] = task;
};

/**
 * Runs the enqueued closures in order of dependencies.
 *
 * @param {function()=} opt_onCompletion Completion callback.
 */
AsyncUtil.Group.prototype.run = function(opt_onCompletion) {
  if (opt_onCompletion)
    this.completionCallbacks_.push(opt_onCompletion);
  this.continue_();
};

/**
 * Runs enqueued pending tasks whose dependencies are completed.
 * @private
 */
AsyncUtil.Group.prototype.continue_ = function() {
  // If all of the added tasks have finished, then call completion callbacks.
  if (Object.keys(this.addedTasks_).length ===
      Object.keys(this.finishedTasks_).length) {
    // Detach the list before invoking anything, so that a callback which
    // calls run() again does not cause the earlier callbacks to be invoked
    // a second time.
    var callbacks = this.completionCallbacks_;
    this.completionCallbacks_ = [];
    for (var i = 0; i < callbacks.length; i++)
      callbacks[i]();
    return;
  }

  var hasOwn = Object.prototype.hasOwnProperty;
  for (var name in this.pendingTasks_) {
    var task = this.pendingTasks_[name];
    var dependencyMissing = false;
    for (var j = 0; j < task.dependencies.length; j++) {
      // Check if the dependency has finished. An own-property check is used
      // so that names such as 'toString', which exist on Object.prototype,
      // are not mistaken for finished tasks.
      if (!hasOwn.call(this.finishedTasks_, task.dependencies[j])) {
        dependencyMissing = true;
        break;
      }
    }
    // All dependencies finished, therefore start the task. It is removed
    // from the pending list first because the closure may finish
    // synchronously and re-enter this method.
    if (!dependencyMissing) {
      delete this.pendingTasks_[task.name];
      task.closure(this.finish_.bind(this, task));
    }
  }
};

/**
 * Finishes the passed task and continues executing enqueued closures.
 *
 * @param {Object} task Task object.
 * @private
 */
AsyncUtil.Group.prototype.finish_ = function(task) {
  this.finishedTasks_[task.name] = task;
  this.continue_();
};

/**
 * Aggregates consecutive calls and executes the closure only once instead of
 * several times. The first call is always called immediately, and the next
 * consecutive ones are aggregated and the closure is called only once, once
 * |delay| amount of time passes after the last call to run().
 *
 * @param {function()} closure Closure to be aggregated.
 * @param {number=} opt_delay Minimum aggregation time in milliseconds. Default
 *     is 50 milliseconds.
 * @constructor
 */
AsyncUtil.Aggregator = function(closure, opt_delay) {
  /**
   * @type {number}
   * @private
   */
  this.delay_ = opt_delay || 50;

  /**
   * @type {function()}
   * @private
   */
  this.closure_ = closure;

  /**
   * @type {number?}
   * @private
   */
  this.scheduledRunsTimer_ = null;

  /**
   * @type {number}
   * @private
   */
  this.lastRunTime_ = 0;
};

/**
 * Runs a closure. Skips consecutive calls. The first call is called
 * immediately.
 */
AsyncUtil.Aggregator.prototype.run = function() {
  // If recently called, then schedule the consecutive call with a delay.
  if (Date.now() - this.lastRunTime_ < this.delay_) {
    this.cancelScheduledRuns_();
    this.scheduledRunsTimer_ = setTimeout(this.runImmediately_.bind(this),
                                          this.delay_ + 1);
    // Refresh the timestamp so that the delay is measured from the most
    // recent call to run().
    this.lastRunTime_ = Date.now();
    return;
  }

  // Otherwise, run immediately.
  this.runImmediately_();
};

/**
 * Calls the closure immediately and cancels any scheduled calls.
 * @private
 */
AsyncUtil.Aggregator.prototype.runImmediately_ = function() {
  this.cancelScheduledRuns_();
  this.closure_();
  this.lastRunTime_ = Date.now();
};

/**
 * Cancels all scheduled runs (if any).
 * @private
 */
AsyncUtil.Aggregator.prototype.cancelScheduledRuns_ = function() {
  if (this.scheduledRunsTimer_) {
    clearTimeout(this.scheduledRunsTimer_);
    this.scheduledRunsTimer_ = null;
  }
};

/**
 * Samples calls so that they are not called too frequently.
 * The first call is always called immediately, and the following calls may
 * be skipped or delayed to keep each interval no less than |minInterval_|.
 *
 * @param {function()} closure Closure to be called.
 * @param {number=} opt_minInterval Minimum interval between each call in
 *     milliseconds. Default is 200 milliseconds.
 * @constructor
 */
AsyncUtil.RateLimiter = function(closure, opt_minInterval) {
  /**
   * @type {function()}
   * @private
   */
  this.closure_ = closure;

  /**
   * @type {number}
   * @private
   */
  this.minInterval_ = opt_minInterval || 200;

  /**
   * @type {number}
   * @private
   */
  this.scheduledRunsTimer_ = 0;

  /**
   * This variable remembers the last time the closure is called.
   * @type {number}
   * @private
   */
  this.lastRunTime_ = 0;

  Object.seal(this);
};

/**
 * Requests to run the closure.
 * Skips or delays calls so that the intervals between calls are no less than
 * |minInterval_| milliseconds.
 */
AsyncUtil.RateLimiter.prototype.run = function() {
  var now = Date.now();
  // If |minInterval_| has not passed since the closure is run, skips or delays
  // this run.
  if (now - this.lastRunTime_ < this.minInterval_) {
    // Delays this run only when there is no scheduled run.
    // Otherwise, simply skip this run.
    if (!this.scheduledRunsTimer_) {
      // Schedule for the remainder of the current interval.
      this.scheduledRunsTimer_ = setTimeout(
          this.runImmediately.bind(this),
          this.lastRunTime_ + this.minInterval_ - now);
    }
    return;
  }

  // Otherwise, run immediately.
  this.runImmediately();
};

/**
 * Calls the scheduled run immediately and cancels any scheduled calls.
 */
AsyncUtil.RateLimiter.prototype.runImmediately = function() {
  this.cancelScheduledRuns_();
  this.closure_();
  this.lastRunTime_ = Date.now();
};

/**
 * Cancels all scheduled runs (if any).
 * @private
 */
AsyncUtil.RateLimiter.prototype.cancelScheduledRuns_ = function() {
  if (this.scheduledRunsTimer_) {
    clearTimeout(this.scheduledRunsTimer_);
    this.scheduledRunsTimer_ = 0;
  }
};