1/************************************************************************************************** 2 * EJDB2 Node.js native API binding. 3 * 4 * MIT License 5 * 6 * Copyright (c) 2012-2021 Softmotions Ltd <info@softmotions.com> 7 * 8 * Permission is hereby granted, free of charge, to any person obtaining a copy 9 * of this software and associated documentation files (the "Software"), to deal 10 * in the Software without restriction, including without limitation the rights 11 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 12 * copies of the Software, and to permit persons to whom the Software is 13 * furnished to do so, subject to the following conditions: 14 * 15 * The above copyright notice and this permission notice shall be included in all 16 * copies or substantial portions of the Software. 17 * 18 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 19 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 20 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 21 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 22 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 23 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 24 * SOFTWARE. 25 *************************************************************************************************/ 26 27const semver = require('semver'); 28const {engines} = require('./package'); 29 30if (!semver.satisfies(process.version, engines.node)) { 31 console.log(`Required node version ${engines.node} not satisfied with current version ${process.version}.`); 32 process.exit(1); 33} 34 35global.__ejdb_add_stream_result__ = addStreamResult; // Passing it to ejdb2_node init 36const {EJDB2Impl} = require('./binary')('ejdb2_node'); 37const {Readable} = require('stream'); 38delete global.__ejdb_add_stream_result__; 39 40/** 41 * EJDB2 Error helpers. 42 */ 43class JBE { 44 45 /** 46 * Returns `true` if given error [err] is `IWKV_ERROR_NOTFOUND` 47 * @param {Error} err 48 * @returns {boolean} 49 */ 50 static isNotFound(err) { 51 const code = (err.code || '').toString(); 52 return code.indexOf('@ejdb IWRC:75001') == 0; 53 } 54 55 /** 56 * Returns `true` if given errror [err] is `JQL_ERROR_QUERY_PARSE` 57 * @param {Error} err 58 * @return {boolean} 59 */ 60 static isInvalidQuery(err) { 61 const code = (err.code || '').toString(); 62 return code.indexOf('@ejdb IWRC:87001') == 0; 63 } 64} 65 66/** 67 * EJDB document. 68 */ 69class JBDOC { 70 71 /** 72 * Get document JSON object 73 */ 74 get json() { 75 if (this._json != null) { 76 return this._json; 77 } 78 this._json = JSON.parse(this._raw); 79 this._raw = null; 80 return this._json; 81 } 82 83 /** 84 * @param {number} id Document ID 85 * @param {string} raw Document JSON as string 86 */ 87 constructor(id, raw) { 88 this.id = id; 89 this._raw = raw; 90 this._json = null; 91 } 92 93 toString() { 94 return `JBDOC: ${this.id} ${this._raw != null ? this._raw : JSON.stringify(this.json)}`; 95 } 96} 97 98/** 99 * EJDB Query resultset stream. 100 */ 101class JBDOCStream extends Readable { 102 103 get _impl() { 104 return this.jql._impl; 105 } 106 107 get writable() { 108 return !this._paused && !this._destroyed; 109 } 110 111 /** 112 * Query result stream. 113 * @param {JQL} jql 114 */ 115 constructor(jql, opts) { 116 super({ 117 objectMode: true, 118 highWaterMark: 64 119 }); 120 this._pending = []; 121 this._paused = true; 122 this._destroyed = false; 123 this._aborted = false; 124 this.jql = jql; 125 this.opts = opts; 126 this.promise = this._impl.jql_stream_attach(jql, this, [opts.limit, opts.explainCallback]) 127 .catch((err) => this.destroy(err)); 128 } 129 130 abort() { 131 // unpause if paused 132 // needed to release frozen on pause db query thread 133 this._doResume(); 134 // set internal abort state 135 this._impl.jql_stream_abort(this); 136 this._aborted = true; 137 } 138 139 /** 140 * Destroy stream 141 */ 142 _destroy(err, callback) { 143 if (!this._destroyed) { 144 this.abort(); 145 this._destroyed = true; 146 setImmediate(() => this._impl.jql_stream_destroy(this)); 147 } 148 callback(err); 149 } 150 151 _read() { 152 if (this._destroyed) { 153 return; 154 } 155 this._doResume(); 156 if (this._pending.length > 0) { 157 // Flush pending records to consumer 158 // Maintaining pending list since `napi_threadsafe_function` queue 159 // may be not empty at the time of `jql_stream_pause` call 160 let pv = this._pending.shift(); 161 while (this.writable && pv) { 162 if (pv.length == 0) { // Got pending EOF 163 this._pending.length = 0; 164 this.destroy(); 165 break; 166 } 167 addStreamResult(this, pv[0], pv[1]); 168 pv = this._pending.shift(); 169 } 170 } 171 } 172 173 _doResume() { 174 if (this._paused) { 175 this._impl.jql_stream_resume(this); 176 this._paused = false; 177 } 178 } 179 180 _doPause() { 181 if (!this._paused) { 182 this._impl.jql_stream_pause(this); 183 this._paused = true; 184 } 185 } 186} 187 188// Global module function for add results to query stream 189function addStreamResult(stream, id, jsondoc, log) { 190 if (stream._destroyed) { 191 return; 192 } 193 if (log != null && stream.opts.explainCallback != null) { 194 stream.opts.explainCallback(log); 195 delete stream.opts.explainCallback; 196 } 197 198 const count = (typeof jsondoc === 'number'); 199 if (id >= 0 || count) { 200 if (!stream._aborted) { 201 if (stream._paused) { 202 // Maintaining pending list since `napi_threadsafe_function` queue 203 // may be not empty at the time of `jql_stream_pause` call 204 stream.pending.push([id, jsondoc]); 205 return; 206 } 207 let doc; 208 if (count) { // count int response 209 doc = new JBDOC(jsondoc, jsondoc); 210 } else if (jsondoc != null) { 211 doc = new JBDOC(id, jsondoc); 212 } 213 if (doc != null && stream.push(doc) == false) { 214 stream._doPause(); 215 } 216 } 217 } 218 219 if (id < 0) { // last record 220 if (!stream._aborted && stream._paused) { 221 stream.pending.push([]); 222 } else { 223 stream.destroy(); 224 } 225 } 226} 227 228/** 229 * EJDB Query. 230 */ 231class JQL { 232 233 get _impl() { 234 return this.db._impl; 235 } 236 237 /** 238 * Get `limit` value used by query. 239 */ 240 get limit() { 241 return this._impl.jql_limit(this); 242 } 243 244 /** 245 * @param {EJDB2} db 246 * @param {string} query 247 * @param {string} collection 248 */ 249 constructor(db, query, collection) { 250 this.db = db; 251 this.query = query; 252 this.collection = collection; 253 this._impl.jql_init(this, query, collection); 254 } 255 256 /** 257 * Executes a query and returns a 258 * readable stream of matched documents. 259 * 260 * @param {Object} [opts] 261 * @return {ReadableStream<JBDOC>} 262 */ 263 stream(opts) { 264 return new JBDOCStream(this, opts || {}); 265 } 266 267 /** 268 * Executes this query and waits its completion. 269 * 270 * @param {Promise} opts 271 */ 272 completionPromise(opts) { 273 const stream = this.stream(opts || {}); 274 return new Promise((resolve, reject) => { 275 stream.on('data', () => stream.destroy()); 276 stream.on('close', () => resolve()); 277 stream.on('error', (err) => reject(err)); 278 }); 279 } 280 281 /** 282 * Returns a scalar integer value as result of query execution. 283 * Eg.: A count query: `/... | count` 284 * @param {Object} [opts] 285 * @return {Promise<number>} 286 */ 287 scalarInt(opts) { 288 const stream = this.stream(opts); 289 return new Promise((resolve, reject) => { 290 stream.on('data', (doc) => { 291 resolve(doc.id); 292 stream.destroy(); 293 }); 294 stream.on('error', (err) => reject(err)); 295 }); 296 } 297 298 /** 299 * Returns result set as a list. 300 * Use it with caution on large data sets. 301 * 302 * @param {Object} [opts] 303 * @return {Promise<Array<JBDOC>>} 304 */ 305 list(opts) { 306 const ret = []; 307 const stream = this.stream(opts); 308 return new Promise((resolve, reject) => { 309 stream.on('data', (doc) => ret.push(doc)); 310 stream.on('close', () => resolve(ret)); 311 stream.on('error', (err) => reject(err)); 312 }); 313 } 314 315 /** 316 * Collects up to [n] documents from result set into array. 317 * @param {number} n 318 * @param {Object} [opts] 319 * @return {Promise<Array<JBDOC>>} 320 */ 321 firstN(n, opts) { 322 opts = opts || {}; 323 opts.limit = n; 324 const ret = []; 325 const stream = this.stream(opts); 326 return new Promise((resolve, reject) => { 327 stream.on('data', (doc) => { 328 ret.push(doc); 329 if (ret.length >= n) { 330 stream.destroy(); 331 } 332 }); 333 stream.on('close', () => resolve(ret)); 334 stream.on('error', (err) => reject(err)); 335 }); 336 } 337 338 /** 339 * Returns a first record in result set. 340 * If record is not found promise with `undefined` will be returned. 341 * 342 * @param {Object} [opts] 343 * @return {Promise<JBDOC|undefined>} 344 */ 345 async first(opts) { 346 const fv = await this.firstN(1, opts); 347 return fv[0]; 348 } 349 350 /** 351 * Set [json] at the specified [placeholder]. 352 * @param {string|number} placeholder 353 * @param {string|object} val 354 * @return {JQL} 355 */ 356 setJSON(placeholder, val) { 357 this._checkPlaceholder(placeholder); 358 if (typeof val !== 'string') { 359 val = JSON.stringify(val); 360 } 361 this._impl.jql_set(this, placeholder, val, 1); 362 return this; 363 } 364 365 /** 366 * Set [regexp] string at the specified [placeholder]. 367 * @param {string|number} placeholder 368 * @param {string|RegExp} val 369 * @return {JQL} 370 */ 371 setRegexp(placeholder, val) { 372 this._checkPlaceholder(placeholder); 373 if (val instanceof RegExp) { 374 const sval = val.toString(); 375 val = sval.substring(1, sval.lastIndexOf('/')); 376 } else if (typeof val !== 'string') { 377 throw new Error('Regexp argument must be a string or RegExp object'); 378 } 379 this._impl.jql_set(this, placeholder, val, 2); 380 return this; 381 } 382 383 /** 384 * Set number [val] at the specified [placeholder]. 385 * @param {string|number} placeholder 386 * @param {number} val 387 * @return {JQL} 388 */ 389 setNumber(placeholder, val) { 390 this._checkPlaceholder(placeholder); 391 if (typeof val !== 'number') { 392 throw new Error('Value must be a number'); 393 } 394 this._impl.jql_set(this, placeholder, val, this._isInteger(val) ? 3 : 4); 395 return this; 396 } 397 398 /** 399 * Set boolean [val] at the specified [placeholder]. 400 * @param {string|number} placeholder 401 * @param {boolean} val 402 * @return {JQL} 403 */ 404 setBoolean(placeholder, val) { 405 this._checkPlaceholder(placeholder); 406 this._impl.jql_set(this, placeholder, !!val, 5); 407 return this; 408 } 409 410 /** 411 * Set string [val] at the specified [placeholder]. 412 * @param {string|number} placeholder 413 * @param {string} val 414 * @return {JQL} 415 */ 416 setString(placeholder, val) { 417 this._checkPlaceholder(placeholder); 418 if (val != null && typeof val !== 'string') { 419 val = val.toString(); 420 } 421 this._impl.jql_set(this, placeholder, val, 6); 422 return this; 423 } 424 425 /** 426 * Set `null` at the specified [placeholder]. 427 * @param {string|number} placeholder 428 * @return {JQL} 429 */ 430 setNull(placeholder) { 431 this._checkPlaceholder(placeholder); 432 this._impl.jql_set(this, placeholder, null, 7); 433 return this; 434 } 435 436 _isInteger(n) { 437 return n === +n && n === (n | 0); 438 } 439 440 _checkPlaceholder(placeholder) { 441 const t = typeof placeholder; 442 if (t !== 'number' && t !== 'string') { 443 throw new Error('Invalid placeholder specified, must be either string or number'); 444 } 445 } 446} 447 448/** 449 * EJDB2 Nodejs wrapper. 450 */ 451class EJDB2 { 452 453 /** 454 * Open database instance. 455 * 456 * @param {String} path Path to database 457 * @param {Object} [opts] 458 * @returns {Promise<EJDB2>} EJDB2 instance promise 459 */ 460 static open(path, opts) { 461 opts = opts || {}; 462 463 function toArgs() { 464 let oflags = 0; 465 const ret = [path]; 466 if (opts['readonly']) { 467 oflags |= 0x02; 468 } 469 if (opts['truncate']) { 470 oflags |= 0x04; 471 } 472 ret.push(oflags); 473 ret.push(opts['wal_enabled'] != null ? !!opts['wal_enabled'] : true); 474 ret.push(opts['wal_check_crc_on_checkpoint']); 475 ret.push(opts['wal_checkpoint_buffer_sz']); 476 ret.push(opts['wal_checkpoint_timeout_sec']); 477 ret.push(opts['wal_savepoint_timeout_sec']); 478 ret.push(opts['wal_wal_buffer_sz']); 479 ret.push(opts['document_buffer_sz']); 480 ret.push(opts['sort_buffer_sz']); 481 ret.push(opts['http_enabled']); 482 ret.push(opts['http_access_token']); 483 ret.push(opts['http_bind']); 484 ret.push(opts['http_max_body_size']); 485 ret.push(opts['http_port']); 486 ret.push(opts['http_read_anon']); 487 return ret; 488 } 489 490 const inst = new EJDB2(toArgs()); 491 return inst._impl.open().then(() => inst); 492 } 493 494 constructor(args) { 495 this._impl = new EJDB2Impl(args); 496 } 497 498 /** 499 * Closes database instance. 500 * @return {Promise<void>} 501 */ 502 close() { 503 return this._impl.close(); 504 } 505 506 /** 507 * Saves [json] document under specified [id] or create a document 508 * with new generated `id`. Returns promise holding actual document `id`. 509 * 510 * @param {String} collection 511 * @param {Object|string} json 512 * @param {number} [id] 513 * @returns {Promise<number>} 514 */ 515 put(collection, json, id) { 516 if (typeof json !== 'string') { 517 json = JSON.stringify(json); 518 } 519 return this._impl.put(collection, json, id); 520 } 521 522 /** 523 * Apply rfc6902/rfc7386 JSON [patch] to the document identified by [id]. 524 * 525 * @param {String} collection 526 * @param {Object|string} json 527 * @param {number} id 528 * @return {Promise<void>} 529 */ 530 patch(collection, json, id) { 531 return this._impl.patch(collection, json, id); 532 } 533 534 /** 535 * Apply JSON merge patch (rfc7396) to the document identified by `id` or 536 * insert new document under specified `id`. 537 * 538 * @param {String} collection 539 * @param {Object|string} json 540 * @param {number} id 541 * @return {Promise<void>} 542 */ 543 patchOrPut(collection, json, id) { 544 return this._impl.patch_or_put(collection, json, id); 545 } 546 547 /** 548 * Get json body of document identified by [id] and stored in [collection]. 549 * 550 * @param {String} collection 551 * @param {number} id 552 * @return {Promise<object>} JSON object 553 */ 554 get(collection, id) { 555 return this._impl.get(collection, id).then((raw) => JSON.parse(raw)); 556 } 557 558 /** 559 * Get json body of document identified by [id] and stored in [collection]. 560 * If document with given `id` is not found then `null` will be resoved. 561 * 562 * @param {string} collection 563 * @param {number} id 564 * @return {Promise<object|null>} JSON object 565 */ 566 getOrNull(collection, id) { 567 return this.get(collection, id).catch((err) => { 568 if (JBE.isNotFound(err)) { 569 return null; 570 } else { 571 return Promise.reject(err); 572 } 573 }); 574 } 575 576 /** 577 * Get json body with database metadata. 578 * 579 * @return {Promise<object>} 580 */ 581 info() { 582 return this._impl.info().then((raw) => JSON.parse(raw)); 583 } 584 585 /** 586 * Removes document idenfied by [id] from [collection]. 587 * 588 * @param {String} collection 589 * @param {number} id 590 * @return {Promise<void>} 591 */ 592 del(collection, id) { 593 return this._impl.del(collection, id); 594 } 595 596 /** 597 * Renames collection. 598 * 599 * @param {String} oldCollectionName Collection to be renamed 600 * @param {String} newCollectionName New name of collection 601 * @return {Promise<void>} 602 */ 603 renameCollection(oldCollectionName, newCollectionName) { 604 return this._impl.rename_collection(oldCollectionName, newCollectionName); 605 } 606 607 /** 608 * Ensures json document database index specified by [path] json pointer to string data type. 609 * 610 * @param {String} collection 611 * @param {String} path 612 * @param {boolean} [unique=false] 613 * @return {Promise<void>} 614 */ 615 ensureStringIndex(collection, path, unique) { 616 return this._impl.index(collection, path, 0x04 | (unique ? 0x01 : 0), false); 617 } 618 619 /** 620 * Removes specified database index. 621 * 622 * @param {String} collection 623 * @param {String} path 624 * @param {boolean} [unique=false] 625 * @return {Promise<void>} 626 */ 627 removeStringIndex(collection, path, unique) { 628 return this._impl.index(collection, path, 0x04 | (unique ? 0x01 : 0), true); 629 } 630 631 /** 632 * Ensures json document database index specified by [path] json pointer to integer data type. 633 * 634 * @param {String} collection 635 * @param {String} path 636 * @param {boolean} [unique=false] 637 * @return {Promise<void>} 638 */ 639 ensureIntIndex(collection, path, unique) { 640 return this._impl.index(collection, path, 0x08 | (unique ? 0x01 : 0), false); 641 } 642 643 /** 644 * Removes specified database index. 645 * 646 * @param {String} collection 647 * @param {String} path 648 * @param {boolean} [unique=false] 649 * @return {Promise<void>} 650 */ 651 removeIntIndex(collection, path, unique) { 652 return this._impl.index(collection, path, 0x08 | (unique ? 0x01 : 0), true); 653 } 654 655 /** 656 * Ensures json document database index specified by [path] json pointer to floating point data type. 657 * 658 * @param {String} collection 659 * @param {String} path 660 * @param {boolean} [unique=false] 661 * @return {Promise<void>} 662 */ 663 ensureFloatIndex(collection, path, unique) { 664 return this._impl.index(collection, path, 0x10 | (unique ? 0x01 : 0), false); 665 } 666 667 /** 668 * Removes specified database index. 669 * 670 * @param {String} collection 671 * @param {String} path 672 * @param {boolean} [unique=false] 673 * @return {Promise<void>} 674 */ 675 removeFloatIndex(collection, path, unique) { 676 return this._impl.index(collection, path, 0x10 | (unique ? 0x01 : 0), true); 677 } 678 679 /** 680 * Removes database [collection]. 681 * 682 * @param {String} collection 683 * @return {Promise<void>} 684 */ 685 removeCollection(collection) { 686 return this._impl.rmcoll(collection); 687 } 688 689 /** 690 * Create instance of [query] specified for [collection]. 691 * If [collection] is not specified a [query] spec must contain collection name, 692 * eg: `@mycollection/[foo=bar]` 693 * 694 * @param {String} query 695 * @param {String} [collection] 696 * @returns {JQL} 697 */ 698 createQuery(query, collection) { 699 return new JQL(this, query, collection); 700 } 701 702 /** 703 * Creates an online database backup image and copies it into the specified [fileName]. 704 * During online backup phase read/write database operations are allowed and not 705 * blocked for significant amount of time. Returns promise with backup 706 * finish time as number of milliseconds since epoch. 707 * 708 * @param {String} fileName Backup image file path. 709 * @returns {Promise<number>} 710 */ 711 onlineBackup(fileName) { 712 return this._impl.online_backup(fileName); 713 } 714} 715 716module.exports = { 717 EJDB2, 718 JBE 719}; 720 721 722