• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1var iterate = require('stream-iterate')
2var from = require('from2')
3
/**
 * Default key extractor used when the caller supplies no `toKey` function.
 *
 * @param {*} val - An item read from one of the input streams.
 * @returns {*} `val.key` when that property is neither null nor undefined,
 *   otherwise `val` itself (so primitive items act as their own key).
 */
var defaultKey = function (val) {
  // Explicit null check instead of `||`: falsy keys such as 0 or '' are valid
  // keys and must not be replaced by the whole item.
  return val.key == null ? val : val.key
}
7
/**
 * Merge two object streams into a single object-mode readable stream,
 * emitting the item with the smaller key first and dropping the item from
 * `streamB` whenever both heads have the same key.
 *
 * NOTE(review): the comparison logic only yields a sorted, duplicate-free
 * result if both inputs are already sorted ascending by key - confirm with
 * callers.
 *
 * @param {Object} streamA - First source stream (wins on equal keys).
 * @param {Object} streamB - Second source stream.
 * @param {Function} [toKey] - Maps an item to its comparison key; falls back
 *   to `defaultKey` when falsy.
 * @returns {Object} The merged stream. When it emits 'close', `destroy()` is
 *   called on each source that has one.
 */
var union = function (streamA, streamB, toKey) {
  var pullA = iterate(streamA)
  var pullB = iterate(streamB)
  var keyOf = toKey || defaultKey
  var sources = [streamA, streamB]

  var merged = from.obj(function pump (size, done) {
    pullA(function (errA, headA, advanceA) {
      if (errA) return done(errA)

      pullB(function (errB, headB, advanceB) {
        if (errB) return done(errB)

        // Neither side has an item left: signal end of stream.
        if (!headA && !headB) return done(null, null)

        var emitFromA

        if (!headA) {
          // A is drained, keep flushing B.
          emitFromA = false
        } else if (!headB) {
          // B is drained, keep flushing A.
          emitFromA = true
        } else {
          var kA = keyOf(headA)
          var kB = keyOf(headB)

          if (kA === kB) {
            // Same key on both sides: discard B's copy and re-run so that
            // A's item is compared against B's next head.
            advanceB()
            return pump(size, done)
          }

          emitFromA = kA < kB
        }

        // Advance the winning side before handing its item downstream.
        if (emitFromA) {
          advanceA()
          return done(null, headA)
        }

        advanceB()
        done(null, headB)
      })
    })
  })

  // Tear down both sources when the merged stream closes.
  merged.on('close', function () {
    sources.forEach(function (source) {
      if (source.destroy) source.destroy()
    })
  })

  return merged
}
58
59module.exports = union
60