• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1/**************************************************************************************************
2 * EJDB2 Node.js native API binding.
3 *
4 * MIT License
5 *
6 * Copyright (c) 2012-2022 Softmotions Ltd <info@softmotions.com>
7 *
8 * Permission is hereby granted, free of charge, to any person obtaining a copy
9 * of this software and associated documentation files (the "Software"), to deal
10 * in the Software without restriction, including without limitation the rights
11 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12 *  copies of the Software, and to permit persons to whom the Software is
13 * furnished to do so, subject to the following conditions:
14 *
15 * The above copyright notice and this permission notice shall be included in all
16 * copies or substantial portions of the Software.
17 *
18 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24 * SOFTWARE.
25 *************************************************************************************************/
26
27const semver = require('semver');
28const {engines} = require('./package');
29
// Refuse to load on a Node.js runtime outside the range declared in the
// `engines.node` field of ./package: report the mismatch and exit with status 1.
const nodeVersionSupported = semver.satisfies(process.version, engines.node);
if (!nodeVersionSupported) {
  console.log(`Required node version ${engines.node} not satisfied with current version ${process.version}.`);
  process.exit(1);
}
34
35global.__ejdb_add_stream_result__ = addStreamResult; // Passing it to ejdb2_node init
36const {EJDB2Impl} = require('./binary')('ejdb2_node');
37const {Readable} = require('stream');
38delete global.__ejdb_add_stream_result__;
39
40/**
41 * EJDB2 Error helpers.
42 */
class JBE {

  /**
   * Returns `true` if given error [err] is `IWKV_ERROR_NOTFOUND`,
   * i.e. its `code` starts with `@ejdb IWRC:75001`.
   *
   * @param {Error} err Error to inspect; `null`/`undefined` yield `false`.
   * @returns {boolean}
   */
  static isNotFound(err) {
    return JBE._codeOf(err).startsWith('@ejdb IWRC:75001');
  }

  /**
   * Returns `true` if given error [err] is `JQL_ERROR_QUERY_PARSE`,
   * i.e. its `code` starts with `@ejdb IWRC:87001`.
   *
   * @param {Error} err Error to inspect; `null`/`undefined` yield `false`.
   * @return {boolean}
   */
  static isInvalidQuery(err) {
    return JBE._codeOf(err).startsWith('@ejdb IWRC:87001');
  }

  /**
   * Extracts the `code` of [err] as a string.
   * A missing error or a missing/falsy code is reported as an empty string,
   * so callers never throw while classifying an arbitrary rejection value.
   *
   * @param {*} err
   * @return {string}
   * @private
   */
  static _codeOf(err) {
    if (err == null || !err.code) {
      return '';
    }
    return String(err.code);
  }
}
65
66/**
67 * EJDB document.
68 */
class JBDOC {

  /**
   * @param {number} id Document ID
   * @param {string} raw Document JSON as string
   */
  constructor(id, raw) {
    this.id = id;
    this._raw = raw;
    this._json = null;
  }

  /**
   * Document body as a parsed JSON value.
   * The raw text is parsed on first access; the parsed value is then cached
   * and the raw text is released.
   */
  get json() {
    if (this._json == null) {
      this._json = JSON.parse(this._raw);
      this._raw = null;
    }
    return this._json;
  }

  /**
   * Human readable form: `JBDOC: <id> <json text>`.
   * Uses the raw text while it is still held, otherwise re-serializes the parsed value.
   */
  toString() {
    const body = this._raw == null ? JSON.stringify(this.json) : this._raw;
    return `JBDOC: ${this.id} ${body}`;
  }
}
97
98/**
99 * EJDB Query resultset stream.
100 */
class JBDOCStream extends Readable {

  /**
   * Native implementation object, reached through the owning query.
   */
  get _impl() {
    return this.jql._impl;
  }

  /**
   * `true` while records may be delivered to the consumer:
   * the stream is neither paused nor destroyed.
   */
  get writable() {
    return !this._paused && !this._destroyed;
  }

  /**
   * Query result stream.
   * Attaches itself to the native query on construction; a rejected attach
   * promise destroys the stream with the rejection error.
   *
   * @param {JQL} jql
   * @param {Object} opts Stream options; `limit` and `explainCallback` are read here.
   */
  constructor(jql, opts) {
    super({
      objectMode: true,
      highWaterMark: 64
    });
    // Buffered `[id, json]` pairs drained by `_read()`; an empty array marks EOF.
    this._pending = [];
    this._paused = true;
    this._destroyed = false;
    this._aborted = false;
    this.jql = jql;
    this.opts = opts;
    this.promise = this._impl.jql_stream_attach(jql, this, [opts.limit, opts.explainCallback])
                       .catch((err) => this.destroy(err));
  }

  /**
   * Aborts query execution.
   */
  abort() {
    // unpause if paused
    // needed to release frozen on pause db query thread
    this._doResume();
    // set internal abort state
    this._impl.jql_stream_abort(this);
    this._aborted = true;
  }

  /**
   * Destroy stream: aborts the query once and schedules release of the
   * native stream state on the next `setImmediate` tick.
   */
  _destroy(err, callback) {
    if (!this._destroyed) {
      this.abort();
      this._destroyed = true;
      setImmediate(() => this._impl.jql_stream_destroy(this));
    }
    callback(err);
  }

  /**
   * Readable `_read` hook: resumes the native side and flushes buffered records.
   */
  _read() {
    if (this._destroyed) {
      return;
    }
    this._doResume();
    // Flush pending records to consumer
    // Maintaining pending list since `napi_threadsafe_function` queue
    // may be not empty at the time of `jql_stream_pause` call
    //
    // A record is taken off the list only after `writable` has been re-checked:
    // delivering a record may pause the stream again, and the remaining records
    // must stay buffered for the next `_read()` instead of being dropped.
    while (this.writable && this._pending.length > 0) {
      const pv = this._pending.shift();
      if (pv.length === 0) { // Got pending EOF
        this._pending.length = 0;
        this.destroy();
        break;
      }
      addStreamResult(this, pv[0], pv[1]);
    }
  }

  /**
   * Resumes the native stream if it is currently paused.
   */
  _doResume() {
    if (this._paused) {
      this._impl.jql_stream_resume(this);
      this._paused = false;
    }
  }

  /**
   * Pauses the native stream if it is currently running.
   */
  _doPause() {
    if (!this._paused) {
      this._impl.jql_stream_pause(this);
      this._paused = true;
    }
  }
}
187
/**
 * Module-level function that adds one query result to a query stream.
 * It is also published temporarily as `global.__ejdb_add_stream_result__`
 * while the native module is loaded.
 *
 * @param {JBDOCStream} stream Target stream.
 * @param {number} id Document id; a negative value marks the last record.
 * @param {string|number} [jsondoc] Document JSON text, or a number for a count response.
 * @param {string} [log] Query explain log, handed once to `stream.opts.explainCallback`.
 */
function addStreamResult(stream, id, jsondoc, log) {
  if (stream._destroyed) {
    return;
  }
  if (log != null && stream.opts.explainCallback != null) {
    stream.opts.explainCallback(log);
    // Report the log only once per stream
    delete stream.opts.explainCallback;
  }

  const count = (typeof jsondoc === 'number');
  if ((id >= 0 || count) && !stream._aborted) {
    if (stream._paused) {
      // Maintaining pending list since `napi_threadsafe_function` queue
      // may be not empty at the time of `jql_stream_pause` call.
      // The list is the stream's `_pending` array, the one `_read()` drains.
      stream._pending.push([id, jsondoc]);
      return;
    }
    let doc;
    if (count) { // count int response
      doc = new JBDOC(jsondoc, jsondoc);
    } else if (jsondoc != null) {
      doc = new JBDOC(id, jsondoc);
    }
    // `push()` returning false means the consumer wants no more data for now
    if (doc != null && stream.push(doc) === false) {
      stream._doPause();
    }
  }

  if (id < 0) { // last record
    if (!stream._aborted && stream._paused) {
      // Empty entry is the EOF marker recognized by `_read()`
      stream._pending.push([]);
    } else {
      stream.push(null);
      stream.destroy();
    }
  }
}
228
229/**
230 * EJDB Query.
231 */
class JQL {

  /**
   * Native implementation object of the owning database.
   */
  get _impl() {
    return this.db._impl;
  }

  /**
   * Get `limit` value used by query.
   */
  get limit() {
    return this._impl.jql_limit(this);
  }

  /**
   * @param {EJDB2} db
   * @param {string} query
   * @param {string} collection
   */
  constructor(db, query, collection) {
    this.db = db;
    this.query = query;
    this.collection = collection;
    this._impl.jql_init(this, query, collection);
  }

  /**
   * Executes a query and returns a
   * readable stream of matched documents.
   *
   * @param {Object} [opts]
   * @return {ReadableStream<JBDOC>}
   */
  stream(opts) {
    return new JBDOCStream(this, opts || {});
  }

  /**
   * Executes this query and waits its completion.
   * The stream is destroyed as soon as the first document arrives;
   * the promise resolves when the stream is closed.
   *
   * @param {Object} [opts]
   * @return {Promise<void>}
   */
  completionPromise(opts) {
    const stream = this.stream(opts || {});
    return new Promise((resolve, reject) => {
      stream.on('data', () => stream.destroy());
      stream.on('close', () => resolve());
      stream.on('error', (err) => reject(err));
    });
  }

  /**
   * Returns a scalar integer value as result of query execution.
   * Eg.: A count query: `/... | count`
   *
   * NOTE: the promise settles only on the first document or on an error;
   * it stays pending if the stream closes without emitting a document.
   *
   * @param {Object} [opts]
   * @return {Promise<number>}
   */
  scalarInt(opts) {
    const stream = this.stream(opts);
    return new Promise((resolve, reject) => {
      stream.on('data', (doc) => {
        resolve(doc.id);
        stream.destroy();
      });
      stream.on('error', (err) => reject(err));
    });
  }

  /**
   * Returns result set as a list.
   * Use it with caution on large data sets.
   *
   * @param {Object} [opts]
   * @return {Promise<Array<JBDOC>>}
   */
  list(opts) {
    const ret = [];
    const stream = this.stream(opts);
    return new Promise((resolve, reject) => {
      stream.on('data', (doc) => ret.push(doc));
      stream.on('close', () => resolve(ret));
      stream.on('error', (err) => reject(err));
    });
  }

  /**
   * Collects up to [n] documents from result set into array.
   * The caller's [opts] object is left untouched: the `limit` is applied to a copy.
   *
   * @param {number} n
   * @param {Object} [opts]
   * @return {Promise<Array<JBDOC>>}
   */
  firstN(n, opts) {
    // Copy instead of writing `limit` into an object owned by the caller
    const streamOpts = Object.assign({}, opts, {limit: n});
    const ret = [];
    const stream = this.stream(streamOpts);
    return new Promise((resolve, reject) => {
      stream.on('data', (doc) => {
        ret.push(doc);
        if (ret.length >= n) {
          stream.destroy();
        }
      });
      stream.on('close', () => resolve(ret));
      stream.on('error', (err) => reject(err));
    });
  }

  /**
   * Returns a first record in result set.
   * If record is not found promise with `undefined` will be returned.
   *
   * @param {Object} [opts]
   * @return {Promise<JBDOC|undefined>}
   */
  async first(opts) {
    const fv = await this.firstN(1, opts);
    return fv[0];
  }

  /**
   * Set [json] at the specified [placeholder].
   * Non-string values are serialized with `JSON.stringify`.
   *
   * @param {string|number} placeholder
   * @param {string|object} val
   * @return {JQL}
   */
  setJSON(placeholder, val) {
    this._checkPlaceholder(placeholder);
    if (typeof val !== 'string') {
      val = JSON.stringify(val);
    }
    this._impl.jql_set(this, placeholder, val, 1);
    return this;
  }

  /**
   * Set [regexp] string at the specified [placeholder].
   * For a `RegExp` object only the pattern text is used, flags are not passed.
   *
   * @param {string|number} placeholder
   * @param {string|RegExp} val
   * @return {JQL}
   * @throws {Error} If [val] is neither a string nor a RegExp.
   */
  setRegexp(placeholder, val) {
    this._checkPlaceholder(placeholder);
    if (val instanceof RegExp) {
      // Pattern text between the enclosing slashes
      val = val.source;
    } else if (typeof val !== 'string') {
      throw new Error('Regexp argument must be a string or RegExp object');
    }
    this._impl.jql_set(this, placeholder, val, 2);
    return this;
  }

  /**
   * Set number [val] at the specified [placeholder].
   * Passed with type `3` when `_isInteger(val)` holds, otherwise with type `4`.
   *
   * @param {string|number} placeholder
   * @param {number} val
   * @return {JQL}
   * @throws {Error} If [val] is not a number.
   */
  setNumber(placeholder, val) {
    this._checkPlaceholder(placeholder);
    if (typeof val !== 'number') {
      throw new Error('Value must be a number');
    }
    this._impl.jql_set(this, placeholder, val, this._isInteger(val) ? 3 : 4);
    return this;
  }

  /**
   * Set boolean [val] at the specified [placeholder].
   * The value is coerced to a boolean.
   *
   * @param {string|number} placeholder
   * @param {boolean} val
   * @return {JQL}
   */
  setBoolean(placeholder, val) {
    this._checkPlaceholder(placeholder);
    this._impl.jql_set(this, placeholder, !!val, 5);
    return this;
  }

  /**
   * Set string [val] at the specified [placeholder].
   * Non-null values of other types are converted with `toString()`.
   *
   * @param {string|number} placeholder
   * @param {string} val
   * @return {JQL}
   */
  setString(placeholder, val) {
    this._checkPlaceholder(placeholder);
    if (val != null && typeof val !== 'string') {
      val = val.toString();
    }
    this._impl.jql_set(this, placeholder, val, 6);
    return this;
  }

  /**
   * Set `null` at the specified [placeholder].
   * @param {string|number} placeholder
   * @return {JQL}
   */
  setNull(placeholder) {
    this._checkPlaceholder(placeholder);
    this._impl.jql_set(this, placeholder, null, 7);
    return this;
  }

  /**
   * Tells whether [n] is a number with an integer value.
   * The `n | 0` round trip limits this to the signed 32-bit range:
   * integers outside of it are reported as non-integers.
   *
   * @param {*} n
   * @return {boolean}
   */
  _isInteger(n) {
    return n === +n && n === (n | 0);
  }

  /**
   * Validates a placeholder identifier.
   *
   * @param {*} placeholder
   * @throws {Error} If [placeholder] is neither a string nor a number.
   */
  _checkPlaceholder(placeholder) {
    const t = typeof placeholder;
    if (t !== 'number' && t !== 'string') {
      throw new Error('Invalid placeholder specified, must be either string or number');
    }
  }
}
448
449/**
450 * EJDB2 Nodejs wrapper.
451 */
class EJDB2 {

  /**
   * Open database instance.
   *
   * @param {String} path Path to database
   * @param {Object} [opts] Open options; the keys read are listed in the body.
   *                        `wal_enabled` defaults to `true` when not specified.
   * @returns {Promise<EJDB2>} EJDB2 instance promise
   */
  static open(path, opts) {
    opts = opts || {};

    // Open mode bit flags
    let oflags = 0;
    if (opts['readonly']) {
      oflags |= 0x02;
    }
    if (opts['truncate']) {
      oflags |= 0x04;
    }

    // Positional argument list for the native implementation, order matters
    const args = [
      path,
      oflags,
      opts['wal_enabled'] != null ? !!opts['wal_enabled'] : true,
      opts['wal_check_crc_on_checkpoint'],
      opts['wal_checkpoint_buffer_sz'],
      opts['wal_checkpoint_timeout_sec'],
      opts['wal_savepoint_timeout_sec'],
      opts['wal_wal_buffer_sz'],
      opts['document_buffer_sz'],
      opts['sort_buffer_sz'],
      opts['http_enabled'],
      opts['http_access_token'],
      opts['http_bind'],
      opts['http_max_body_size'],
      opts['http_port'],
      opts['http_read_anon']
    ];

    const inst = new EJDB2(args);
    return inst._impl.open().then(() => inst);
  }

  /**
   * @param {Array} args Positional arguments for the native implementation, see `open()`.
   */
  constructor(args) {
    this._impl = new EJDB2Impl(args);
  }

  /**
   * Serializes a document argument: strings are passed through unchanged,
   * any other value is converted with `JSON.stringify`.
   *
   * @param {Object|string} json
   * @return {string}
   * @private
   */
  static _toJsonString(json) {
    return typeof json === 'string' ? json : JSON.stringify(json);
  }

  /**
   * Closes database instance.
   * @return {Promise<void>}
   */
  close() {
    return this._impl.close();
  }

  /**
   * Saves [json] document under specified [id] or create a document
   * with new generated `id`. Returns promise holding actual document `id`.
   *
   * @param {String} collection
   * @param {Object|string} json
   * @param {number} [id]
   * @returns {Promise<number>}
   */
  put(collection, json, id) {
    return this._impl.put(collection, EJDB2._toJsonString(json), id);
  }

  /**
   * Apply rfc6902/rfc7386 JSON [patch] to the document identified by [id].
   * An object patch is serialized the same way `put()` serializes documents.
   *
   * @param {String} collection
   * @param {Object|string} json
   * @param {number} id
   * @return {Promise<void>}
   */
  patch(collection, json, id) {
    return this._impl.patch(collection, EJDB2._toJsonString(json), id);
  }

  /**
   * Apply JSON merge patch (rfc7396) to the document identified by `id` or
   * insert new document under specified `id`.
   * An object patch is serialized the same way `put()` serializes documents.
   *
   * @param {String} collection
   * @param {Object|string} json
   * @param {number} id
   * @return {Promise<void>}
   */
  patchOrPut(collection, json, id) {
    return this._impl.patch_or_put(collection, EJDB2._toJsonString(json), id);
  }

  /**
   * Get json body of document identified by [id] and stored in [collection].
   *
   * @param {String} collection
   * @param {number} id
   * @return {Promise<object>} JSON object
   */
  get(collection, id) {
    return this._impl.get(collection, id).then((raw) => JSON.parse(raw));
  }

  /**
   * Get json body of document identified by [id] and stored in [collection].
   * If document with given `id` is not found then `null` will be resolved.
   * Any other error rejects the returned promise.
   *
   * @param {string} collection
   * @param {number} id
   * @return {Promise<object|null>} JSON object
   */
  getOrNull(collection, id) {
    return this.get(collection, id).catch((err) => {
      if (JBE.isNotFound(err)) {
        return null;
      } else {
        return Promise.reject(err);
      }
    });
  }

  /**
   * Get json body with database metadata.
   *
   * @return {Promise<object>}
   */
  info() {
    return this._impl.info().then((raw) => JSON.parse(raw));
  }

  /**
   * Removes document identified by [id] from [collection].
   *
   * @param {String} collection
   * @param {number} id
   * @return {Promise<void>}
   */
  del(collection, id) {
    return this._impl.del(collection, id);
  }

  /**
   * Renames collection.
   *
   * @param {String} oldCollectionName Collection to be renamed
   * @param {String} newCollectionName New name of collection
   * @return {Promise<void>}
   */
  renameCollection(oldCollectionName, newCollectionName) {
    return this._impl.rename_collection(oldCollectionName, newCollectionName);
  }

  /**
   * Ensures json document database index specified by [path] json pointer to string data type.
   *
   * @param {String} collection
   * @param {String} path
   * @param {boolean} [unique=false]
   * @return {Promise<void>}
   */
  ensureStringIndex(collection, path, unique) {
    // 0x04: string index mode, 0x01: unique flag; last argument `false` = ensure
    return this._impl.index(collection, path, 0x04 | (unique ? 0x01 : 0), false);
  }

  /**
   * Removes specified database index.
   *
   * @param {String} collection
   * @param {String} path
   * @param {boolean} [unique=false]
   * @return {Promise<void>}
   */
  removeStringIndex(collection, path, unique) {
    // Same mode as `ensureStringIndex`; last argument `true` = remove
    return this._impl.index(collection, path, 0x04 | (unique ? 0x01 : 0), true);
  }

  /**
   * Ensures json document database index specified by [path] json pointer to integer data type.
   *
   * @param {String} collection
   * @param {String} path
   * @param {boolean} [unique=false]
   * @return {Promise<void>}
   */
  ensureIntIndex(collection, path, unique) {
    // 0x08: integer index mode, 0x01: unique flag; last argument `false` = ensure
    return this._impl.index(collection, path, 0x08 | (unique ? 0x01 : 0), false);
  }

  /**
   * Removes specified database index.
   *
   * @param {String} collection
   * @param {String} path
   * @param {boolean} [unique=false]
   * @return {Promise<void>}
   */
  removeIntIndex(collection, path, unique) {
    // Same mode as `ensureIntIndex`; last argument `true` = remove
    return this._impl.index(collection, path, 0x08 | (unique ? 0x01 : 0), true);
  }

  /**
   * Ensures json document database index specified by [path] json pointer to floating point data type.
   *
   * @param {String} collection
   * @param {String} path
   * @param {boolean} [unique=false]
   * @return {Promise<void>}
   */
  ensureFloatIndex(collection, path, unique) {
    // 0x10: floating point index mode, 0x01: unique flag; last argument `false` = ensure
    return this._impl.index(collection, path, 0x10 | (unique ? 0x01 : 0), false);
  }

  /**
   * Removes specified database index.
   *
   * @param {String} collection
   * @param {String} path
   * @param {boolean} [unique=false]
   * @return {Promise<void>}
   */
  removeFloatIndex(collection, path, unique) {
    // Same mode as `ensureFloatIndex`; last argument `true` = remove
    return this._impl.index(collection, path, 0x10 | (unique ? 0x01 : 0), true);
  }

  /**
   * Removes database [collection].
   *
   * @param {String} collection
   * @return {Promise<void>}
   */
  removeCollection(collection) {
    return this._impl.rmcoll(collection);
  }

  /**
   * Create instance of [query] specified for [collection].
   * If [collection] is not specified a [query] spec must contain collection name,
   * eg: `@mycollection/[foo=bar]`
   *
   * @param {String} query
   * @param {String} [collection]
   * @returns {JQL}
   */
  createQuery(query, collection) {
    return new JQL(this, query, collection);
  }

  /**
   * Creates an online database backup image and copies it into the specified [fileName].
   * During online backup phase read/write database operations are allowed and not
   * blocked for significant amount of time. Returns promise with backup
   * finish time as number of milliseconds since epoch.
   *
   * @param {String} fileName Backup image file path.
   * @returns {Promise<number>}
   */
  onlineBackup(fileName) {
    return this._impl.online_backup(fileName);
  }
}
716
717module.exports = {
718  EJDB2,
719  JBE
720};
721
722
723