'use strict'

const BB = require('bluebird')

const cacheFile = require('npm-cache-filename')
// Keep a raw reference too: BB.promisify() returns a bare wrapper that does
// not carry over `chownr.sync` (used below when fixing cache ownership).
const chownrRaw = require('chownr')
const chownr = BB.promisify(chownrRaw)
const correctMkdir = BB.promisify(require('../utils/correct-mkdir.js'))
const figgyPudding = require('figgy-pudding')
const fs = require('graceful-fs')
const JSONStream = require('JSONStream')
const log = require('npmlog')
const mkdir = BB.promisify(require('gentle-fs').mkdir)
const ms = require('mississippi')
const npmFetch = require('libnpm/fetch')
const path = require('path')
const sortedUnionStream = require('sorted-union-stream')
const url = require('url')
const writeStreamAtomic = require('fs-write-stream-atomic')

const statAsync = BB.promisify(fs.stat)

const APMOpts = figgyPudding({
  cache: {},
  registry: {}
})
// Returns a sorted stream of all package metadata. Internally, takes care of
// maintaining its metadata cache and making partial or full remote requests,
// according to staleness, validity, etc.
//
// The local cache must hold certain invariants:
// 1. It must be a proper JSON object
// 2. It must have its keys lexically sorted
// 3. The first entry must be `_updated` with a millisecond timestamp as its
//    value
// 4. It must include all entries that exist in the metadata endpoint as of
//    the value in `_updated`
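//
// For illustration, a minimal cache file satisfying these invariants might
// look like (package entries abbreviated):
//
//   {
//     "_updated": 1234567890123,
//     "bar": { "name": "bar", "version": "1.0.0" },
//     "foo": { "name": "foo", "version": "2.1.0" }
//   }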
module.exports = allPackageMetadata
function allPackageMetadata (opts) {
  const staleness = opts.staleness
  const stream = ms.through.obj()

  opts = APMOpts(opts)
  const cacheBase = cacheFile(path.resolve(path.dirname(opts.cache)))(url.resolve(opts.registry, '/-/all'))
  const cachePath = path.join(cacheBase, '.cache.json')
  createEntryStream(
    cachePath, staleness, opts
  ).then(({entryStream, latest, newEntries}) => {
    log.silly('all-package-metadata', 'entry stream created')
    if (entryStream && newEntries) {
      return createCacheWriteStream(cachePath, latest, opts).then(writer => {
        log.silly('all-package-metadata', 'output stream created')
        ms.pipeline.obj(entryStream, writer, stream)
      })
    } else if (entryStream) {
      ms.pipeline.obj(entryStream, stream)
    } else {
      stream.emit('error', new Error('No search sources available'))
    }
  }).catch(err => stream.emit('error', err))
  return stream
}
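
// Example usage (hypothetical caller, with made-up option values): stream
// every package name to stdout.
//
//   allPackageMetadata({
//     cache: '/path/to/npm-cache',
//     registry: 'https://registry.npmjs.org/',
//     staleness: 900
//   }).on('data', pkg => console.log(pkg.name))
//     .on('error', err => log.warn('search', err.message))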

// Creates a stream of the latest available package metadata.
// Metadata will come from a combination of the local cache and remote data.
module.exports._createEntryStream = createEntryStream
function createEntryStream (cachePath, staleness, opts) {
  return createCacheEntryStream(
    cachePath, opts
  ).catch(err => {
    log.warn('', 'Failed to read search cache. Rebuilding')
    log.silly('all-package-metadata', 'cache read error: ', err)
    return {}
  }).then(({
    updateStream: cacheStream,
    updatedLatest: cacheLatest
  }) => {
    cacheLatest = cacheLatest || 0
    return createEntryUpdateStream(staleness, cacheLatest, opts).catch(err => {
      log.warn('', 'Search data request failed, search might be stale')
      log.silly('all-package-metadata', 'update request error: ', err)
      return {}
    }).then(({updateStream, updatedLatest}) => {
      updatedLatest = updatedLatest || 0
      const latest = updatedLatest || cacheLatest
      if (!cacheStream && !updateStream) {
        throw new Error('No search sources available')
      }
      if (cacheStream && updateStream) {
        // Deduped, unioned, sorted stream from the combination of both.
        return {
          entryStream: createMergedStream(cacheStream, updateStream),
          latest,
          newEntries: !!updatedLatest
        }
      } else {
        // Either one works if one or the other failed
        return {
          entryStream: cacheStream || updateStream,
          latest,
          newEntries: !!updatedLatest
        }
      }
    })
  })
}

// Merges `a` and `b` into one stream, dropping duplicates in favor of entries
// in `b`. Both input streams should already be individually sorted, and the
// returned output stream will have semantics resembling the merge step of a
// plain old merge sort.
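//
// Illustration: given sorted inputs
//   a: [{name: 'bar'}, {name: 'foo', from: 'a'}]
//   b: [{name: 'baz'}, {name: 'foo', from: 'b'}]
// the merged stream yields bar, baz, and then `b`'s copy of foo.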
module.exports._createMergedStream = createMergedStream
function createMergedStream (a, b) {
  linkStreams(a, b)
  return sortedUnionStream(b, a, ({name}) => name)
}

// Reads the local index and returns a stream that spits out package data.
module.exports._createCacheEntryStream = createCacheEntryStream
function createCacheEntryStream (cacheFile, opts) {
  log.verbose('all-package-metadata', 'creating entry stream from local cache')
  log.verbose('all-package-metadata', cacheFile)
  return statAsync(cacheFile).then(stat => {
    // TODO - This isn't very helpful if `cacheFile` is empty or just `{}`
    const entryStream = ms.pipeline.obj(
      fs.createReadStream(cacheFile),
      JSONStream.parse('*'),
      // The passthrough is necessary because `JSONStream` returns nonstandard
      // custom streams that don't always behave like regular object streams.
      ms.through.obj()
    )
    return extractUpdated(entryStream, 'cached-entry-stream', opts)
  })
}

// Stream of entry updates from the server. If `latest` is `0`, streams the
// entire metadata object from the registry.
module.exports._createEntryUpdateStream = createEntryUpdateStream
function createEntryUpdateStream (staleness, latest, opts) {
  log.verbose('all-package-metadata', 'creating remote entry stream')
  let partialUpdate = false
  let uri = '/-/all'
  if (latest && (Date.now() - latest < (staleness * 1000))) {
    // Skip the request altogether if our `latest` isn't stale.
    log.verbose('all-package-metadata', 'Local data up to date, skipping update')
    return BB.resolve({})
  } else if (latest === 0) {
    log.warn('', 'Building the local index for the first time, please be patient')
    log.verbose('all-package-metadata', 'No cached data: requesting full metadata db')
  } else {
    log.verbose('all-package-metadata', 'Cached data present with timestamp:', latest, 'requesting partial index update')
    uri += '/since?stale=update_after&startkey=' + latest
    partialUpdate = true
  }
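  // Illustration: with latest = 1234567890123, the partial-update request
  // hits '/-/all/since?stale=update_after&startkey=1234567890123'.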
  return npmFetch(uri, opts).then(res => {
    log.silly('all-package-metadata', 'request stream opened, code:', res.status)
    const entryStream = ms.pipeline.obj(
      res.body,
      JSONStream.parse('*', (pkg, key) => {
        if (key[0] === '_updated' || key[0][0] !== '_') {
          return pkg
        }
      })
    )
    if (partialUpdate) {
      // The `/all/since` endpoint doesn't return `_updated`, so we
      // just use the request's own timestamp.
      return {
        updateStream: entryStream,
        updatedLatest: Date.parse(res.headers.get('date'))
      }
    } else {
      return extractUpdated(entryStream, 'entry-update-stream', opts)
    }
  })
}

// Both the (full) remote requests and the local index have `_updated` as their
// first returned entries. This is the "latest" unix timestamp for the metadata
// in question. This code does a bit of juggling with the data streams
// so that we can pretend that field doesn't exist, but still extract `latest`.
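//
// Illustration: for an input stream emitting
//   1234567890123, {name: 'bar'}, {name: 'foo'}
// this resolves with `updatedLatest: 1234567890123` and an `updateStream`
// that emits only the package objects.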
function extractUpdated (entryStream, label, opts) {
  log.silly('all-package-metadata', 'extracting latest')
  return new BB((resolve, reject) => {
    function nope (msg) {
      return function () {
        log.warn('all-package-metadata', label, msg)
        entryStream.removeAllListeners()
        entryStream.destroy()
        reject(new Error(msg))
      }
    }
    const onErr = nope('Failed to read stream')
    const onEnd = nope('Empty or invalid stream')
    entryStream.on('error', onErr)
    entryStream.on('end', onEnd)
    entryStream.once('data', latest => {
      log.silly('all-package-metadata', 'got first stream entry for', label, latest)
      entryStream.removeListener('error', onErr)
      entryStream.removeListener('end', onEnd)
      if (typeof latest === 'number') {
        // The extra pipeline returns a stream that will implicitly unpause
        // once a `data` listener is attached, since consuming the first
        // `data` event above disturbed the stream's initial paused state.
        resolve({
          updateStream: entryStream.pipe(ms.through.obj()),
          updatedLatest: latest
        })
      } else {
        reject(new Error('expected first entry to be _updated'))
      }
    })
  })
}

// Creates a stream that writes input metadata to the current cache.
// Cache updates are atomic, and the stream closes when *everything* is done.
// The stream is also passthrough, so entries going through it will also
// be output from it.
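//
// Illustration: the returned duplex acts as a tee; entries written in are
// persisted to the cache file (atomically, on completion) and re-emitted
// downstream, which is how `allPackageMetadata` pipes through `writer`.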
module.exports._createCacheWriteStream = createCacheWriteStream
function createCacheWriteStream (cacheFile, latest, opts) {
  return _ensureCacheDirExists(cacheFile, opts).then(({uid, gid}) => {
    log.silly('all-package-metadata', 'creating output stream')
    const outStream = _createCacheOutStream()
    const cacheFileStream = writeStreamAtomic(cacheFile)
    const inputStream = _createCacheInStream(
      cacheFileStream, outStream, latest
    )

    // Glue together the various streams so they fail together.
    // `cacheFileStream` errors are already handled by the `inputStream`
    // pipeline
    let errEmitted = false
    linkStreams(inputStream, outStream, () => { errEmitted = true })

    cacheFileStream.on('close', () => {
      if (!errEmitted) {
        if (typeof uid === 'number' &&
            typeof gid === 'number' &&
            process.getuid &&
            process.getgid &&
            (process.getuid() !== uid || process.getgid() !== gid)) {
          // Use the raw module here: the promisified wrapper does not carry
          // `chownr.sync`.
          chownrRaw.sync(cacheFile, uid, gid)
        }
        outStream.end()
      }
    })

    return ms.duplex.obj(inputStream, outStream)
  })
}

// Returns the {uid, gid} that the cache should have.
function _ensureCacheDirExists (cacheFile, opts) {
  const cacheBase = path.dirname(cacheFile)
  log.silly('all-package-metadata', 'making sure cache dir exists at', cacheBase)
  return correctMkdir(opts.cache).then(st => {
    return mkdir(cacheBase).then(made => {
      return chownr(made || cacheBase, st.uid, st.gid)
    }).then(() => ({ uid: st.uid, gid: st.gid }))
  })
}

function _createCacheOutStream () {
  // NOTE: this looks goofy, but it's necessary in order to get
  //       JSONStream to play nice with the rest of everything.
  return ms.pipeline.obj(
    ms.through(),
    JSONStream.parse('*', (obj, key) => {
      // This stream happens to get `_updated` passed through it, for
      // implementation reasons. We make sure to filter it out here because
      // its presence is an implementation detail consumers shouldn't see.
      if (typeof obj === 'object') {
        return obj
      }
    }),
    ms.through.obj()
  )
}

function _createCacheInStream (writer, outStream, latest) {
  let updatedWritten = false
  const inStream = ms.pipeline.obj(
    ms.through.obj((pkg, enc, cb) => {
      if (!updatedWritten && typeof pkg === 'number') {
        // This is the `_updated` value getting sent through.
        updatedWritten = true
        return cb(null, ['_updated', pkg])
      } else if (typeof pkg !== 'object') {
        // NB: `this` is not the stream inside an arrow function, so report
        // the error through the callback instead of `this.emit('error', ...)`.
        cb(new Error('invalid value written to input stream'))
      } else {
        // The [key, val] format is expected by `jsonstream` for object writing
        cb(null, [pkg.name, pkg])
      }
    }),
    JSONStream.stringifyObject('{', ',', '}'),
    ms.through((chunk, enc, cb) => {
      // This tees off the buffer data to `outStream`, and then continues
      // the pipeline as usual
      outStream.write(chunk, enc, () => cb(null, chunk))
    }),
    // And finally, we write to the cache file.
    writer
  )
  inStream.write(latest)
  return inStream
}
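
// Illustration: writing `1234567890123` followed by `{name: 'foo', ...}` into
// the input stream produces [key, val] pairs that JSONStream serializes as:
//
//   {"_updated":1234567890123,"foo":{"name":"foo",...}}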

// Links errors between `a` and `b`, preventing cycles, and calls `cb` if
// an error happens, once per error.
function linkStreams (a, b, cb) {
  let lastError = null
  a.on('error', function (err) {
    if (err !== lastError) {
      lastError = err
      b.emit('error', err)
      cb && cb(err)
    }
  })
  b.on('error', function (err) {
    if (err !== lastError) {
      lastError = err
      a.emit('error', err)
      cb && cb(err)
    }
  })
}