1/************************************************************************************************** 2 * EJDB2 Node.js native API binding. 3 * 4 * MIT License 5 * 6 * Copyright (c) 2012-2022 Softmotions Ltd <info@softmotions.com> 7 * 8 * Permission is hereby granted, free of charge, to any person obtaining a copy 9 * of this software and associated documentation files (the "Software"), to deal 10 * in the Software without restriction, including without limitation the rights 11 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 12 * copies of the Software, and to permit persons to whom the Software is 13 * furnished to do so, subject to the following conditions: 14 * 15 * The above copyright notice and this permission notice shall be included in all 16 * copies or substantial portions of the Software. 17 * 18 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 19 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 20 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 21 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 22 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 23 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 24 * SOFTWARE. 25 *************************************************************************************************/ 26 27const semver = require('semver'); 28const {engines} = require('./package'); 29 30if (!semver.satisfies(process.version, engines.node)) { 31 console.log(`Required node version ${engines.node} not satisfied with current version ${process.version}.`); 32 process.exit(1); 33} 34 35global.__ejdb_add_stream_result__ = addStreamResult; // Passing it to ejdb2_node init 36const {EJDB2Impl} = require('./binary')('ejdb2_node'); 37const {Readable} = require('stream'); 38delete global.__ejdb_add_stream_result__; 39 40/** 41 * EJDB2 Error helpers. 42 */ 43class JBE { 44 45 /** 46 * Returns `true` if given error [err] is `IWKV_ERROR_NOTFOUND` 47 * @param {Error} err 48 * @returns {boolean} 49 */ 50 static isNotFound(err) { 51 const code = (err.code || '').toString(); 52 return code.indexOf('@ejdb IWRC:75001') == 0; 53 } 54 55 /** 56 * Returns `true` if given errror [err] is `JQL_ERROR_QUERY_PARSE` 57 * @param {Error} err 58 * @return {boolean} 59 */ 60 static isInvalidQuery(err) { 61 const code = (err.code || '').toString(); 62 return code.indexOf('@ejdb IWRC:87001') == 0; 63 } 64} 65 66/** 67 * EJDB document. 68 */ 69class JBDOC { 70 71 /** 72 * Get document JSON object 73 */ 74 get json() { 75 if (this._json != null) { 76 return this._json; 77 } 78 this._json = JSON.parse(this._raw); 79 this._raw = null; 80 return this._json; 81 } 82 83 /** 84 * @param {number} id Document ID 85 * @param {string} raw Document JSON as string 86 */ 87 constructor(id, raw) { 88 this.id = id; 89 this._raw = raw; 90 this._json = null; 91 } 92 93 toString() { 94 return `JBDOC: ${this.id} ${this._raw != null ? this._raw : JSON.stringify(this.json)}`; 95 } 96} 97 98/** 99 * EJDB Query resultset stream. 100 */ 101class JBDOCStream extends Readable { 102 103 get _impl() { 104 return this.jql._impl; 105 } 106 107 get writable() { 108 return !this._paused && !this._destroyed; 109 } 110 111 /** 112 * Query result stream. 113 * @param {JQL} jql 114 */ 115 constructor(jql, opts) { 116 super({ 117 objectMode: true, 118 highWaterMark: 64 119 }); 120 this._pending = []; 121 this._paused = true; 122 this._destroyed = false; 123 this._aborted = false; 124 this.jql = jql; 125 this.opts = opts; 126 this.promise = this._impl.jql_stream_attach(jql, this, [opts.limit, opts.explainCallback]) 127 .catch((err) => this.destroy(err)); 128 } 129 130 abort() { 131 // unpause if paused 132 // needed to release frozen on pause db query thread 133 this._doResume(); 134 // set internal abort state 135 this._impl.jql_stream_abort(this); 136 this._aborted = true; 137 } 138 139 /** 140 * Destroy stream 141 */ 142 _destroy(err, callback) { 143 if (!this._destroyed) { 144 this.abort(); 145 this._destroyed = true; 146 setImmediate(() => this._impl.jql_stream_destroy(this)); 147 } 148 callback(err); 149 } 150 151 _read() { 152 if (this._destroyed) { 153 return; 154 } 155 this._doResume(); 156 if (this._pending.length > 0) { 157 // Flush pending records to consumer 158 // Maintaining pending list since `napi_threadsafe_function` queue 159 // may be not empty at the time of `jql_stream_pause` call 160 let pv = this._pending.shift(); 161 while (this.writable && pv) { 162 if (pv.length == 0) { // Got pending EOF 163 this._pending.length = 0; 164 this.destroy(); 165 break; 166 } 167 addStreamResult(this, pv[0], pv[1]); 168 pv = this._pending.shift(); 169 } 170 } 171 } 172 173 _doResume() { 174 if (this._paused) { 175 this._impl.jql_stream_resume(this); 176 this._paused = false; 177 } 178 } 179 180 _doPause() { 181 if (!this._paused) { 182 this._impl.jql_stream_pause(this); 183 this._paused = true; 184 } 185 } 186} 187 188// Global module function for add results to query stream 189function addStreamResult(stream, id, jsondoc, log) { 190 if (stream._destroyed) { 191 return; 192 } 193 if (log != null && stream.opts.explainCallback != null) { 194 stream.opts.explainCallback(log); 195 delete stream.opts.explainCallback; 196 } 197 198 const count = (typeof jsondoc === 'number'); 199 if (id >= 0 || count) { 200 if (!stream._aborted) { 201 if (stream._paused) { 202 // Maintaining pending list since `napi_threadsafe_function` queue 203 // may be not empty at the time of `jql_stream_pause` call 204 stream.pending.push([id, jsondoc]); 205 return; 206 } 207 let doc; 208 if (count) { // count int response 209 doc = new JBDOC(jsondoc, jsondoc); 210 } else if (jsondoc != null) { 211 doc = new JBDOC(id, jsondoc); 212 } 213 if (doc != null && stream.push(doc) == false) { 214 stream._doPause(); 215 } 216 } 217 } 218 219 if (id < 0) { // last record 220 if (!stream._aborted && stream._paused) { 221 stream.pending.push([]); 222 } else { 223 stream.push(null); 224 stream.destroy(); 225 } 226 } 227} 228 229/** 230 * EJDB Query. 231 */ 232class JQL { 233 234 get _impl() { 235 return this.db._impl; 236 } 237 238 /** 239 * Get `limit` value used by query. 240 */ 241 get limit() { 242 return this._impl.jql_limit(this); 243 } 244 245 /** 246 * @param {EJDB2} db 247 * @param {string} query 248 * @param {string} collection 249 */ 250 constructor(db, query, collection) { 251 this.db = db; 252 this.query = query; 253 this.collection = collection; 254 this._impl.jql_init(this, query, collection); 255 } 256 257 /** 258 * Executes a query and returns a 259 * readable stream of matched documents. 260 * 261 * @param {Object} [opts] 262 * @return {ReadableStream<JBDOC>} 263 */ 264 stream(opts) { 265 return new JBDOCStream(this, opts || {}); 266 } 267 268 /** 269 * Executes this query and waits its completion. 270 * 271 * @param {Promise} opts 272 */ 273 completionPromise(opts) { 274 const stream = this.stream(opts || {}); 275 return new Promise((resolve, reject) => { 276 stream.on('data', () => stream.destroy()); 277 stream.on('close', () => resolve()); 278 stream.on('error', (err) => reject(err)); 279 }); 280 } 281 282 /** 283 * Returns a scalar integer value as result of query execution. 284 * Eg.: A count query: `/... | count` 285 * @param {Object} [opts] 286 * @return {Promise<number>} 287 */ 288 scalarInt(opts) { 289 const stream = this.stream(opts); 290 return new Promise((resolve, reject) => { 291 stream.on('data', (doc) => { 292 resolve(doc.id); 293 stream.destroy(); 294 }); 295 stream.on('error', (err) => reject(err)); 296 }); 297 } 298 299 /** 300 * Returns result set as a list. 301 * Use it with caution on large data sets. 302 * 303 * @param {Object} [opts] 304 * @return {Promise<Array<JBDOC>>} 305 */ 306 list(opts) { 307 const ret = []; 308 const stream = this.stream(opts); 309 return new Promise((resolve, reject) => { 310 stream.on('data', (doc) => ret.push(doc)); 311 stream.on('close', () => resolve(ret)); 312 stream.on('error', (err) => reject(err)); 313 }); 314 } 315 316 /** 317 * Collects up to [n] documents from result set into array. 318 * @param {number} n 319 * @param {Object} [opts] 320 * @return {Promise<Array<JBDOC>>} 321 */ 322 firstN(n, opts) { 323 opts = opts || {}; 324 opts.limit = n; 325 const ret = []; 326 const stream = this.stream(opts); 327 return new Promise((resolve, reject) => { 328 stream.on('data', (doc) => { 329 ret.push(doc); 330 if (ret.length >= n) { 331 stream.destroy(); 332 } 333 }); 334 stream.on('close', () => resolve(ret)); 335 stream.on('error', (err) => reject(err)); 336 }); 337 } 338 339 /** 340 * Returns a first record in result set. 341 * If record is not found promise with `undefined` will be returned. 342 * 343 * @param {Object} [opts] 344 * @return {Promise<JBDOC|undefined>} 345 */ 346 async first(opts) { 347 const fv = await this.firstN(1, opts); 348 return fv[0]; 349 } 350 351 /** 352 * Set [json] at the specified [placeholder]. 353 * @param {string|number} placeholder 354 * @param {string|object} val 355 * @return {JQL} 356 */ 357 setJSON(placeholder, val) { 358 this._checkPlaceholder(placeholder); 359 if (typeof val !== 'string') { 360 val = JSON.stringify(val); 361 } 362 this._impl.jql_set(this, placeholder, val, 1); 363 return this; 364 } 365 366 /** 367 * Set [regexp] string at the specified [placeholder]. 368 * @param {string|number} placeholder 369 * @param {string|RegExp} val 370 * @return {JQL} 371 */ 372 setRegexp(placeholder, val) { 373 this._checkPlaceholder(placeholder); 374 if (val instanceof RegExp) { 375 const sval = val.toString(); 376 val = sval.substring(1, sval.lastIndexOf('/')); 377 } else if (typeof val !== 'string') { 378 throw new Error('Regexp argument must be a string or RegExp object'); 379 } 380 this._impl.jql_set(this, placeholder, val, 2); 381 return this; 382 } 383 384 /** 385 * Set number [val] at the specified [placeholder]. 386 * @param {string|number} placeholder 387 * @param {number} val 388 * @return {JQL} 389 */ 390 setNumber(placeholder, val) { 391 this._checkPlaceholder(placeholder); 392 if (typeof val !== 'number') { 393 throw new Error('Value must be a number'); 394 } 395 this._impl.jql_set(this, placeholder, val, this._isInteger(val) ? 3 : 4); 396 return this; 397 } 398 399 /** 400 * Set boolean [val] at the specified [placeholder]. 401 * @param {string|number} placeholder 402 * @param {boolean} val 403 * @return {JQL} 404 */ 405 setBoolean(placeholder, val) { 406 this._checkPlaceholder(placeholder); 407 this._impl.jql_set(this, placeholder, !!val, 5); 408 return this; 409 } 410 411 /** 412 * Set string [val] at the specified [placeholder]. 413 * @param {string|number} placeholder 414 * @param {string} val 415 * @return {JQL} 416 */ 417 setString(placeholder, val) { 418 this._checkPlaceholder(placeholder); 419 if (val != null && typeof val !== 'string') { 420 val = val.toString(); 421 } 422 this._impl.jql_set(this, placeholder, val, 6); 423 return this; 424 } 425 426 /** 427 * Set `null` at the specified [placeholder]. 428 * @param {string|number} placeholder 429 * @return {JQL} 430 */ 431 setNull(placeholder) { 432 this._checkPlaceholder(placeholder); 433 this._impl.jql_set(this, placeholder, null, 7); 434 return this; 435 } 436 437 _isInteger(n) { 438 return n === +n && n === (n | 0); 439 } 440 441 _checkPlaceholder(placeholder) { 442 const t = typeof placeholder; 443 if (t !== 'number' && t !== 'string') { 444 throw new Error('Invalid placeholder specified, must be either string or number'); 445 } 446 } 447} 448 449/** 450 * EJDB2 Nodejs wrapper. 451 */ 452class EJDB2 { 453 454 /** 455 * Open database instance. 456 * 457 * @param {String} path Path to database 458 * @param {Object} [opts] 459 * @returns {Promise<EJDB2>} EJDB2 instance promise 460 */ 461 static open(path, opts) { 462 opts = opts || {}; 463 464 function toArgs() { 465 let oflags = 0; 466 const ret = [path]; 467 if (opts['readonly']) { 468 oflags |= 0x02; 469 } 470 if (opts['truncate']) { 471 oflags |= 0x04; 472 } 473 ret.push(oflags); 474 ret.push(opts['wal_enabled'] != null ? !!opts['wal_enabled'] : true); 475 ret.push(opts['wal_check_crc_on_checkpoint']); 476 ret.push(opts['wal_checkpoint_buffer_sz']); 477 ret.push(opts['wal_checkpoint_timeout_sec']); 478 ret.push(opts['wal_savepoint_timeout_sec']); 479 ret.push(opts['wal_wal_buffer_sz']); 480 ret.push(opts['document_buffer_sz']); 481 ret.push(opts['sort_buffer_sz']); 482 ret.push(opts['http_enabled']); 483 ret.push(opts['http_access_token']); 484 ret.push(opts['http_bind']); 485 ret.push(opts['http_max_body_size']); 486 ret.push(opts['http_port']); 487 ret.push(opts['http_read_anon']); 488 return ret; 489 } 490 491 const inst = new EJDB2(toArgs()); 492 return inst._impl.open().then(() => inst); 493 } 494 495 constructor(args) { 496 this._impl = new EJDB2Impl(args); 497 } 498 499 /** 500 * Closes database instance. 501 * @return {Promise<void>} 502 */ 503 close() { 504 return this._impl.close(); 505 } 506 507 /** 508 * Saves [json] document under specified [id] or create a document 509 * with new generated `id`. Returns promise holding actual document `id`. 510 * 511 * @param {String} collection 512 * @param {Object|string} json 513 * @param {number} [id] 514 * @returns {Promise<number>} 515 */ 516 put(collection, json, id) { 517 if (typeof json !== 'string') { 518 json = JSON.stringify(json); 519 } 520 return this._impl.put(collection, json, id); 521 } 522 523 /** 524 * Apply rfc6902/rfc7386 JSON [patch] to the document identified by [id]. 525 * 526 * @param {String} collection 527 * @param {Object|string} json 528 * @param {number} id 529 * @return {Promise<void>} 530 */ 531 patch(collection, json, id) { 532 return this._impl.patch(collection, json, id); 533 } 534 535 /** 536 * Apply JSON merge patch (rfc7396) to the document identified by `id` or 537 * insert new document under specified `id`. 538 * 539 * @param {String} collection 540 * @param {Object|string} json 541 * @param {number} id 542 * @return {Promise<void>} 543 */ 544 patchOrPut(collection, json, id) { 545 return this._impl.patch_or_put(collection, json, id); 546 } 547 548 /** 549 * Get json body of document identified by [id] and stored in [collection]. 550 * 551 * @param {String} collection 552 * @param {number} id 553 * @return {Promise<object>} JSON object 554 */ 555 get(collection, id) { 556 return this._impl.get(collection, id).then((raw) => JSON.parse(raw)); 557 } 558 559 /** 560 * Get json body of document identified by [id] and stored in [collection]. 561 * If document with given `id` is not found then `null` will be resoved. 562 * 563 * @param {string} collection 564 * @param {number} id 565 * @return {Promise<object|null>} JSON object 566 */ 567 getOrNull(collection, id) { 568 return this.get(collection, id).catch((err) => { 569 if (JBE.isNotFound(err)) { 570 return null; 571 } else { 572 return Promise.reject(err); 573 } 574 }); 575 } 576 577 /** 578 * Get json body with database metadata. 579 * 580 * @return {Promise<object>} 581 */ 582 info() { 583 return this._impl.info().then((raw) => JSON.parse(raw)); 584 } 585 586 /** 587 * Removes document idenfied by [id] from [collection]. 588 * 589 * @param {String} collection 590 * @param {number} id 591 * @return {Promise<void>} 592 */ 593 del(collection, id) { 594 return this._impl.del(collection, id); 595 } 596 597 /** 598 * Renames collection. 599 * 600 * @param {String} oldCollectionName Collection to be renamed 601 * @param {String} newCollectionName New name of collection 602 * @return {Promise<void>} 603 */ 604 renameCollection(oldCollectionName, newCollectionName) { 605 return this._impl.rename_collection(oldCollectionName, newCollectionName); 606 } 607 608 /** 609 * Ensures json document database index specified by [path] json pointer to string data type. 610 * 611 * @param {String} collection 612 * @param {String} path 613 * @param {boolean} [unique=false] 614 * @return {Promise<void>} 615 */ 616 ensureStringIndex(collection, path, unique) { 617 return this._impl.index(collection, path, 0x04 | (unique ? 0x01 : 0), false); 618 } 619 620 /** 621 * Removes specified database index. 622 * 623 * @param {String} collection 624 * @param {String} path 625 * @param {boolean} [unique=false] 626 * @return {Promise<void>} 627 */ 628 removeStringIndex(collection, path, unique) { 629 return this._impl.index(collection, path, 0x04 | (unique ? 0x01 : 0), true); 630 } 631 632 /** 633 * Ensures json document database index specified by [path] json pointer to integer data type. 634 * 635 * @param {String} collection 636 * @param {String} path 637 * @param {boolean} [unique=false] 638 * @return {Promise<void>} 639 */ 640 ensureIntIndex(collection, path, unique) { 641 return this._impl.index(collection, path, 0x08 | (unique ? 0x01 : 0), false); 642 } 643 644 /** 645 * Removes specified database index. 646 * 647 * @param {String} collection 648 * @param {String} path 649 * @param {boolean} [unique=false] 650 * @return {Promise<void>} 651 */ 652 removeIntIndex(collection, path, unique) { 653 return this._impl.index(collection, path, 0x08 | (unique ? 0x01 : 0), true); 654 } 655 656 /** 657 * Ensures json document database index specified by [path] json pointer to floating point data type. 658 * 659 * @param {String} collection 660 * @param {String} path 661 * @param {boolean} [unique=false] 662 * @return {Promise<void>} 663 */ 664 ensureFloatIndex(collection, path, unique) { 665 return this._impl.index(collection, path, 0x10 | (unique ? 0x01 : 0), false); 666 } 667 668 /** 669 * Removes specified database index. 670 * 671 * @param {String} collection 672 * @param {String} path 673 * @param {boolean} [unique=false] 674 * @return {Promise<void>} 675 */ 676 removeFloatIndex(collection, path, unique) { 677 return this._impl.index(collection, path, 0x10 | (unique ? 0x01 : 0), true); 678 } 679 680 /** 681 * Removes database [collection]. 682 * 683 * @param {String} collection 684 * @return {Promise<void>} 685 */ 686 removeCollection(collection) { 687 return this._impl.rmcoll(collection); 688 } 689 690 /** 691 * Create instance of [query] specified for [collection]. 692 * If [collection] is not specified a [query] spec must contain collection name, 693 * eg: `@mycollection/[foo=bar]` 694 * 695 * @param {String} query 696 * @param {String} [collection] 697 * @returns {JQL} 698 */ 699 createQuery(query, collection) { 700 return new JQL(this, query, collection); 701 } 702 703 /** 704 * Creates an online database backup image and copies it into the specified [fileName]. 705 * During online backup phase read/write database operations are allowed and not 706 * blocked for significant amount of time. Returns promise with backup 707 * finish time as number of milliseconds since epoch. 708 * 709 * @param {String} fileName Backup image file path. 710 * @returns {Promise<number>} 711 */ 712 onlineBackup(fileName) { 713 return this._impl.online_backup(fileName); 714 } 715} 716 717module.exports = { 718 EJDB2, 719 JBE 720}; 721 722 723