• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright Joyent, Inc. and other Node contributors.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the
5// "Software"), to deal in the Software without restriction, including
6// without limitation the rights to use, copy, modify, merge, publish,
7// distribute, sublicense, and/or sell copies of the Software, and to permit
8// persons to whom the Software is furnished to do so, subject to the
9// following conditions:
10//
11// The above copyright notice and this permission notice shall be included
12// in all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20// USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22'use strict';
23require('../common');
24const assert = require('assert');
25const { Readable, Writable } = require('stream');
26
27const EE = require('events').EventEmitter;
28
29
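// A mock source that behaves a bit like the net.Socket / tcp_wrap handle
// interaction: it delivers data until it is told to stop (readStop) and is
// switched back on again with readStart.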
31
// Readable under test, created with a highWaterMark of 16 and 'utf8'
// encoding.
const stream = new Readable({ encoding: 'utf8', highWaterMark: 16 });

// Plain event emitter standing in for the underlying handle; its 'data' and
// 'end' events are what get pushed into `stream`.
const source = new EE();
38
// _read hook: log the request and switch the mock source back on so that it
// is allowed to deliver data again.
stream._read = () => {
  console.error('stream._read');
  readStart();
};
43
// Flipped to true once the readable emits 'end'; end() asserts on it later.
let ended = false;
stream.on('end', () => {
  ended = true;
});
48
// Forward every chunk from the mock source into the readable. When push()
// reports false, stop the source.
source.on('data', (piece) => {
  const wantsMore = stream.push(piece);
  console.error('data', stream.readableLength);
  if (wantsMore) {
    return;
  }
  readStop();
});
55
// The mock source is done: signal end-of-stream to the readable.
source.on('end', () => {
  stream.push(null);
});
59
// True while the mock source is allowed to deliver data.
let reading = false;

/**
 * Switches the mock source on: logs 'readStart' to stderr and sets the
 * `reading` flag.
 */
function readStart() {
  console.error('readStart');
  reading = true;
}
66
/**
 * Switches the mock source off: logs 'readStop' to stderr and clears the
 * `reading` flag. On the next tick it reads from `stream` and, if that
 * yields anything other than null, forwards the result to `writer`.
 */
function readStop() {
  console.error('readStop');
  reading = false;
  process.nextTick(() => {
    const pending = stream.read();
    if (pending === null) {
      return;
    }
    writer.write(pending);
  });
}
76
// Sink for the data read from `stream`. With decodeStrings disabled, string
// chunks reach _write as strings.
const writer = new Writable({ decodeStrings: false });
80
// Chunks received by writer._write, in arrival order.
const written = [];

// Six identical entries, each the 5-character payload repeated four times.
const expectWritten = Array.from({ length: 6 }, () => 'asdfgasdfgasdfgasdfg');
90
// _write hook: log the chunk, record it in `written`, and complete the write
// asynchronously on the next tick.
writer._write = (piece, _encoding, done) => {
  console.error(`WRITE ${piece}`);
  written.push(piece);
  process.nextTick(done);
};
96
// Run finish() when the writer emits 'finish', and remember that it did.
// The comparison of `written` against `expectWritten` lives inside finish(),
// so without this check the test would pass vacuously if 'finish' were never
// emitted.
let finishEmitted = false;
writer.on('finish', function() {
  finishEmitted = true;
  finish();
});
process.on('exit', function() {
  assert(finishEmitted, 'writer never emitted finish');
});
98
99
100// Now emit some chunks.
101
// Number of bursts already sent; advanced by data().
let set = 0;

// Payload emitted by the mock source on every 'data' event.
const chunk = 'asdfg';

// Switch the source on, then send the first burst; data() takes care of
// scheduling the following ones.
readStart();
data();
/**
 * Emits one burst of four chunks from the mock source.
 *
 * The source must be reading before each of the four emits and must have
 * been stopped once the fourth one has been delivered. After the burst,
 * either another burst is scheduled 10 ms later or, once `set` has reached
 * 5, the stream is ended via end().
 */
function data() {
  for (let emitted = 0; emitted < 4; emitted++) {
    assert(reading);
    source.emit('data', chunk);
  }
  assert(!reading);

  // Same as `set++ < 5`: compare the old value, then advance the counter.
  const moreSetsToSend = set < 5;
  set += 1;
  if (!moreSetsToSend) {
    end();
    return;
  }
  setTimeout(data, 10);
}
122
/**
 * 'finish' handler for the writer: logs 'finish' to stderr, checks that the
 * chunks recorded in `written` match `expectWritten` exactly, and prints
 * 'ok' to stdout when they do.
 */
function finish() {
  console.error('finish');
  assert.deepStrictEqual(written, expectWritten);
  console.log('ok');
}
128
/**
 * Ends the scenario: emits 'end' on the mock source, checks that the source
 * is not reading, ends the writer with whatever stream.read() returns, and
 * asserts on the next setImmediate turn that the readable has emitted 'end'.
 */
function end() {
  source.emit('end');
  assert(!reading);
  const remainder = stream.read();
  writer.end(remainder);
  setImmediate(() => {
    assert(ended);
  });
}
137