1// Copyright 2014 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5define('data_receiver', [ 6 'async_waiter', 7 'device/serial/data_stream.mojom', 8 'device/serial/data_stream_serialization.mojom', 9 'mojo/public/js/bindings/core', 10 'mojo/public/js/bindings/router', 11], function(asyncWaiter, dataStream, serialization, core, router) { 12 /** 13 * @module data_receiver 14 */ 15 16 /** 17 * A pending receive operation. 18 * @constructor 19 * @alias module:data_receiver~PendingReceive 20 * @private 21 */ 22 function PendingReceive() { 23 /** 24 * The promise that will be resolved or rejected when this receive completes 25 * or fails, respectively. 26 * @type {!Promise.<ArrayBuffer>} 27 * @private 28 */ 29 this.promise_ = new Promise(function(resolve, reject) { 30 /** 31 * The callback to call with the data received on success. 32 * @type {Function} 33 * @private 34 */ 35 this.dataCallback_ = resolve; 36 /** 37 * The callback to call with the error on failure. 38 * @type {Function} 39 * @private 40 */ 41 this.errorCallback_ = reject; 42 }.bind(this)); 43 } 44 45 /** 46 * Returns the promise that will be resolved when this operation completes or 47 * rejected if an error occurs. 48 * @return {Promise.<ArrayBuffer>} A promise to the data received. 49 */ 50 PendingReceive.prototype.getPromise = function() { 51 return this.promise_; 52 }; 53 54 /** 55 * Dispatches received data to the promise returned by 56 * [getPromise]{@link module:data_receiver.PendingReceive#getPromise}. 57 * @param {!ArrayBuffer} data The data to dispatch. 58 */ 59 PendingReceive.prototype.dispatchData = function(data) { 60 this.dataCallback_(data); 61 }; 62 63 /** 64 * Dispatches an error if the offset of the error has been reached. 65 * @param {!PendingReceiveError} error The error to dispatch. 66 * @param {number} bytesReceived The number of bytes that have been received. 67 */ 68 PendingReceive.prototype.dispatchError = function(error, bytesReceived) { 69 if (bytesReceived != error.offset) 70 return false; 71 72 var e = new Error(); 73 e.error = error.error; 74 this.errorCallback_(e); 75 return true; 76 }; 77 78 /** 79 * Unconditionally dispatches an error. 80 * @param {number} error The error to dispatch. 81 */ 82 PendingReceive.prototype.dispatchFatalError = function(error) { 83 var e = new Error(); 84 e.error = error; 85 this.errorCallback_(e); 86 }; 87 88 /** 89 * A DataReceiver that receives data from a DataSource. 90 * @param {!MojoHandle} handle The handle to the DataSource. 91 * @param {number} bufferSize How large a buffer the data pipe should use. 92 * @param {number} fatalErrorValue The receive error value to report in the 93 * event of a fatal error. 94 * @constructor 95 * @alias module:data_receiver.DataReceiver 96 */ 97 function DataReceiver(handle, bufferSize, fatalErrorValue) { 98 var dataPipeOptions = { 99 flags: core.CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 100 elementNumBytes: 1, 101 capacityNumBytes: bufferSize, 102 }; 103 var receivePipe = core.createDataPipe(dataPipeOptions); 104 this.init_( 105 handle, receivePipe.consumerHandle, fatalErrorValue, 0, null, false); 106 this.source_.init(receivePipe.producerHandle); 107 } 108 109 DataReceiver.prototype = 110 $Object.create(dataStream.DataSourceClientStub.prototype); 111 112 /** 113 * Closes this DataReceiver. 114 */ 115 DataReceiver.prototype.close = function() { 116 if (this.shutDown_) 117 return; 118 this.shutDown_ = true; 119 this.router_.close(); 120 this.waiter_.stop(); 121 core.close(this.receivePipe_); 122 if (this.receive_) { 123 this.receive_.dispatchFatalError(this.fatalErrorValue_); 124 this.receive_ = null; 125 } 126 }; 127 128 /** 129 * Initialize this DataReceiver. 130 * @param {!MojoHandle} source A handle to the DataSource 131 * @param {!MojoHandle} dataPipe A handle to use for receiving data from the 132 * DataSource. 133 * @param {number} fatalErrorValue The error to dispatch in the event of a 134 * fatal error. 135 * @param {number} bytesReceived The number of bytes already received. 136 * @param {PendingReceiveError} pendingError The pending error if there is 137 * one. 138 * @param {boolean} paused Whether the DataSource is paused. 139 * @private 140 */ 141 DataReceiver.prototype.init_ = function(source, 142 dataPipe, 143 fatalErrorValue, 144 bytesReceived, 145 pendingError, 146 paused) { 147 /** 148 * The [Router]{@link module:mojo/public/js/bindings/router.Router} for the 149 * connection to the DataSource. 150 * @private 151 */ 152 this.router_ = new router.Router(source); 153 /** 154 * The connection to the DataSource. 155 * @private 156 */ 157 this.source_ = new dataStream.DataSourceProxy(this.router_); 158 this.router_.setIncomingReceiver(this); 159 /** 160 * The handle to the data pipe to use for receiving data. 161 * @private 162 */ 163 this.receivePipe_ = dataPipe; 164 /** 165 * The current receive operation. 166 * @type {module:data_receiver~PendingReceive} 167 * @private 168 */ 169 this.receive_ = null; 170 /** 171 * The error to be dispatched in the event of a fatal error. 172 * @const {number} 173 * @private 174 */ 175 this.fatalErrorValue_ = fatalErrorValue; 176 /** 177 * The async waiter used to wait for 178 * |[receivePipe_]{@link module:data_receiver.DataReceiver#receivePipe_}| to 179 * be readable. 180 * @type {!module:async_waiter.AsyncWaiter} 181 * @private 182 */ 183 this.waiter_ = new asyncWaiter.AsyncWaiter(this.receivePipe_, 184 core.HANDLE_SIGNAL_READABLE, 185 this.onHandleReady_.bind(this)); 186 /** 187 * The number of bytes received from the DataSource. 188 * @type {number} 189 * @private 190 */ 191 this.bytesReceived_ = bytesReceived; 192 /** 193 * The pending error if there is one. 194 * @type {PendingReceiveError} 195 * @private 196 */ 197 this.pendingError_ = pendingError; 198 /** 199 * Whether the DataSource is paused. 200 * @type {boolean} 201 * @private 202 */ 203 this.paused_ = paused; 204 /** 205 * Whether this DataReceiver has shut down. 206 * @type {boolean} 207 * @private 208 */ 209 this.shutDown_ = false; 210 }; 211 212 /** 213 * Serializes this DataReceiver. 214 * This will cancel a receive if one is in progress. 215 * @return {!Promise.<SerializedDataReceiver>} A promise that will resolve to 216 * the serialization of this DataReceiver. If this DataReceiver has shut 217 * down, the promise will resolve to null. 218 */ 219 DataReceiver.prototype.serialize = function() { 220 if (this.shutDown_) 221 return Promise.resolve(null); 222 223 this.waiter_.stop(); 224 if (this.receive_) { 225 this.receive_.dispatchFatalError(this.fatalErrorValue_); 226 this.receive_ = null; 227 } 228 var serialized = new serialization.SerializedDataReceiver(); 229 serialized.source = this.router_.connector_.handle_; 230 serialized.data_pipe = this.receivePipe_; 231 serialized.fatal_error_value = this.fatalErrorValue_; 232 serialized.bytes_received = this.bytesReceived_; 233 serialized.paused = this.paused_; 234 serialized.pending_error = this.pendingError_; 235 this.router_.connector_.handle_ = null; 236 this.router_.close(); 237 this.shutDown_ = true; 238 return Promise.resolve(serialized); 239 }; 240 241 /** 242 * Deserializes a SerializedDataReceiver. 243 * @param {SerializedDataReceiver} serialized The serialized DataReceiver. 244 * @return {!DataReceiver} The deserialized DataReceiver. 245 */ 246 DataReceiver.deserialize = function(serialized) { 247 var receiver = $Object.create(DataReceiver.prototype); 248 receiver.deserialize_(serialized); 249 return receiver; 250 }; 251 252 /** 253 * Deserializes a SerializedDataReceiver into this DataReceiver. 254 * @param {SerializedDataReceiver} serialized The serialized DataReceiver. 255 * @private 256 */ 257 DataReceiver.prototype.deserialize_ = function(serialized) { 258 if (!serialized) { 259 this.shutDown_ = true; 260 return; 261 } 262 this.init_(serialized.source, 263 serialized.data_pipe, 264 serialized.fatal_error_value, 265 serialized.bytes_received, 266 serialized.pending_error, 267 serialized.paused); 268 }; 269 270 /** 271 * Receive data from the DataSource. 272 * @return {Promise.<ArrayBuffer>} A promise to the received data. If an error 273 * occurs, the promise will reject with an Error object with a property 274 * error containing the error code. 275 * @throws Will throw if this has encountered a fatal error or another receive 276 * is in progress. 277 */ 278 DataReceiver.prototype.receive = function() { 279 if (this.shutDown_) 280 throw new Error('DataReceiver has been closed'); 281 if (this.receive_) 282 throw new Error('Receive already in progress.'); 283 var receive = new PendingReceive(); 284 var promise = receive.getPromise(); 285 if (this.pendingError_ && 286 receive.dispatchError(this.pendingError_, this.bytesReceived_)) { 287 this.pendingError_ = null; 288 this.paused_ = true; 289 return promise; 290 } 291 if (this.paused_) { 292 this.source_.resume(); 293 this.paused_ = false; 294 } 295 this.receive_ = receive; 296 this.waiter_.start(); 297 return promise; 298 }; 299 300 /** 301 * Invoked when 302 * |[receivePipe_]{@link module:data_receiver.DataReceiver#receivePipe_}| is 303 * ready to read. Reads from the data pipe if the wait is successful. 304 * @param {number} waitResult The result of the asynchronous wait. 305 * @private 306 */ 307 DataReceiver.prototype.onHandleReady_ = function(waitResult) { 308 if (waitResult != core.RESULT_OK || !this.receive_) { 309 this.close(); 310 return; 311 } 312 var result = core.readData(this.receivePipe_, core.READ_DATA_FLAG_NONE); 313 if (result.result == core.RESULT_OK) { 314 // TODO(sammc): Handle overflow in the same fashion as the C++ receiver. 315 this.bytesReceived_ += result.buffer.byteLength; 316 this.receive_.dispatchData(result.buffer); 317 this.receive_ = null; 318 } else if (result.result == core.RESULT_SHOULD_WAIT) { 319 this.waiter_.start(); 320 } else { 321 this.close(); 322 } 323 }; 324 325 /** 326 * Invoked by the DataSource when an error is encountered. 327 * @param {number} offset The location at which the error occurred. 328 * @param {number} error The error that occurred. 329 * @private 330 */ 331 DataReceiver.prototype.onError = function(offset, error) { 332 if (this.shutDown_) 333 return; 334 335 var pendingError = new serialization.PendingReceiveError(); 336 pendingError.error = error; 337 pendingError.offset = offset; 338 if (this.receive_ && 339 this.receive_.dispatchError(pendingError, this.bytesReceived_)) { 340 this.receive_ = null; 341 this.waiter_.stop(); 342 this.paused_ = true; 343 return; 344 } 345 this.pendingError_ = pendingError; 346 }; 347 348 return {DataReceiver: DataReceiver}; 349}); 350