• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1var async = require('./async.js');
2
3// API
4module.exports = {
5  iterator: wrapIterator,
6  callback: wrapCallback
7};
8
/**
 * Wraps an iterator so that each invocation registers its wrapped
 * callback in `stream.jobs` under the iteration key before
 * delegating to the original iterator.
 *
 * Both iterator signatures are supported and told apart
 * by the declared arity of the provided function:
 * short `(item, cb)` and long `(item, key, cb)`.
 *
 * @this    ReadableAsyncKit#
 * @param   {function} iterator - function to wrap
 * @returns {function} - wrapped function with `(item, key, cb)` signature;
 *                       it returns whatever the original iterator returned
 *                       (presumably an abort handler, verify against caller)
 */
function wrapIterator(iterator)
{
  var stream = this;

  return function(item, key, cb)
  {
    // `async` is the helper required from ./async.js
    var wrappedCb = async(wrapIteratorCallback.call(stream, cb, key));

    // registered before the iterator runs,
    // the wrapped callback checks this entry to detect its first call
    stream.jobs[key] = wrappedCb;

    // declared arity of exactly two means shortcut (item, cb)
    if (iterator.length === 2)
    {
      return iterator(item, wrappedCb);
    }

    // otherwise long format (item, key, cb)
    return iterator(item, key, wrappedCb);
  };
}
42
/**
 * Wraps the final callback so that `finisher` runs
 * with the stream as its context before the real callback is invoked.
 *
 * @this    ReadableAsyncKit#
 * @param   {function} callback - function to wrap
 * @returns {function} - wrapped function with `(error, result)` signature
 */
function wrapCallback(callback)
{
  var stream = this;

  return function(error, result)
  {
    // finisher is responsible for calling the original callback
    return finisher.call(stream, error, result, callback);
  };
}
63
/**
 * Wraps provided iterator callback function,
 * makes sure the stream snitch (`streamer`) is only called once per key,
 * but passes secondary calls straight to the original callback.
 *
 * The first call is recognized by the key still being
 * registered in `stream.jobs`; the entry is removed on that call.
 *
 * @this    ReadableAsyncKit#
 * @param   {function} callback - callback to wrap
 * @param   {number|string} key - iteration key
 * @returns {function} wrapped callback with `(error, output)` signature
 */
function wrapIteratorCallback(callback, key)
{
  var stream = this;

  return function(error, output)
  {
    // own-property check on purpose: the `in` operator also matches
    // inherited names (e.g. 'constructor', 'toString'), which would
    // let repeated calls for such keys reach the snitch again
    if (!Object.prototype.hasOwnProperty.call(stream.jobs, key))
    {
      // job already reported, don't repeat yourself
      callback(error, output);
      return;
    }

    // clean up jobs
    delete stream.jobs[key];

    return streamer.call(stream, error, {key: key, value: output}, callback);
  };
}
93
/**
 * Stream wrapper for iterator callback
 *
 * On success pushes the `{key, value}` output into the stream.
 * On the first error records it on the stream, pauses the stream
 * and emits `error`; subsequent errors are neither recorded nor emitted.
 * Output of a failed job is never pushed into the stream.
 * In every case the original callback receives the error and the bare value.
 *
 * @this  ReadableAsyncKit#
 * @param {mixed} error - error response
 * @param {mixed} output - iterator output, expected as `{key, value}` object
 * @param {function} callback - callback that expects iterator results
 */
function streamer(error, output, callback)
{
  if (error)
  {
    // only the first error gets recorded and announced
    if (!this.error)
    {
      this.error = error;
      this.pause();
      this.emit('error', error);
    }
  }
  else
  {
    // stream stuff
    this.push(output);
  }

  // back to original track
  // send back value only, as expected
  callback(error, output && output.value);
}
121
/**
 * Stream wrapper for finishing callback
 *
 * Pushes `null` into the stream when there was no error,
 * then hands error and output over to the original callback.
 *
 * @this  ReadableAsyncKit#
 * @param {mixed} error - error response
 * @param {mixed} output - final output
 * @param {function} callback - callback that expects final results
 */
function finisher(error, output, callback)
{
  // signal end of the stream
  // only for successfully finished streams
  if (!error)
  {
    this.push(null);
  }

  // back to original track
  callback(error, output);
}
142