• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5define('data_receiver', [
6    'async_waiter',
7    'device/serial/data_stream.mojom',
8    'device/serial/data_stream_serialization.mojom',
9    'mojo/public/js/bindings/core',
10    'mojo/public/js/bindings/router',
11], function(asyncWaiter, dataStream, serialization, core, router) {
12  /**
13   * @module data_receiver
14   */
15
/**
 * A pending receive operation. Owns a promise together with the two callbacks
 * that settle it, so that the promise can be settled later from outside the
 * promise executor.
 * @constructor
 * @alias module:data_receiver~PendingReceive
 * @private
 */
function PendingReceive() {
  /**
   * The promise that will be resolved or rejected when this receive completes
   * or fails, respectively.
   * @type {!Promise.<ArrayBuffer>}
   * @private
   */
  this.promise_ = new Promise(function(resolve, reject) {
    /**
     * The callback to call with the data received on success.
     * @type {Function}
     * @private
     */
    this.dataCallback_ = resolve;
    /**
     * The callback to call with the error on failure.
     * @type {Function}
     * @private
     */
    this.errorCallback_ = reject;
  }.bind(this));
}

/**
 * Returns the promise that will be resolved when this operation completes or
 * rejected if an error occurs.
 * @return {Promise.<ArrayBuffer>} A promise to the data received.
 */
PendingReceive.prototype.getPromise = function() {
  return this.promise_;
};

/**
 * Dispatches received data to the promise returned by
 * [getPromise]{@link module:data_receiver~PendingReceive#getPromise}.
 * @param {!ArrayBuffer} data The data to dispatch.
 */
PendingReceive.prototype.dispatchData = function(data) {
  this.dataCallback_(data);
};

/**
 * Dispatches an error if the offset of the error has been reached.
 * @param {!PendingReceiveError} error The error to dispatch. Its offset is
 *     compared with bytesReceived and its error code is reported on match.
 * @param {number} bytesReceived The number of bytes that have been received.
 * @return {boolean} True if the error was dispatched (the promise has been
 *     rejected); false if the error's offset has not been reached, in which
 *     case the promise is left untouched.
 */
PendingReceive.prototype.dispatchError = function(error, bytesReceived) {
  // Both values are byte counts; compare without type coercion.
  if (bytesReceived !== error.offset)
    return false;

  // Share the Error construction with the unconditional path.
  this.dispatchFatalError(error.error);
  return true;
};

/**
 * Unconditionally dispatches an error. The promise is rejected with an Error
 * object whose error property holds the given error code.
 * @param {number} error The error to dispatch.
 */
PendingReceive.prototype.dispatchFatalError = function(error) {
  var e = new Error();
  e.error = error;
  this.errorCallback_(e);
};
87
/**
 * A DataReceiver that receives data from a DataSource.
 *
 * Construction creates a new data pipe, initializes this receiver with the
 * pipe's consumer end and passes the producer end to the DataSource proxy.
 * @param {!MojoHandle} handle The handle to the DataSource.
 * @param {number} bufferSize How large a buffer the data pipe should use.
 * @param {number} fatalErrorValue The receive error value to report in the
 *     event of a fatal error.
 * @constructor
 * @alias module:data_receiver.DataReceiver
 */
function DataReceiver(handle, bufferSize, fatalErrorValue) {
  // Elements are one byte each, so the capacity in bytes is bufferSize.
  var pipe = core.createDataPipe({
    flags: core.CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,
    elementNumBytes: 1,
    capacityNumBytes: bufferSize,
  });

  // A fresh receiver starts with nothing received, no queued error and an
  // unpaused source.
  var initialBytesReceived = 0;
  var initialPendingError = null;
  var initiallyPaused = false;
  this.init_(handle,
             pipe.consumerHandle,
             fatalErrorValue,
             initialBytesReceived,
             initialPendingError,
             initiallyPaused);

  // init_() has set up source_; give it the end of the pipe to write into.
  this.source_.init(pipe.producerHandle);
}
108
// DataReceiver instances inherit from the generated DataSourceClient stub;
// presumably this is what lets the router deliver incoming DataSourceClient
// messages to them - confirm against the generated bindings.
DataReceiver.prototype = $Object.create(
    dataStream.DataSourceClientStub.prototype);
111
/**
 * Closes this DataReceiver.
 *
 * Closes the router, stops the waiter and closes the receive pipe handle. If
 * a receive is in progress, it is failed with the fatal error value. Calling
 * this on a receiver that has already shut down does nothing.
 */
DataReceiver.prototype.close = function() {
  var alreadyShutDown = this.shutDown_;
  if (alreadyShutDown)
    return;

  this.shutDown_ = true;
  this.router_.close();
  this.waiter_.stop();
  core.close(this.receivePipe_);

  var interrupted = this.receive_;
  if (!interrupted)
    return;
  interrupted.dispatchFatalError(this.fatalErrorValue_);
  this.receive_ = null;
};
127
/**
 * Initialize this DataReceiver: sets up the connection to the DataSource,
 * the waiter on the data pipe and all bookkeeping fields.
 * @param {!MojoHandle} source A handle to the DataSource
 * @param {!MojoHandle} dataPipe A handle to use for receiving data from the
 *     DataSource.
 * @param {number} fatalErrorValue The error to dispatch in the event of a
 *     fatal error.
 * @param {number} bytesReceived The number of bytes already received.
 * @param {PendingReceiveError} pendingError The pending error if there is
 *     one.
 * @param {boolean} paused Whether the DataSource is paused.
 * @private
 */
DataReceiver.prototype.init_ = function(
    source, dataPipe, fatalErrorValue, bytesReceived, pendingError, paused) {
  var connection = new router.Router(source);
  /**
   * The [Router]{@link module:mojo/public/js/bindings/router.Router} for the
   * connection to the DataSource.
   * @private
   */
  this.router_ = connection;
  /**
   * The connection to the DataSource.
   * @private
   */
  this.source_ = new dataStream.DataSourceProxy(connection);
  // Incoming messages on the router are delivered to this object.
  connection.setIncomingReceiver(this);

  /**
   * The handle to the data pipe to use for receiving data.
   * @private
   */
  this.receivePipe_ = dataPipe;
  /**
   * The async waiter used to wait for
   * |[receivePipe_]{@link module:data_receiver.DataReceiver#receivePipe_}| to
   *     be readable.
   * @type {!module:async_waiter.AsyncWaiter}
   * @private
   */
  this.waiter_ = new asyncWaiter.AsyncWaiter(
      dataPipe, core.HANDLE_SIGNAL_READABLE, this.onHandleReady_.bind(this));

  /**
   * The current receive operation.
   * @type {module:data_receiver~PendingReceive}
   * @private
   */
  this.receive_ = null;
  /**
   * The error to be dispatched in the event of a fatal error.
   * @const {number}
   * @private
   */
  this.fatalErrorValue_ = fatalErrorValue;
  /**
   * The number of bytes received from the DataSource.
   * @type {number}
   * @private
   */
  this.bytesReceived_ = bytesReceived;
  /**
   * The pending error if there is one.
   * @type {PendingReceiveError}
   * @private
   */
  this.pendingError_ = pendingError;
  /**
   * Whether the DataSource is paused.
   * @type {boolean}
   * @private
   */
  this.paused_ = paused;
  /**
   * Whether this DataReceiver has shut down.
   * @type {boolean}
   * @private
   */
  this.shutDown_ = false;
};
211
/**
 * Serializes this DataReceiver.
 * This will cancel a receive if one is in progress: the in-progress receive
 * is failed with the fatal error value. After serializing, this DataReceiver
 * is shut down.
 * @return {!Promise.<SerializedDataReceiver>} A promise that will resolve to
 *     the serialization of this DataReceiver. If this DataReceiver has shut
 *     down, the promise will resolve to null.
 */
DataReceiver.prototype.serialize = function() {
  if (this.shutDown_)
    return Promise.resolve(null);

  this.waiter_.stop();
  var cancelled = this.receive_;
  if (cancelled) {
    cancelled.dispatchFatalError(this.fatalErrorValue_);
    this.receive_ = null;
  }

  var snapshot = new serialization.SerializedDataReceiver();
  var connector = this.router_.connector_;
  snapshot.source = connector.handle_;
  snapshot.data_pipe = this.receivePipe_;
  snapshot.fatal_error_value = this.fatalErrorValue_;
  snapshot.bytes_received = this.bytesReceived_;
  snapshot.paused = this.paused_;
  snapshot.pending_error = this.pendingError_;

  // Detach the handle from the connector before closing the router;
  // presumably this keeps close() from closing the handle that was just
  // placed in the snapshot - confirm against Router/Connector.
  connector.handle_ = null;
  this.router_.close();
  this.shutDown_ = true;
  return Promise.resolve(snapshot);
};
240
/**
 * Deserializes a SerializedDataReceiver.
 *
 * The receiver is created directly from the DataReceiver prototype and then
 * restored through deserialize_(); the DataReceiver constructor is not
 * invoked here.
 * @param {SerializedDataReceiver} serialized The serialized DataReceiver.
 * @return {!DataReceiver} The deserialized DataReceiver.
 */
DataReceiver.deserialize = function(serialized) {
  var restored = $Object.create(DataReceiver.prototype);
  restored.deserialize_(serialized);
  return restored;
};
251
/**
 * Deserializes a SerializedDataReceiver into this DataReceiver.
 *
 * When no serialized state is supplied, this DataReceiver is simply marked
 * as shut down and nothing else is initialized.
 * @param {SerializedDataReceiver} serialized The serialized DataReceiver.
 * @private
 */
DataReceiver.prototype.deserialize_ = function(serialized) {
  if (serialized) {
    this.init_(serialized.source,
               serialized.data_pipe,
               serialized.fatal_error_value,
               serialized.bytes_received,
               serialized.pending_error,
               serialized.paused);
    return;
  }
  this.shutDown_ = true;
};
269
/**
 * Receive data from the DataSource.
 *
 * If a previously reported error is pending at the current receive offset,
 * the returned promise is rejected with it and the DataSource is recorded as
 * paused. Otherwise the DataSource is resumed if it was paused and the
 * receiver starts waiting for the data pipe.
 * @return {Promise.<ArrayBuffer>} A promise to the received data. If an error
 *     occurs, the promise will reject with an Error object with a property
 *     error containing the error code.
 * @throws Will throw if this has encountered a fatal error or another receive
 *     is in progress.
 */
DataReceiver.prototype.receive = function() {
  if (this.shutDown_)
    throw new Error('DataReceiver has been closed');
  if (this.receive_)
    throw new Error('Receive already in progress.');

  var pending = new PendingReceive();
  var dataPromise = pending.getPromise();

  var queuedError = this.pendingError_;
  if (queuedError &&
      pending.dispatchError(queuedError, this.bytesReceived_)) {
    // The queued error has been delivered to this receive.
    this.pendingError_ = null;
    this.paused_ = true;
    return dataPromise;
  }

  if (this.paused_) {
    this.source_.resume();
    this.paused_ = false;
  }
  this.receive_ = pending;
  this.waiter_.start();
  return dataPromise;
};
299
/**
 * Invoked when
 * |[receivePipe_]{@link module:data_receiver.DataReceiver#receivePipe_}| is
 * ready to read. Reads from the data pipe if the wait is successful.
 *
 * The receiver is closed if the wait failed, if no receive is in progress,
 * or if the read fails with anything other than "should wait". On a
 * successful read the data is handed to the current receive, which is then
 * cleared. If the read reports that it should wait, the waiter is restarted.
 * @param {number} waitResult The result of the asynchronous wait.
 * @private
 */
DataReceiver.prototype.onHandleReady_ = function(waitResult) {
  // Result codes are compared strictly so that no type coercion can make an
  // unexpected value look like RESULT_OK.
  if (waitResult !== core.RESULT_OK || !this.receive_) {
    this.close();
    return;
  }
  var result = core.readData(this.receivePipe_, core.READ_DATA_FLAG_NONE);
  if (result.result === core.RESULT_OK) {
    // TODO(sammc): Handle overflow in the same fashion as the C++ receiver.
    this.bytesReceived_ += result.buffer.byteLength;
    this.receive_.dispatchData(result.buffer);
    this.receive_ = null;
  } else if (result.result === core.RESULT_SHOULD_WAIT) {
    this.waiter_.start();
  } else {
    this.close();
  }
};
324
/**
 * Invoked by the DataSource when an error is encountered.
 *
 * If a receive is in progress and has already reached the error's offset,
 * the error is delivered to it immediately, the waiter is stopped and the
 * DataSource is recorded as paused. Otherwise the error is stored as the
 * pending error. Does nothing once this DataReceiver has shut down.
 * @param {number} offset The location at which the error occurred.
 * @param {number} error The error that occurred.
 * @private
 */
DataReceiver.prototype.onError = function(offset, error) {
  if (this.shutDown_)
    return;

  var reported = new serialization.PendingReceiveError();
  reported.error = error;
  reported.offset = offset;

  var active = this.receive_;
  var delivered =
      active && active.dispatchError(reported, this.bytesReceived_);
  if (!delivered) {
    // Not deliverable yet; keep it until a receive reaches its offset.
    this.pendingError_ = reported;
    return;
  }

  this.receive_ = null;
  this.waiter_.stop();
  this.paused_ = true;
};
347
348  return {DataReceiver: DataReceiver};
349});
350