• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1var streamify = require('./streamify.js')
2  , defer     = require('./defer.js')
3  ;
4
5// API
6module.exports = ReadableAsyncKit;
7
8/**
9 * Base constructor for all streams
10 * used to hold properties/methods
11 */
12function ReadableAsyncKit()
13{
14  ReadableAsyncKit.super_.apply(this, arguments);
15
16  // list of active jobs
17  this.jobs = {};
18
19  // add stream methods
20  this.destroy = destroy;
21  this._start  = _start;
22  this._read   = _read;
23}
24
25/**
26 * Destroys readable stream,
27 * by aborting outstanding jobs
28 *
29 * @returns {void}
30 */
31function destroy()
32{
33  if (this.destroyed)
34  {
35    return;
36  }
37
38  this.destroyed = true;
39
40  if (typeof this.terminator == 'function')
41  {
42    this.terminator();
43  }
44}
45
46/**
47 * Starts provided jobs in async manner
48 *
49 * @private
50 */
51function _start()
52{
53  // first argument – runner function
54  var runner = arguments[0]
55    // take away first argument
56    , args   = Array.prototype.slice.call(arguments, 1)
57      // second argument - input data
58    , input  = args[0]
59      // last argument - result callback
60    , endCb  = streamify.callback.call(this, args[args.length - 1])
61    ;
62
63  args[args.length - 1] = endCb;
64  // third argument - iterator
65  args[1] = streamify.iterator.call(this, args[1]);
66
67  // allow time for proper setup
68  defer(function()
69  {
70    if (!this.destroyed)
71    {
72      this.terminator = runner.apply(null, args);
73    }
74    else
75    {
76      endCb(null, Array.isArray(input) ? [] : {});
77    }
78  }.bind(this));
79}
80
81
82/**
83 * Implement _read to comply with Readable streams
84 * Doesn't really make sense for flowing object mode
85 *
86 * @private
87 */
88function _read()
89{
90
91}
92