• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1/**************************************************************************************************
2 * EJDB2 Node.js native API binding.
3 *
4 * MIT License
5 *
6 * Copyright (c) 2012-2021 Softmotions Ltd <info@softmotions.com>
7 *
8 * Permission is hereby granted, free of charge, to any person obtaining a copy
9 * of this software and associated documentation files (the "Software"), to deal
10 * in the Software without restriction, including without limitation the rights
11 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12 *  copies of the Software, and to permit persons to whom the Software is
13 * furnished to do so, subject to the following conditions:
14 *
15 * The above copyright notice and this permission notice shall be included in all
16 * copies or substantial portions of the Software.
17 *
18 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24 * SOFTWARE.
25 *************************************************************************************************/
26
const semver = require('semver');
const {engines} = require('./package');

// Refuse to load on a Node.js runtime outside the range declared in `engines.node` of ./package.
if (!semver.satisfies(process.version, engines.node)) {
  // Fatal diagnostic goes to stderr so it is not mixed into regular stdout output.
  console.error(`Required node version ${engines.node} not satisfied with current version ${process.version}.`);
  process.exit(1);
}

// The callback is exposed through a temporary global only while the native module is being
// required; it is removed again right after. Keep this statement order.
global.__ejdb_add_stream_result__ = addStreamResult; // Passing it to ejdb2_node init
const {EJDB2Impl} = require('./binary')('ejdb2_node');
const {Readable} = require('stream');
delete global.__ejdb_add_stream_result__;
39
/**
 * EJDB2 Error helpers.
 *
 * Errors are classified by the prefix of their `code` property.
 */
class JBE {

  /**
   * Returns `true` if given error [err] is `IWKV_ERROR_NOTFOUND`
   * @param {Error} err
   * @returns {boolean} `false` for `null`/`undefined` or errors without a matching `code`.
   */
  static isNotFound(err) {
    return JBE._codeStartsWith(err, '@ejdb IWRC:75001');
  }

  /**
   * Returns `true` if given error [err] is `JQL_ERROR_QUERY_PARSE`
   * @param {Error} err
   * @return {boolean} `false` for `null`/`undefined` or errors without a matching `code`.
   */
  static isInvalidQuery(err) {
    return JBE._codeStartsWith(err, '@ejdb IWRC:87001');
  }

  /**
   * Checks whether the stringified `code` of [err] begins with [prefix].
   * Referenced through the class name (not `this`) so the public helpers
   * keep working when passed around as unbound callbacks.
   * @param {*} err
   * @param {string} prefix
   * @return {boolean}
   */
  static _codeStartsWith(err, prefix) {
    if (err == null) {
      // Nothing to inspect; previously this case raised a TypeError.
      return false;
    }
    const code = (err.code || '').toString();
    return code.startsWith(prefix);
  }
}
65
/**
 * EJDB document: an identifier together with a lazily parsed JSON body.
 */
class JBDOC {

  /**
   * @param {number} id Document ID
   * @param {string} raw Document JSON as string
   */
  constructor(id, raw) {
    this.id = id;
    this._raw = raw;
    this._json = null;
  }

  /**
   * Document body as a parsed JSON value.
   * The raw text is parsed on first access; after a successful parse
   * the raw text is released and the parsed value is reused.
   */
  get json() {
    if (this._json == null) {
      this._json = JSON.parse(this._raw);
      this._raw = null;
    }
    return this._json;
  }

  /**
   * Text form `JBDOC: <id> <body>`, using the raw text while it is still
   * held and a re-serialized body otherwise.
   */
  toString() {
    const body = this._raw == null ? JSON.stringify(this.json) : this._raw;
    return `JBDOC: ${this.id} ${body}`;
  }
}
97
/**
 * EJDB Query resultset stream.
 *
 * Object-mode `Readable` fed by `addStreamResult`. Records that arrive while
 * the stream is paused are kept in `_pending` and flushed from `_read`.
 */
class JBDOCStream extends Readable {

  get _impl() {
    return this.jql._impl;
  }

  /**
   * `true` while records may be pushed to the consumer:
   * the stream is neither paused nor destroyed.
   */
  get writable() {
    return !this._paused && !this._destroyed;
  }

  /**
   * Query result stream.
   * @param {JQL} jql
   * @param {Object} [opts] Stream options; `limit` and `explainCallback` are forwarded
   *                        to the native `jql_stream_attach` call.
   */
  constructor(jql, opts) {
    super({
      objectMode: true,
      highWaterMark: 64
    });
    this._pending = [];
    this._paused = true;
    this._destroyed = false;
    this._aborted = false;
    this.jql = jql;
    // Tolerate a missing options object instead of failing on `opts.limit`.
    this.opts = opts || {};
    this.promise = this._impl.jql_stream_attach(jql, this, [this.opts.limit, this.opts.explainCallback])
                       .catch((err) => this.destroy(err));
  }

  /**
   * Aborts the running query.
   */
  abort() {
    // unpause if paused
    // needed to release frozen on pause db query thread
    this._doResume();
    // set internal abort state
    this._impl.jql_stream_abort(this);
    this._aborted = true;
  }

  /**
   * Destroy stream
   */
  _destroy(err, callback) {
    if (!this._destroyed) {
      this.abort();
      this._destroyed = true;
      // Native side is released on a later tick, after the current call has unwound.
      setImmediate(() => this._impl.jql_stream_destroy(this));
    }
    callback(err);
  }

  /**
   * Resumes the native producer and flushes records queued while paused.
   */
  _read() {
    if (this._destroyed) {
      return;
    }
    this._doResume();
    // Flush pending records to consumer
    // Maintaining pending list since `napi_threadsafe_function` queue
    // may be not empty at the time of `jql_stream_pause` call.
    //
    // An entry is taken off the queue only after `writable` has been checked,
    // so a pause triggered by back-pressure in the middle of the flush leaves
    // the remaining entries (including a queued EOF marker) for the next `_read`.
    while (this.writable && this._pending.length > 0) {
      const pv = this._pending.shift();
      if (pv.length === 0) { // Got pending EOF
        this._pending.length = 0;
        this.destroy();
        break;
      }
      addStreamResult(this, pv[0], pv[1]);
    }
  }

  _doResume() {
    if (this._paused) {
      this._impl.jql_stream_resume(this);
      this._paused = false;
    }
  }

  _doPause() {
    if (!this._paused) {
      this._impl.jql_stream_pause(this);
      this._paused = true;
    }
  }
}
187
// Global module function for add results to query stream
/**
 * Delivers one query result to [stream].
 *
 * @param {JBDOCStream} stream Target stream
 * @param {number} id Document id; a negative value marks the last record
 * @param {string|number} jsondoc Document JSON text, or a number for a count response
 * @param {*} [log] Query explain log; when given it is handed once to `stream.opts.explainCallback`
 */
function addStreamResult(stream, id, jsondoc, log) {
  if (stream._destroyed) {
    return;
  }
  if (log != null && stream.opts.explainCallback != null) {
    stream.opts.explainCallback(log);
    // Callback is removed after the first delivery.
    delete stream.opts.explainCallback;
  }

  const count = (typeof jsondoc === 'number');
  if (id >= 0 || count) {
    if (!stream._aborted) {
      if (stream._paused) {
        // Maintaining pending list since `napi_threadsafe_function` queue
        // may be not empty at the time of `jql_stream_pause` call
        stream._pending.push([id, jsondoc]);
        return;
      }
      let doc;
      if (count) { // count int response
        doc = new JBDOC(jsondoc, jsondoc);
      } else if (jsondoc != null) {
        doc = new JBDOC(id, jsondoc);
      }
      // `push` returning false signals back-pressure from the consumer.
      if (doc != null && stream.push(doc) === false) {
        stream._doPause();
      }
    }
  }

  if (id < 0) { // last record
    if (!stream._aborted && stream._paused) {
      // Queue an empty entry as EOF marker; it is handled when the queue is flushed.
      stream._pending.push([]);
    } else {
      stream.destroy();
    }
  }
}
227
/**
 * EJDB Query.
 *
 * Holds the query text and target collection and offers several ways to
 * execute it. Placeholder setters return the query itself for chaining.
 */
class JQL {

  get _impl() {
    return this.db._impl;
  }

  /**
   * Get `limit` value used by query.
   */
  get limit() {
    return this._impl.jql_limit(this);
  }

  /**
   * @param {EJDB2} db
   * @param {string} query
   * @param {string} collection
   */
  constructor(db, query, collection) {
    this.db = db;
    this.query = query;
    this.collection = collection;
    this._impl.jql_init(this, query, collection);
  }

  /**
   * Executes a query and returns a
   * readable stream of matched documents.
   *
   * @param {Object} [opts]
   * @return {ReadableStream<JBDOC>}
   */
  stream(opts) {
    return new JBDOCStream(this, opts || {});
  }

  /**
   * Executes this query and waits its completion.
   * The stream is destroyed as soon as the first record arrives.
   *
   * @param {Object} [opts]
   * @return {Promise<void>}
   */
  completionPromise(opts) {
    const stream = this.stream(opts || {});
    return new Promise((resolve, reject) => {
      stream.on('data', () => stream.destroy());
      stream.on('close', () => resolve());
      stream.on('error', (err) => reject(err));
    });
  }

  /**
   * Returns a scalar integer value as result of query execution.
   * Eg.: A count query: `/... | count`
   *
   * Resolves with `undefined` when the stream closes without delivering
   * any record, so the promise always settles.
   *
   * @param {Object} [opts]
   * @return {Promise<number|undefined>}
   */
  scalarInt(opts) {
    const stream = this.stream(opts);
    return new Promise((resolve, reject) => {
      stream.on('data', (doc) => {
        resolve(doc.id);
        stream.destroy();
      });
      // No-op once a value was resolved or an error rejected.
      stream.on('close', () => resolve(undefined));
      stream.on('error', (err) => reject(err));
    });
  }

  /**
   * Returns result set as a list.
   * Use it with caution on large data sets.
   *
   * @param {Object} [opts]
   * @return {Promise<Array<JBDOC>>}
   */
  list(opts) {
    const ret = [];
    const stream = this.stream(opts);
    return new Promise((resolve, reject) => {
      stream.on('data', (doc) => ret.push(doc));
      stream.on('close', () => resolve(ret));
      stream.on('error', (err) => reject(err));
    });
  }

  /**
   * Collects up to [n] documents from result set into array.
   * The caller's [opts] object is left untouched; `limit` is set on a copy.
   *
   * @param {number} n
   * @param {Object} [opts]
   * @return {Promise<Array<JBDOC>>}
   */
  firstN(n, opts) {
    const streamOpts = Object.assign({}, opts, {limit: n});
    const ret = [];
    const stream = this.stream(streamOpts);
    return new Promise((resolve, reject) => {
      stream.on('data', (doc) => {
        ret.push(doc);
        if (ret.length >= n) {
          stream.destroy();
        }
      });
      stream.on('close', () => resolve(ret));
      stream.on('error', (err) => reject(err));
    });
  }

  /**
   * Returns a first record in result set.
   * If record is not found promise with `undefined` will be returned.
   *
   * @param {Object} [opts]
   * @return {Promise<JBDOC|undefined>}
   */
  async first(opts) {
    const fv = await this.firstN(1, opts);
    return fv[0];
  }

  /**
   * Set [json] at the specified [placeholder].
   * Non-string values are serialized with `JSON.stringify`.
   * @param {string|number} placeholder
   * @param {string|object} val
   * @return {JQL}
   */
  setJSON(placeholder, val) {
    this._checkPlaceholder(placeholder);
    if (typeof val !== 'string') {
      val = JSON.stringify(val);
    }
    this._impl.jql_set(this, placeholder, val, 1);
    return this;
  }

  /**
   * Set [regexp] string at the specified [placeholder].
   * For a `RegExp` object only the pattern between the slashes is used; flags are dropped.
   * @param {string|number} placeholder
   * @param {string|RegExp} val
   * @return {JQL}
   * @throws {Error} If [val] is neither a string nor a `RegExp`
   */
  setRegexp(placeholder, val) {
    this._checkPlaceholder(placeholder);
    if (val instanceof RegExp) {
      const sval = val.toString();
      val = sval.substring(1, sval.lastIndexOf('/'));
    } else if (typeof val !== 'string') {
      throw new Error('Regexp argument must be a string or RegExp object');
    }
    this._impl.jql_set(this, placeholder, val, 2);
    return this;
  }

  /**
   * Set number [val] at the specified [placeholder].
   * Type tag `3` is used when `_isInteger(val)` holds, `4` otherwise.
   * @param {string|number} placeholder
   * @param {number} val
   * @return {JQL}
   * @throws {Error} If [val] is not a number
   */
  setNumber(placeholder, val) {
    this._checkPlaceholder(placeholder);
    if (typeof val !== 'number') {
      throw new Error('Value must be a number');
    }
    this._impl.jql_set(this, placeholder, val, this._isInteger(val) ? 3 : 4);
    return this;
  }

  /**
   * Set boolean [val] at the specified [placeholder].
   * Any value is coerced to a boolean.
   * @param {string|number} placeholder
   * @param {boolean} val
   * @return {JQL}
   */
  setBoolean(placeholder, val) {
    this._checkPlaceholder(placeholder);
    this._impl.jql_set(this, placeholder, !!val, 5);
    return this;
  }

  /**
   * Set string [val] at the specified [placeholder].
   * Non-null, non-string values are converted with `toString()`;
   * `null`/`undefined` are passed through as is.
   * @param {string|number} placeholder
   * @param {string} val
   * @return {JQL}
   */
  setString(placeholder, val) {
    this._checkPlaceholder(placeholder);
    if (val != null && typeof val !== 'string') {
      val = val.toString();
    }
    this._impl.jql_set(this, placeholder, val, 6);
    return this;
  }

  /**
   * Set `null` at the specified [placeholder].
   * @param {string|number} placeholder
   * @return {JQL}
   */
  setNull(placeholder) {
    this._checkPlaceholder(placeholder);
    this._impl.jql_set(this, placeholder, null, 7);
    return this;
  }

  /**
   * `true` only for numbers that are unchanged by the 32-bit `| 0` truncation.
   * NOTE(review): integral values outside the signed 32-bit range are reported
   * as non-integers here; confirm this is intended by the native side.
   */
  _isInteger(n) {
    return n === +n && n === (n | 0);
  }

  /**
   * @throws {Error} If [placeholder] is neither a string nor a number
   */
  _checkPlaceholder(placeholder) {
    const t = typeof placeholder;
    if (t !== 'number' && t !== 'string') {
      throw new Error('Invalid placeholder specified, must be either string or number');
    }
  }
}
447
/**
 * EJDB2 Nodejs wrapper.
 *
 * Thin promise-based facade over the native `EJDB2Impl` object kept in `_impl`.
 */
class EJDB2 {

  /**
   * Open database instance.
   *
   * @param {String} path Path to database
   * @param {Object} [opts] Open options. `readonly` and `truncate` are folded into the
   *                        open flags (`0x02` and `0x04`); `wal_enabled` defaults to `true`;
   *                        the remaining `wal_*`, `*_buffer_sz` and `http_*` options are
   *                        forwarded unchanged.
   * @returns {Promise<EJDB2>} EJDB2 instance promise
   */
  static open(path, opts) {
    opts = opts || {};

    // Builds the positional argument list for the native constructor.
    // The order of entries is significant.
    function toArgs() {
      let oflags = 0;
      const ret = [path];
      if (opts['readonly']) {
        oflags |= 0x02;
      }
      if (opts['truncate']) {
        oflags |= 0x04;
      }
      ret.push(oflags);
      ret.push(opts['wal_enabled'] != null ? !!opts['wal_enabled'] : true);
      ret.push(opts['wal_check_crc_on_checkpoint']);
      ret.push(opts['wal_checkpoint_buffer_sz']);
      ret.push(opts['wal_checkpoint_timeout_sec']);
      ret.push(opts['wal_savepoint_timeout_sec']);
      ret.push(opts['wal_wal_buffer_sz']);
      ret.push(opts['document_buffer_sz']);
      ret.push(opts['sort_buffer_sz']);
      ret.push(opts['http_enabled']);
      ret.push(opts['http_access_token']);
      ret.push(opts['http_bind']);
      ret.push(opts['http_max_body_size']);
      ret.push(opts['http_port']);
      ret.push(opts['http_read_anon']);
      return ret;
    }

    const inst = new EJDB2(toArgs());
    return inst._impl.open().then(() => inst);
  }

  /**
   * @param {Array} args Positional arguments for the native implementation, see `open`.
   */
  constructor(args) {
    this._impl = new EJDB2Impl(args);
  }

  /**
   * Closes database instance.
   * @return {Promise<void>}
   */
  close() {
    return this._impl.close();
  }

  /**
   * Saves [json] document under specified [id] or create a document
   * with new generated `id`. Returns promise holding actual document `id`.
   *
   * @param {String} collection
   * @param {Object|string} json
   * @param {number} [id]
   * @returns {Promise<number>}
   */
  put(collection, json, id) {
    if (typeof json !== 'string') {
      json = JSON.stringify(json);
    }
    return this._impl.put(collection, json, id);
  }

  /**
   * Apply rfc6902/rfc7386 JSON [patch] to the document identified by [id].
   * A non-string [json] is serialized with `JSON.stringify`, the same way `put` does.
   *
   * @param {String} collection
   * @param {Object|string} json
   * @param {number} id
   * @return {Promise<void>}
   */
  patch(collection, json, id) {
    if (typeof json !== 'string') {
      json = JSON.stringify(json);
    }
    return this._impl.patch(collection, json, id);
  }

  /**
   * Apply JSON merge patch (rfc7396) to the document identified by `id` or
   * insert new document under specified `id`.
   * A non-string [json] is serialized with `JSON.stringify`, the same way `put` does.
   *
   * @param {String} collection
   * @param {Object|string} json
   * @param {number} id
   * @return {Promise<void>}
   */
  patchOrPut(collection, json, id) {
    if (typeof json !== 'string') {
      json = JSON.stringify(json);
    }
    return this._impl.patch_or_put(collection, json, id);
  }

  /**
   * Get json body of document identified by [id] and stored in [collection].
   *
   * @param {String} collection
   * @param {number} id
   * @return {Promise<object>} JSON object
   */
  get(collection, id) {
    return this._impl.get(collection, id).then((raw) => JSON.parse(raw));
  }

  /**
   * Get json body of document identified by [id] and stored in [collection].
   * If document with given `id` is not found then `null` will be resolved.
   *
   * @param {string} collection
   * @param {number} id
   * @return {Promise<object|null>} JSON object
   */
  getOrNull(collection, id) {
    return this.get(collection, id).catch((err) => {
      if (JBE.isNotFound(err)) {
        return null;
      } else {
        // Any other failure is propagated unchanged.
        return Promise.reject(err);
      }
    });
  }

  /**
   * Get json body with database metadata.
   *
   * @return {Promise<object>}
   */
  info() {
    return this._impl.info().then((raw) => JSON.parse(raw));
  }

  /**
   * Removes document identified by [id] from [collection].
   *
   * @param {String} collection
   * @param {number} id
   * @return {Promise<void>}
   */
  del(collection, id) {
    return this._impl.del(collection, id);
  }

  /**
   * Renames collection.
   *
   * @param {String} oldCollectionName Collection to be renamed
   * @param {String} newCollectionName New name of collection
   * @return {Promise<void>}
   */
  renameCollection(oldCollectionName, newCollectionName) {
    return this._impl.rename_collection(oldCollectionName, newCollectionName);
  }

  /**
   * Ensures json document database index specified by [path] json pointer to string data type.
   *
   * @param {String} collection
   * @param {String} path
   * @param {boolean} [unique=false]
   * @return {Promise<void>}
   */
  ensureStringIndex(collection, path, unique) {
    return this._impl.index(collection, path, 0x04 | (unique ? 0x01 : 0), false);
  }

  /**
   * Removes specified database index.
   *
   * @param {String} collection
   * @param {String} path
   * @param {boolean} [unique=false]
   * @return {Promise<void>}
   */
  removeStringIndex(collection, path, unique) {
    return this._impl.index(collection, path, 0x04 | (unique ? 0x01 : 0), true);
  }

  /**
   * Ensures json document database index specified by [path] json pointer to integer data type.
   *
   * @param {String} collection
   * @param {String} path
   * @param {boolean} [unique=false]
   * @return {Promise<void>}
   */
  ensureIntIndex(collection, path, unique) {
    return this._impl.index(collection, path, 0x08 | (unique ? 0x01 : 0), false);
  }

  /**
   * Removes specified database index.
   *
   * @param {String} collection
   * @param {String} path
   * @param {boolean} [unique=false]
   * @return {Promise<void>}
   */
  removeIntIndex(collection, path, unique) {
    return this._impl.index(collection, path, 0x08 | (unique ? 0x01 : 0), true);
  }

  /**
   * Ensures json document database index specified by [path] json pointer to floating point data type.
   *
   * @param {String} collection
   * @param {String} path
   * @param {boolean} [unique=false]
   * @return {Promise<void>}
   */
  ensureFloatIndex(collection, path, unique) {
    return this._impl.index(collection, path, 0x10 | (unique ? 0x01 : 0), false);
  }

  /**
   * Removes specified database index.
   *
   * @param {String} collection
   * @param {String} path
   * @param {boolean} [unique=false]
   * @return {Promise<void>}
   */
  removeFloatIndex(collection, path, unique) {
    return this._impl.index(collection, path, 0x10 | (unique ? 0x01 : 0), true);
  }

  /**
   * Removes database [collection].
   *
   * @param {String} collection
   * @return {Promise<void>}
   */
  removeCollection(collection) {
    return this._impl.rmcoll(collection);
  }

  /**
   * Create instance of [query] specified for [collection].
   * If [collection] is not specified a [query] spec must contain collection name,
   * eg: `@mycollection/[foo=bar]`
   *
   * @param {String} query
   * @param {String} [collection]
   * @returns {JQL}
   */
  createQuery(query, collection) {
    return new JQL(this, query, collection);
  }

  /**
   * Creates an online database backup image and copies it into the specified [fileName].
   * During online backup phase read/write database operations are allowed and not
   * blocked for significant amount of time. Returns promise with backup
   * finish time as number of milliseconds since epoch.
   *
   * @param {String} fileName Backup image file path.
   * @returns {Promise<number>}
   */
  onlineBackup(fileName) {
    return this._impl.online_backup(fileName);
  }
}
715
716module.exports = {
717  EJDB2,
718  JBE
719};
720
721
722