// Copyright (C) 2011-2015 John Hewson
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
// IN THE SOFTWARE.
20 21var stream = require('stream'), 22 util = require('util'), 23 timers = require('timers'); 24 25// convinience API 26module.exports = function(readStream, options) { 27 return module.exports.createStream(readStream, options); 28}; 29 30// basic API 31module.exports.createStream = function(readStream, options) { 32 if (readStream) { 33 return createLineStream(readStream, options); 34 } else { 35 return new LineStream(options); 36 } 37}; 38 39// deprecated API 40module.exports.createLineStream = function(readStream) { 41 console.log('WARNING: byline#createLineStream is deprecated and will be removed soon'); 42 return createLineStream(readStream); 43}; 44 45function createLineStream(readStream, options) { 46 if (!readStream) { 47 throw new Error('expected readStream'); 48 } 49 if (!readStream.readable) { 50 throw new Error('readStream must be readable'); 51 } 52 var ls = new LineStream(options); 53 readStream.pipe(ls); 54 return ls; 55} 56 57// 58// using the new node v0.10 "streams2" API 59// 60 61module.exports.LineStream = LineStream; 62 63function LineStream(options) { 64 stream.Transform.call(this, options); 65 options = options || {}; 66 67 // use objectMode to stop the output from being buffered 68 // which re-concatanates the lines, just without newlines. 
69 this._readableState.objectMode = true; 70 this._lineBuffer = []; 71 this._keepEmptyLines = options.keepEmptyLines || false; 72 this._lastChunkEndedWithCR = false; 73 74 // take the source's encoding if we don't have one 75 var self = this; 76 this.on('pipe', function(src) { 77 if (!self.encoding) { 78 // but we can't do this for old-style streams 79 if (src instanceof stream.Readable) { 80 self.encoding = src._readableState.encoding; 81 } 82 } 83 }); 84} 85util.inherits(LineStream, stream.Transform); 86 87LineStream.prototype._transform = function(chunk, encoding, done) { 88 // decode binary chunks as UTF-8 89 encoding = encoding || 'utf8'; 90 91 if (Buffer.isBuffer(chunk)) { 92 if (encoding == 'buffer') { 93 chunk = chunk.toString(); // utf8 94 encoding = 'utf8'; 95 } 96 else { 97 chunk = chunk.toString(encoding); 98 } 99 } 100 this._chunkEncoding = encoding; 101 102 // see: http://www.unicode.org/reports/tr18/#Line_Boundaries 103 var lines = chunk.split(/\r\n|[\n\v\f\r\x85\u2028\u2029]/g); 104 105 // don't split CRLF which spans chunks 106 if (this._lastChunkEndedWithCR && chunk[0] == '\n') { 107 lines.shift(); 108 } 109 110 if (this._lineBuffer.length > 0) { 111 this._lineBuffer[this._lineBuffer.length - 1] += lines[0]; 112 lines.shift(); 113 } 114 115 this._lastChunkEndedWithCR = chunk[chunk.length - 1] == '\r'; 116 this._lineBuffer = this._lineBuffer.concat(lines); 117 this._pushBuffer(encoding, 1, done); 118}; 119 120LineStream.prototype._pushBuffer = function(encoding, keep, done) { 121 // always buffer the last (possibly partial) line 122 while (this._lineBuffer.length > keep) { 123 var line = this._lineBuffer.shift(); 124 // skip empty lines 125 if (this._keepEmptyLines || line.length > 0 ) { 126 if (!this.push(this._reencode(line, encoding))) { 127 // when the high-water mark is reached, defer pushes until the next tick 128 var self = this; 129 timers.setImmediate(function() { 130 self._pushBuffer(encoding, keep, done); 131 }); 132 return; 133 } 134 } 
135 } 136 done(); 137}; 138 139LineStream.prototype._flush = function(done) { 140 this._pushBuffer(this._chunkEncoding, 0, done); 141}; 142 143// see Readable::push 144LineStream.prototype._reencode = function(line, chunkEncoding) { 145 if (this.encoding && this.encoding != chunkEncoding) { 146 return new Buffer(line, chunkEncoding).toString(this.encoding); 147 } 148 else if (this.encoding) { 149 // this should be the most common case, i.e. we're using an encoded source stream 150 return line; 151 } 152 else { 153 return new Buffer(line, chunkEncoding); 154 } 155}; 156