'use strict'

const BB = require('bluebird')

const cacheFile = require('npm-cache-filename')
const chownr = BB.promisify(require('chownr'))
const correctMkdir = BB.promisify(require('../utils/correct-mkdir.js'))
const figgyPudding = require('figgy-pudding')
const fs = require('graceful-fs')
const JSONStream = require('JSONStream')
const log = require('npmlog')
const mkdir = BB.promisify(require('gentle-fs').mkdir)
const ms = require('mississippi')
const npmFetch = require('libnpm/fetch')
const path = require('path')
const sortedUnionStream = require('sorted-union-stream')
const url = require('url')
const writeStreamAtomic = require('fs-write-stream-atomic')

const statAsync = BB.promisify(fs.stat)

const APMOpts = figgyPudding({
  cache: {},
  registry: {}
})
// Returns a sorted stream of all package metadata. Internally, takes care of
// maintaining its metadata cache and making partial or full remote requests,
// according to staleness, validity, etc.
//
// The local cache must hold certain invariants:
// 1. It must be a proper JSON object
// 2. It must have its keys lexically sorted
// 3. The first entry must be `_updated` with a millisecond timestamp as a val.
// 4. It must include all entries that exist in the metadata endpoint as of
//    the value in `_updated`
module.exports = allPackageMetadata
function allPackageMetadata (opts) {
  const staleness = opts.staleness
  const stream = ms.through.obj()

  opts = APMOpts(opts)
  const cacheBase = cacheFile(path.resolve(path.dirname(opts.cache)))(url.resolve(opts.registry, '/-/all'))
  const cachePath = path.join(cacheBase, '.cache.json')
  createEntryStream(
    cachePath, staleness, opts
  ).then(({entryStream, latest, newEntries}) => {
    log.silly('all-package-metadata', 'entry stream created')
    if (entryStream && newEntries) {
      return createCacheWriteStream(cachePath, latest, opts).then(writer => {
        log.silly('all-package-metadata', 'output stream created')
        ms.pipeline.obj(entryStream, writer, stream)
      })
    } else if (entryStream) {
      ms.pipeline.obj(entryStream, stream)
    } else {
      stream.emit('error', new Error('No search sources available'))
    }
  }).catch(err => stream.emit('error', err))
  return stream
}

// Creates a stream of the latest available package metadata.
// Metadata will come from a combination of the local cache and remote data.
module.exports._createEntryStream = createEntryStream
function createEntryStream (cachePath, staleness, opts) {
  return createCacheEntryStream(
    cachePath, opts
  ).catch(err => {
    log.warn('', 'Failed to read search cache. Rebuilding')
    log.silly('all-package-metadata', 'cache read error: ', err)
    return {}
  }).then(({
    updateStream: cacheStream,
    updatedLatest: cacheLatest
  }) => {
    cacheLatest = cacheLatest || 0
    return createEntryUpdateStream(staleness, cacheLatest, opts).catch(err => {
      log.warn('', 'Search data request failed, search might be stale')
      log.silly('all-package-metadata', 'update request error: ', err)
      return {}
    }).then(({updateStream, updatedLatest}) => {
      updatedLatest = updatedLatest || 0
      const latest = updatedLatest || cacheLatest
      if (!cacheStream && !updateStream) {
        throw new Error('No search sources available')
      }
      if (cacheStream && updateStream) {
        // Deduped, unioned, sorted stream from the combination of both.
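        // A hypothetical illustration of the merge semantics (entry names
        // invented for this example): if the cache stream yields entries
        // named [ant, bee] and the update stream yields [bee, cat], the
        // merged stream emits [ant, bee, cat], with `bee` taken from the
        // update stream, since fresher data wins on name collisions. See
        // `createMergedStream` below.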
        return {
          entryStream: createMergedStream(cacheStream, updateStream),
          latest,
          newEntries: !!updatedLatest
        }
      } else {
        // Either one works if one or the other failed
        return {
          entryStream: cacheStream || updateStream,
          latest,
          newEntries: !!updatedLatest
        }
      }
    })
  })
}

// Merges `a` and `b` into one stream, dropping duplicates in favor of entries
// in `b`. Both input streams should already be individually sorted, and the
// returned output stream will have semantics resembling the merge step of a
// plain old merge sort.
module.exports._createMergedStream = createMergedStream
function createMergedStream (a, b) {
  linkStreams(a, b)
  return sortedUnionStream(b, a, ({name}) => name)
}

// Reads the local index and returns a stream that spits out package data.
module.exports._createCacheEntryStream = createCacheEntryStream
function createCacheEntryStream (cacheFile, opts) {
  log.verbose('all-package-metadata', 'creating entry stream from local cache')
  log.verbose('all-package-metadata', cacheFile)
  return statAsync(cacheFile).then(stat => {
    // TODO - This isn't very helpful if `cacheFile` is empty or just `{}`
    const entryStream = ms.pipeline.obj(
      fs.createReadStream(cacheFile),
      JSONStream.parse('*'),
      // This passthrough is necessary because `jsonstream` returns weird
      // custom streams that don't always behave like regular object streams.
      ms.through.obj()
    )
    return extractUpdated(entryStream, 'cached-entry-stream', opts)
  })
}

// Stream of entry updates from the server. If `latest` is `0`, streams the
// entire metadata object from the registry.
module.exports._createEntryUpdateStream = createEntryUpdateStream
function createEntryUpdateStream (staleness, latest, opts) {
  log.verbose('all-package-metadata', 'creating remote entry stream')
  let partialUpdate = false
  let uri = '/-/all'
  if (latest && (Date.now() - latest < (staleness * 1000))) {
    // Skip the request altogether if our `latest` isn't stale.
    log.verbose('all-package-metadata', 'Local data up to date, skipping update')
    return BB.resolve({})
  } else if (latest === 0) {
    log.warn('', 'Building the local index for the first time, please be patient')
    log.verbose('all-package-metadata', 'No cached data: requesting full metadata db')
  } else {
    log.verbose('all-package-metadata', 'Cached data present with timestamp:', latest, 'requesting partial index update')
    uri += '/since?stale=update_after&startkey=' + latest
    partialUpdate = true
  }
  return npmFetch(uri, opts).then(res => {
    log.silly('all-package-metadata', 'request stream opened, code:', res.status)
    let entryStream = ms.pipeline.obj(
      res.body,
      JSONStream.parse('*', (pkg, key) => {
        if (key[0] === '_updated' || key[0][0] !== '_') {
          return pkg
        }
      })
    )
    if (partialUpdate) {
      // The `/all/since` endpoint doesn't return `_updated`, so we
      // just use the request's own timestamp.
      return {
        updateStream: entryStream,
        updatedLatest: Date.parse(res.headers.get('date'))
      }
    } else {
      return extractUpdated(entryStream, 'entry-update-stream', opts)
    }
  })
}
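
// For reference, with hypothetical package names, a full `/-/all` response
// body is shaped like:
//
//   { "_updated": 1517443200000,
//     "some-pkg": { "name": "some-pkg", ... },
//     "other-pkg": { "name": "other-pkg", ... } }
//
// whereas a partial `/-/all/since` response holds only the entries changed
// since `startkey` and has no `_updated` key, which is why the code above
// falls back to the response's `date` header for the new timestamp.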

// Both the (full) remote requests and the local index have `_updated` as their
// first returned entries. This is the "latest" unix timestamp for the metadata
// in question. This code does a bit of juggling with the data streams
// so that we can pretend that field doesn't exist, but still extract `latest`.
function extractUpdated (entryStream, label, opts) {
  log.silly('all-package-metadata', 'extracting latest')
  return new BB((resolve, reject) => {
    function nope (msg) {
      return function () {
        log.warn('all-package-metadata', label, msg)
        entryStream.removeAllListeners()
        entryStream.destroy()
        reject(new Error(msg))
      }
    }
    const onErr = nope('Failed to read stream')
    const onEnd = nope('Empty or invalid stream')
    entryStream.on('error', onErr)
    entryStream.on('end', onEnd)
    entryStream.once('data', latest => {
      log.silly('all-package-metadata', 'got first stream entry for', label, latest)
      entryStream.removeListener('error', onErr)
      entryStream.removeListener('end', onEnd)
      if (typeof latest === 'number') {
        // The extra pipeline is to return a stream that will implicitly
        // unpause after having an `.on('data')` listener attached, since
        // consuming the first `data` event here already knocked the stream
        // out of its initial paused state.
        resolve({
          updateStream: entryStream.pipe(ms.through.obj()),
          updatedLatest: latest
        })
      } else {
        reject(new Error('expected first entry to be _updated'))
      }
    })
  })
}

// Creates a stream that writes input metadata to the current cache.
// Cache updates are atomic, and the stream closes when *everything* is done.
// The stream is also passthrough, so entries going through it will also
// be output from it.
module.exports._createCacheWriteStream = createCacheWriteStream
function createCacheWriteStream (cacheFile, latest, opts) {
  return _ensureCacheDirExists(cacheFile, opts).then(({uid, gid}) => {
    log.silly('all-package-metadata', 'creating output stream')
    const outStream = _createCacheOutStream()
    const cacheFileStream = writeStreamAtomic(cacheFile)
    const inputStream = _createCacheInStream(
      cacheFileStream, outStream, latest
    )

    // Glue together the various streams so they fail together.
    // `cacheFileStream` errors are already handled by the `inputStream`
    // pipeline
    let errEmitted = false
    linkStreams(inputStream, outStream, () => { errEmitted = true })

    cacheFileStream.on('close', () => {
      if (!errEmitted) {
        if (typeof uid === 'number' &&
            typeof gid === 'number' &&
            process.getuid &&
            process.getgid &&
            (process.getuid() !== uid || process.getgid() !== gid)) {
          chownr.sync(cacheFile, uid, gid)
        }
        outStream.end()
      }
    })

    return ms.duplex.obj(inputStream, outStream)
  })
}

// return the {uid,gid} that the cache should have
function _ensureCacheDirExists (cacheFile, opts) {
  var cacheBase = path.dirname(cacheFile)
  log.silly('all-package-metadata', 'making sure cache dir exists at', cacheBase)
  return correctMkdir(opts.cache).then(st => {
    return mkdir(cacheBase).then(made => {
      return chownr(made || cacheBase, st.uid, st.gid)
    }).then(() => ({ uid: st.uid, gid: st.gid }))
  })
}
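
// For reference, the cache file the streams below produce is a single JSON
// object whose first key is `_updated` (hypothetical contents):
//
//   {"_updated":1517443200000,"aaa-pkg":{...},"zzz-pkg":{...}}
//
// `_createCacheInStream` converts each entry into a `[name, package]` pair
// and `JSONStream.stringifyObject` serializes those pairs into that shape,
// preserving the invariants documented at the top of this file.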

function _createCacheOutStream () {
  // NOTE: this looks goofy, but it's necessary in order to get
  // JSONStream to play nice with the rest of everything.
  return ms.pipeline.obj(
    ms.through(),
    JSONStream.parse('*', (obj, key) => {
      // This stream happens to get `_updated` passed through it, for
      // implementation reasons. We make sure to filter it out here because
      // it's a bare timestamp, not an actual package entry.
      if (typeof obj === 'object') {
        return obj
      }
    }),
    ms.through.obj()
  )
}

function _createCacheInStream (writer, outStream, latest) {
  let updatedWritten = false
  const inStream = ms.pipeline.obj(
    ms.through.obj((pkg, enc, cb) => {
      if (!updatedWritten && typeof pkg === 'number') {
        // This is the `_updated` value getting sent through.
        updatedWritten = true
        return cb(null, ['_updated', pkg])
      } else if (typeof pkg !== 'object') {
        // An arrow function has no stream `this` to emit on, so report the
        // error through the callback instead.
        cb(new Error('invalid value written to input stream'))
      } else {
        // The [key, val] format is expected by `jsonstream` for object writing
        cb(null, [pkg.name, pkg])
      }
    }),
    JSONStream.stringifyObject('{', ',', '}'),
    ms.through((chunk, enc, cb) => {
      // This tees off the buffer data to `outStream`, and then continues
      // the pipeline as usual
      outStream.write(chunk, enc, () => cb(null, chunk))
    }),
    // And finally, we write to the cache file.
    writer
  )
  inStream.write(latest)
  return inStream
}

// Links errors between `a` and `b`, preventing cycles, and calls `cb` if
// an error happens, once per error.
function linkStreams (a, b, cb) {
  var lastError = null
  a.on('error', function (err) {
    if (err !== lastError) {
      lastError = err
      b.emit('error', err)
      cb && cb(err)
    }
  })
  b.on('error', function (err) {
    if (err !== lastError) {
      lastError = err
      a.emit('error', err)
      cb && cb(err)
    }
  })
}
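
// A minimal usage sketch (not part of the module itself; the option values
// are hypothetical): the returned stream is a regular object stream, so a
// consumer might do something like:
//
//   const allPackageMetadata = require('./all-package-metadata')
//   allPackageMetadata({
//     cache: '/path/to/npm/cache',   // used to derive the index location
//     registry: 'https://registry.npmjs.org/',
//     staleness: 600                 // seconds before cached data is stale
//   })
//     .on('data', pkg => console.log(pkg.name))
//     .on('error', err => console.error(err))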