// Copyright (C) 2011-2015 John Hewson
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
// IN THE SOFTWARE.
20
var stream = require('stream'),
    util = require('util'),
    timers = require('timers'),
    StringDecoder = require('string_decoder').StringDecoder;
24
25// convinience API
26module.exports = function(readStream, options) {
27  return module.exports.createStream(readStream, options);
28};
29
// basic API
/**
 * Creates a line stream.
 *
 * With a truthy `readStream` the work is delegated to `createLineStream`,
 * which pipes that stream into a new LineStream. Without one, a bare
 * LineStream is returned.
 *
 * @param {Object} [readStream] optional source stream
 * @param {Object} [options] passed on to the LineStream constructor
 * @returns {LineStream} the newly created line stream
 */
module.exports.createStream = function(readStream, options) {
  if (readStream) {
    return createLineStream(readStream, options);
  }
  return new LineStream(options);
};
38
// deprecated API
/**
 * Deprecated entry point kept for backwards compatibility. Prints a
 * deprecation warning on every call, then delegates to the internal
 * `createLineStream` with `readStream` only (no options are forwarded).
 *
 * @param {Object} readStream source stream to split into lines
 * @returns {LineStream} the line stream that `readStream` was piped into
 */
module.exports.createLineStream = function(readStream) {
  // warn on stderr so the notice never mixes into data the program writes to stdout
  console.warn('WARNING: byline#createLineStream is deprecated and will be removed soon');
  return createLineStream(readStream);
};
44
/**
 * Pipes `readStream` into a freshly constructed LineStream and returns it.
 *
 * @param {Object} readStream source stream; must be truthy and have a truthy
 *   `readable` property
 * @param {Object} [options] passed on to the LineStream constructor
 * @returns {LineStream} the line stream receiving `readStream`'s data
 * @throws {Error} 'expected readStream' when `readStream` is falsy
 * @throws {Error} 'readStream must be readable' when `readStream.readable`
 *   is falsy
 */
function createLineStream(readStream, options) {
  if (!readStream) {
    throw new Error('expected readStream');
  }
  if (!readStream.readable) {
    throw new Error('readStream must be readable');
  }
  var ls = new LineStream(options);
  readStream.pipe(ls);
  return ls;
}
56
//
// using the new node v0.10 "streams2" API
//

// expose the LineStream constructor itself on the module
module.exports.LineStream = LineStream;
62
/**
 * Transform stream that splits the text written to it into lines and pushes
 * each line as a separate object on its readable side.
 *
 * Lines are pushed as Buffers unless `this.encoding` is set, in which case
 * they are pushed as strings in that encoding (see `_reencode`).
 *
 * @param {Object} [options] passed to `stream.Transform`; additionally:
 * @param {boolean} [options.keepEmptyLines=false] when truthy, empty lines
 *   are pushed too instead of being skipped
 */
function LineStream(options) {
  stream.Transform.call(this, options);
  options = options || {};

  // use objectMode to stop the output from being buffered
  // which re-concatenates the lines, just without newlines.
  this._readableState.objectMode = true;
  this._lineBuffer = [];
  this._keepEmptyLines = options.keepEmptyLines || false;
  this._lastChunkEndedWithCR = false;
  // created on the first binary chunk; carries incomplete multi-byte
  // sequences from one chunk to the next
  this._decoder = null;

  // take the source's encoding if we don't have one
  var self = this;
  this.on('pipe', function(src) {
    if (!self.encoding) {
      // but we can't do this for old-style streams
      if (src instanceof stream.Readable) {
        self.encoding = src._readableState.encoding;
      }
    }
  });
}
util.inherits(LineStream, stream.Transform);

/**
 * Splits one incoming chunk into lines, merges the first piece with the
 * partial line left over from the previous chunk, and pushes every line that
 * is known to be complete.
 *
 * @param {Buffer|string} chunk data written to the stream
 * @param {string} encoding encoding of `chunk`; 'buffer' for binary chunks,
 *   defaults to 'utf8' when falsy
 * @param {Function} done called once the complete lines have been pushed
 */
LineStream.prototype._transform = function(chunk, encoding, done) {
  // decode binary chunks as UTF-8
  encoding = encoding || 'utf8';

  if (Buffer.isBuffer(chunk)) {
    if (encoding === 'buffer') {
      // a persistent decoder holds back the trailing bytes of a character
      // that is split across two chunks instead of corrupting it
      if (!this._decoder) {
        this._decoder = new StringDecoder('utf8');
      }
      chunk = this._decoder.write(chunk);
      encoding = 'utf8';
    }
    else {
      chunk = chunk.toString(encoding);
    }
  }
  this._chunkEncoding = encoding;

  // see: http://www.unicode.org/reports/tr18/#Line_Boundaries
  var lines = chunk.split(/\r\n|[\n\v\f\r\x85\u2028\u2029]/g);

  // don't split CRLF which spans chunks
  if (this._lastChunkEndedWithCR && chunk[0] === '\n') {
    lines.shift();
  }

  // the last buffered entry is a partial line: continue it with the first piece
  if (this._lineBuffer.length > 0) {
    this._lineBuffer[this._lineBuffer.length - 1] += lines[0];
    lines.shift();
  }

  // an empty chunk says nothing about line endings, so it must not clear
  // the pending-CR state left by the chunk before it
  if (chunk.length > 0) {
    this._lastChunkEndedWithCR = chunk[chunk.length - 1] === '\r';
  }
  this._lineBuffer = this._lineBuffer.concat(lines);
  this._pushBuffer(encoding, 1, done);
};

/**
 * Pushes buffered lines until only `keep` entries remain, then calls `done`.
 * Empty lines are skipped unless `keepEmptyLines` was requested.
 *
 * @param {string} encoding encoding of the buffered line strings
 * @param {number} keep number of trailing entries to leave in the buffer
 * @param {Function} done called after the last eligible line was pushed
 */
LineStream.prototype._pushBuffer = function(encoding, keep, done) {
  // always buffer the last (possibly partial) line
  while (this._lineBuffer.length > keep) {
    var line = this._lineBuffer.shift();
    // skip empty lines
    if (this._keepEmptyLines || line.length > 0) {
      if (!this.push(this._reencode(line, encoding))) {
        // when the high-water mark is reached, defer pushes until the next tick
        var self = this;
        timers.setImmediate(function() {
          self._pushBuffer(encoding, keep, done);
        });
        return;
      }
    }
  }
  done();
};

/**
 * Called when the input ends: pushes everything still buffered, including
 * the final line that had no terminator.
 *
 * @param {Function} done called once the remaining lines have been pushed
 */
LineStream.prototype._flush = function(done) {
  if (this._decoder) {
    // bytes of an incomplete trailing character belong to the last line
    var tail = this._decoder.end();
    if (tail.length > 0) {
      if (this._lineBuffer.length > 0) {
        this._lineBuffer[this._lineBuffer.length - 1] += tail;
      } else {
        this._lineBuffer.push(tail);
      }
    }
  }
  this._pushBuffer(this._chunkEncoding, 0, done);
};

/**
 * Converts a decoded line into the form that is pushed downstream.
 *
 * @param {string} line the line text
 * @param {string} chunkEncoding encoding `line` is expressed in
 * @returns {Buffer|string} a string in `this.encoding` when that is set,
 *   otherwise a Buffer holding the line's bytes
 */
// see Readable::push
LineStream.prototype._reencode = function(line, chunkEncoding) {
  if (this.encoding && this.encoding !== chunkEncoding) {
    return bufferFrom(line, chunkEncoding).toString(this.encoding);
  }
  else if (this.encoding) {
    // this should be the most common case, i.e. we're using an encoded source stream
    return line;
  }
  else {
    return bufferFrom(line, chunkEncoding);
  }
};

// Builds a Buffer from a string without the deprecated Buffer constructor
// where possible; falls back to it on runtimes that lack a real Buffer.from.
function bufferFrom(text, encoding) {
  if (typeof Buffer.from === 'function' && Buffer.from !== Uint8Array.from) {
    return Buffer.from(text, encoding);
  }
  return new Buffer(text, encoding);
}
156