1'use strict'; 2 3const { 4 Symbol, 5} = primordials; 6 7const { setImmediate } = require('timers'); 8const assert = require('internal/assert'); 9const { Socket } = require('net'); 10const { JSStream } = internalBinding('js_stream'); 11const uv = internalBinding('uv'); 12let debug = require('internal/util/debuglog').debuglog( 13 'stream_socket', 14 (fn) => { 15 debug = fn; 16 }, 17); 18const { owner_symbol } = require('internal/async_hooks').symbols; 19const { ERR_STREAM_WRAP } = require('internal/errors').codes; 20 21const kCurrentWriteRequest = Symbol('kCurrentWriteRequest'); 22const kCurrentShutdownRequest = Symbol('kCurrentShutdownRequest'); 23const kPendingShutdownRequest = Symbol('kPendingShutdownRequest'); 24 25function isClosing() { return this[owner_symbol].isClosing(); } 26 27function onreadstart() { return this[owner_symbol].readStart(); } 28 29function onreadstop() { return this[owner_symbol].readStop(); } 30 31function onshutdown(req) { return this[owner_symbol].doShutdown(req); } 32 33function onwrite(req, bufs) { return this[owner_symbol].doWrite(req, bufs); } 34 35/* This class serves as a wrapper for when the C++ side of Node wants access 36 * to a standard JS stream. For example, TLS or HTTP do not operate on network 37 * resources conceptually, although that is the common case and what we are 38 * optimizing for; in theory, they are completely composable and can work with 39 * any stream resource they see. 40 * 41 * For the common case, i.e. a TLS socket wrapping around a net.Socket, we 42 * can skip going through the JS layer and let TLS access the raw C++ handle 43 * of a net.Socket. The flipside of this is that, to maintain composability, 44 * we need a way to create "fake" net.Socket instances that call back into a 45 * "real" JavaScript stream. JSStreamSocket is exactly this. 46 */ 47class JSStreamSocket extends Socket { 48 constructor(stream) { 49 const handle = new JSStream(); 50 handle.close = (cb) => { 51 debug('close'); 52 this.doClose(cb); 53 }; 54 // Inside of the following functions, `this` refers to the handle 55 // and `this[owner_symbol]` refers to this JSStreamSocket instance. 56 handle.isClosing = isClosing; 57 handle.onreadstart = onreadstart; 58 handle.onreadstop = onreadstop; 59 handle.onshutdown = onshutdown; 60 handle.onwrite = onwrite; 61 62 stream.pause(); 63 stream.on('error', (err) => this.emit('error', err)); 64 const ondata = (chunk) => { 65 if (typeof chunk === 'string' || 66 stream.readableObjectMode === true) { 67 // Make sure that no further `data` events will happen. 68 stream.pause(); 69 stream.removeListener('data', ondata); 70 71 this.emit('error', new ERR_STREAM_WRAP()); 72 return; 73 } 74 75 debug('data', chunk.length); 76 if (this._handle) 77 this._handle.readBuffer(chunk); 78 }; 79 stream.on('data', ondata); 80 stream.once('end', () => { 81 debug('end'); 82 if (this._handle) 83 this._handle.emitEOF(); 84 }); 85 // Some `Stream` don't pass `hasError` parameters when closed. 86 stream.once('close', () => { 87 // Errors emitted from `stream` have also been emitted to this instance 88 // so that we don't pass errors to `destroy()` again. 89 this.destroy(); 90 }); 91 92 super({ handle, manualStart: true }); 93 this.stream = stream; 94 this[kCurrentWriteRequest] = null; 95 this[kCurrentShutdownRequest] = null; 96 this[kPendingShutdownRequest] = null; 97 this.readable = stream.readable; 98 this.writable = stream.writable; 99 100 // Start reading. 101 this.read(0); 102 } 103 104 // Allow legacy requires in the test suite to keep working: 105 // const { StreamWrap } = require('internal/js_stream_socket') 106 static get StreamWrap() { 107 return JSStreamSocket; 108 } 109 110 isClosing() { 111 return !this.readable || !this.writable; 112 } 113 114 readStart() { 115 this.stream.resume(); 116 return 0; 117 } 118 119 readStop() { 120 this.stream.pause(); 121 return 0; 122 } 123 124 doShutdown(req) { 125 // TODO(addaleax): It might be nice if we could get into a state where 126 // DoShutdown() is not called on streams while a write is still pending. 127 // 128 // Currently, the only part of the code base where that happens is the 129 // TLS implementation, which calls both DoWrite() and DoShutdown() on the 130 // underlying network stream inside of its own DoShutdown() method. 131 // Working around that on the native side is not quite trivial (yet?), 132 // so for now that is supported here. 133 134 if (this[kCurrentWriteRequest] !== null) { 135 this[kPendingShutdownRequest] = req; 136 return 0; 137 } 138 assert(this[kCurrentWriteRequest] === null); 139 assert(this[kCurrentShutdownRequest] === null); 140 this[kCurrentShutdownRequest] = req; 141 142 const handle = this._handle; 143 144 process.nextTick(() => { 145 // Ensure that write is dispatched asynchronously. 146 this.stream.end(() => { 147 this.finishShutdown(handle, 0); 148 }); 149 }); 150 return 0; 151 } 152 153 // handle === this._handle except when called from doClose(). 154 finishShutdown(handle, errCode) { 155 // The shutdown request might already have been cancelled. 156 if (this[kCurrentShutdownRequest] === null) 157 return; 158 const req = this[kCurrentShutdownRequest]; 159 this[kCurrentShutdownRequest] = null; 160 handle.finishShutdown(req, errCode); 161 } 162 163 doWrite(req, bufs) { 164 assert(this[kCurrentWriteRequest] === null); 165 assert(this[kCurrentShutdownRequest] === null); 166 167 const handle = this._handle; 168 const self = this; 169 170 let pending = bufs.length; 171 172 this.stream.cork(); 173 // Use `var` over `let` for performance optimization. 174 // eslint-disable-next-line no-var 175 for (var i = 0; i < bufs.length; ++i) 176 this.stream.write(bufs[i], done); 177 this.stream.uncork(); 178 179 // Only set the request here, because the `write()` calls could throw. 180 this[kCurrentWriteRequest] = req; 181 182 function done(err) { 183 if (!err && --pending !== 0) 184 return; 185 186 // Ensure that this is called once in case of error 187 pending = 0; 188 189 let errCode = 0; 190 if (err) { 191 errCode = uv[`UV_${err.code}`] || uv.UV_EPIPE; 192 } 193 194 // Ensure that write was dispatched 195 setImmediate(() => { 196 self.finishWrite(handle, errCode); 197 }); 198 } 199 200 return 0; 201 } 202 203 // handle === this._handle except when called from doClose(). 204 finishWrite(handle, errCode) { 205 // The write request might already have been cancelled. 206 if (this[kCurrentWriteRequest] === null) 207 return; 208 const req = this[kCurrentWriteRequest]; 209 this[kCurrentWriteRequest] = null; 210 211 handle.finishWrite(req, errCode); 212 if (this[kPendingShutdownRequest]) { 213 const req = this[kPendingShutdownRequest]; 214 this[kPendingShutdownRequest] = null; 215 this.doShutdown(req); 216 } 217 } 218 219 doClose(cb) { 220 const handle = this._handle; 221 222 // When sockets of the "net" module destroyed, they will call 223 // `this._handle.close()` which will also emit EOF if not emitted before. 224 // This feature makes sockets on the other side emit "end" and "close" 225 // even though we haven't called `end()`. As `stream` are likely to be 226 // instances of `net.Socket`, calling `stream.destroy()` manually will 227 // avoid issues that don't properly close wrapped connections. 228 this.stream.destroy(); 229 230 setImmediate(() => { 231 // Should be already set by net.js 232 assert(this._handle === null); 233 234 this.finishWrite(handle, uv.UV_ECANCELED); 235 this.finishShutdown(handle, uv.UV_ECANCELED); 236 237 cb(); 238 }); 239 } 240} 241 242module.exports = JSStreamSocket; 243