1 #include "iwkv_internal.h"
2 #include <sys/types.h>
3 #include <fcntl.h>
4 #include <time.h>
5
6 #ifdef _WIN32
7 #include "win32/mman/mman.h"
8 #else
9
10 #ifndef O_CLOEXEC
11 #define O_CLOEXEC 0
12 #endif
13
14 #include <sys/mman.h>
15
16 #endif
17
18 extern atomic_uint_fast64_t g_trigger;
19
/** Online backup stages kept in `IWAL::bkp_stage`. A zero stage means that no backup is running. */
enum {
  BKP_STARTED     = 1, /**< Backup started */
  BKP_WAL_CLEANUP = 2, /**< Do checkpoint and truncate WAL file */
  BKP_MAIN_COPY   = 3, /**< Copy main database file */
  BKP_WAL_COPY1   = 4, /**< Copy most of WAL file content */
  BKP_WAL_COPY2   = 5  /**< Copy rest of WAL file in exclusive locked mode */
};
25
26 typedef struct IWAL {
27 IWDLSNR lsnr;
28 atomic_bool applying; /**< WAL applying */
29 atomic_bool open; /**< Is WAL in use */
30 atomic_bool force_cp; /**< Next checkpoint scheduled */
31 atomic_bool synched; /**< WAL is synched or WBFIXPOINT is the last write operation */
32 bool force_sp; /**< Next savepoint scheduled */
33 bool check_cp_crc; /**< Check CRC32 sum of data blocks during checkpoint. Default: false */
34 iwkv_openflags oflags; /**< File open flags */
35 atomic_int bkp_stage; /**< Online backup stage */
36 size_t wal_buffer_sz; /**< WAL file intermediate buffer size. */
37 size_t checkpoint_buffer_sz; /**< Checkpoint buffer size in bytes. */
38 uint32_t bufpos; /**< Current position in buffer */
39 uint32_t bufsz; /**< Size of buffer */
40 HANDLE fh; /**< File handle */
41 uint8_t *buf; /**< File buffer */
42 char *path; /**< WAL file path */
43 pthread_mutex_t *mtxp; /**< Global WAL mutex */
44 pthread_cond_t *cpt_condp; /**< Checkpoint thread cond variable */
45 pthread_t *cptp; /**< Checkpoint thread */
46 iwrc (*wal_lock_interceptor)(bool, void*);
47 /**< Optional function called
48 - before acquiring
49 - after releasing
50 exclusive database lock by WAL checkpoint thread.
51 In the case of `before lock` first argument will be set to true */
52 void *wal_lock_interceptor_opaque; /**< Opaque data for `wal_lock_interceptor` */
53 uint32_t savepoint_timeout_sec; /**< Savepoint timeout seconds */
54 uint32_t checkpoint_timeout_sec; /**< Checkpoint timeout seconds */
55 atomic_size_t mbytes; /**< Estimated size of modifed private mmaped memory bytes */
56 off_t rollforward_offset; /**< Rollforward offset during online backup */
57 uint64_t checkpoint_ts; /**< Last checkpoint timestamp milliseconds */
58 pthread_mutex_t mtx; /**< Global WAL mutex */
59 pthread_cond_t cpt_cond; /**< Checkpoint thread cond variable */
60 pthread_t cpt; /**< Checkpoint thread */
61 IWKV iwkv;
62 } IWAL;
63
64 static iwrc _checkpoint_exl(IWAL *wal, uint64_t *tsp, bool no_fixpoint);
65
_lock(IWAL * wal)66 IW_INLINE iwrc _lock(IWAL *wal) {
67 int rci = pthread_mutex_lock(wal->mtxp);
68 return (rci ? iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci) : 0);
69 }
70
_unlock(IWAL * wal)71 IW_INLINE iwrc _unlock(IWAL *wal) {
72 int rci = pthread_mutex_unlock(wal->mtxp);
73 return (rci ? iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci) : 0);
74 }
75
_excl_lock(IWAL * wal)76 static iwrc _excl_lock(IWAL *wal) {
77 iwrc rc = 0;
78 if (wal->wal_lock_interceptor) {
79 rc = wal->wal_lock_interceptor(true, wal->wal_lock_interceptor_opaque);
80 RCRET(rc);
81 }
82 rc = iwkv_exclusive_lock(wal->iwkv);
83 if (rc) {
84 if (wal->wal_lock_interceptor) {
85 IWRC(wal->wal_lock_interceptor(false, wal->wal_lock_interceptor_opaque), rc);
86 }
87 return rc;
88 }
89 rc = _lock(wal);
90 if (rc) {
91 IWRC(iwkv_exclusive_unlock(wal->iwkv), rc);
92 if (wal->wal_lock_interceptor) {
93 IWRC(wal->wal_lock_interceptor(false, wal->wal_lock_interceptor_opaque), rc);
94 }
95 }
96 return rc;
97 }
98
_excl_unlock(IWAL * wal)99 static iwrc _excl_unlock(IWAL *wal) {
100 iwrc rc = _unlock(wal);
101 IWRC(iwkv_exclusive_unlock(wal->iwkv), rc);
102 if (wal->wal_lock_interceptor) {
103 IWRC(wal->wal_lock_interceptor(false, wal->wal_lock_interceptor_opaque), rc);
104 }
105 return rc;
106 }
107
_init_locks(IWAL * wal)108 static iwrc _init_locks(IWAL *wal) {
109 int rci = pthread_mutex_init(&wal->mtx, 0);
110 if (rci) {
111 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
112 }
113 wal->mtxp = &wal->mtx;
114 return 0;
115 }
116
/**
 * Releases every resource owned by the WAL and frees the object itself.
 * A NULL `wal` is ignored.
 */
static void _destroy(IWAL *wal) {
  if (!wal) {
    return;
  }
  wal->open = false;
  if (!INVALIDHANDLE(wal->fh)) {
    iwp_unlock(wal->fh);
    iwp_closefh(wal->fh);
  }
  // Only primitives which were actually initialized have their pointer set
  if (wal->cpt_condp) {
    pthread_cond_destroy(wal->cpt_condp);
    wal->cpt_condp = 0;
  }
  if (wal->mtxp) {
    pthread_mutex_destroy(wal->mtxp);
    wal->mtxp = 0;
  }
  free(wal->path);
  if (wal->buf) {
    // `buf` points `sizeof(WBSEP)` bytes into the allocation, rewind to its start
    wal->buf -= sizeof(WBSEP);
    free(wal->buf);
  }
  free(wal);
}
140
_flush_wl(IWAL * wal,bool sync)141 static iwrc _flush_wl(IWAL *wal, bool sync) {
142 iwrc rc = 0;
143 if (wal->bufpos) {
144 uint32_t crc = wal->check_cp_crc ? iwu_crc32(wal->buf, wal->bufpos, 0) : 0;
145 WBSEP sep = {
146 .id = WOP_SEP,
147 .crc = crc,
148 .len = wal->bufpos
149 };
150 size_t wz = wal->bufpos + sizeof(WBSEP);
151 uint8_t *wp = wal->buf - sizeof(WBSEP);
152 memcpy(wp, &sep, sizeof(WBSEP));
153 rc = iwp_write(wal->fh, wp, wz);
154 RCRET(rc);
155 wal->bufpos = 0;
156 }
157 if (sync) {
158 rc = iwp_fsync(wal->fh);
159 }
160 return rc;
161 }
162
_truncate_wl(IWAL * wal)163 IW_INLINE iwrc _truncate_wl(IWAL *wal) {
164 iwrc rc = iwp_ftruncate(wal->fh, 0);
165 RCRET(rc);
166 wal->rollforward_offset = 0;
167 rc = iwp_lseek(wal->fh, 0, IWP_SEEK_SET, 0);
168 RCRET(rc);
169 rc = iwp_fsync(wal->fh);
170 return rc;
171 }
172
_write_wl(IWAL * wal,const void * op,off_t oplen,const uint8_t * data,off_t len)173 static iwrc _write_wl(IWAL *wal, const void *op, off_t oplen, const uint8_t *data, off_t len) {
174 iwrc rc = 0;
175 const off_t bufsz = wal->bufsz;
176 wal->synched = false;
177 if (bufsz - wal->bufpos < oplen) {
178 rc = _flush_wl(wal, false);
179 RCRET(rc);
180 }
181 assert(bufsz - wal->bufpos >= oplen);
182 memcpy(wal->buf + wal->bufpos, op, (size_t) oplen);
183 wal->bufpos += oplen;
184 if (bufsz - wal->bufpos < len) {
185 rc = _flush_wl(wal, false);
186 RCRET(rc);
187 rc = iwp_write(wal->fh, data, (size_t) len);
188 RCRET(rc);
189 } else if (len > 0){
190 assert(bufsz - wal->bufpos >= len);
191 memcpy(wal->buf + wal->bufpos, data, (size_t) len);
192 wal->bufpos += len;
193 }
194 return rc;
195 }
196
_write_op(IWAL * wal,const void * op,off_t oplen,const uint8_t * data,off_t len)197 IW_INLINE iwrc _write_op(IWAL *wal, const void *op, off_t oplen, const uint8_t *data, off_t len) {
198 iwrc rc = _lock(wal);
199 RCRET(rc);
200 rc = _write_wl(wal, op, oplen, data, len);
201 IWRC(_unlock(wal), rc);
202 return rc;
203 }
204
iwal_sync(IWKV iwkv)205 iwrc iwal_sync(IWKV iwkv) {
206 IWAL *wal = (IWAL*) iwkv->dlsnr;
207 iwrc rc = _lock(wal);
208 RCRET(rc);
209 rc = _flush_wl(wal, true);
210 IWRC(_unlock(wal), rc);
211 return rc;
212 }
213
_onopen(struct IWDLSNR * self,const char * path,int mode)214 static iwrc _onopen(struct IWDLSNR *self, const char *path, int mode) {
215 return 0;
216 }
217
_onclosing(struct IWDLSNR * self)218 static iwrc _onclosing(struct IWDLSNR *self) {
219 IWAL *wal = (IWAL*) self;
220 #ifdef IW_TESTS
221 uint64_t tv = g_trigger;
222 if (tv & IWKVD_WAL_NO_CHECKPOINT_ON_CLOSE) {
223 _destroy(wal);
224 return 0;
225 }
226 #endif
227 iwrc rc = _checkpoint_exl(wal, 0, false);
228 _destroy(wal);
229 return rc;
230 }
231
_onset(struct IWDLSNR * self,off_t off,uint8_t val,off_t len,int flags)232 static iwrc _onset(struct IWDLSNR *self, off_t off, uint8_t val, off_t len, int flags) {
233 IWAL *wal = (IWAL*) self;
234 if (wal->applying) {
235 return 0;
236 }
237 WBSET wb = {
238 .id = WOP_SET,
239 .val = val,
240 .off = off,
241 .len = len
242 };
243 wal->mbytes += len;
244 return _write_op((IWAL*) self, &wb, sizeof(wb), 0, 0);
245 }
246
_oncopy(struct IWDLSNR * self,off_t off,off_t len,off_t noff,int flags)247 static iwrc _oncopy(struct IWDLSNR *self, off_t off, off_t len, off_t noff, int flags) {
248 IWAL *wal = (IWAL*) self;
249 if (wal->applying) {
250 return 0;
251 }
252 WBCOPY wb = {
253 .id = WOP_COPY,
254 .off = off,
255 .len = len,
256 .noff = noff
257 };
258 wal->mbytes += len;
259 return _write_op(wal, &wb, sizeof(wb), 0, 0);
260 }
261
_onwrite(struct IWDLSNR * self,off_t off,const void * buf,off_t len,int flags)262 static iwrc _onwrite(struct IWDLSNR *self, off_t off, const void *buf, off_t len, int flags) {
263 assert(len <= (size_t) (-1));
264 IWAL *wal = (IWAL*) self;
265 if (wal->applying) {
266 return 0;
267 }
268 WBWRITE wb = {
269 .id = WOP_WRITE,
270 .crc = wal->check_cp_crc ? iwu_crc32(buf,len, 0) : 0,
271 .len = len,
272 .off = off
273 };
274 wal->mbytes += len;
275 return _write_op(wal, &wb, sizeof(wb), buf, len);
276 }
277
_onresize(struct IWDLSNR * self,off_t osize,off_t nsize,int flags,bool * handled)278 static iwrc _onresize(struct IWDLSNR *self, off_t osize, off_t nsize, int flags, bool *handled) {
279 IWAL *wal = (IWAL*) self;
280 if (wal->applying) {
281 *handled = false;
282 return 0;
283 }
284 *handled = true;
285 WBRESIZE wb = {
286 .id = WOP_RESIZE,
287 .osize = osize,
288 .nsize = nsize
289 };
290 iwrc rc = _lock(wal);
291 RCRET(rc);
292 rc = _write_wl(wal, &wb, sizeof(wb), 0, 0);
293 RCGO(rc, finish);
294 rc = _checkpoint_exl(wal, 0, true);
295 finish:
296 IWRC(_unlock(wal), rc);
297 return rc;
298 }
299
_onsynced(struct IWDLSNR * self,int flags)300 static iwrc _onsynced(struct IWDLSNR *self, int flags) {
301 IWAL *wal = (IWAL*) self;
302 if (wal->applying) {
303 return 0;
304 }
305 iwrc rc = _lock(wal);
306 RCRET(rc);
307 rc = _flush_wl(wal, true);
308 IWRC(_unlock(wal), rc);
309 return rc;
310 }
311
/**
 * Scans the WAL image and finds the offsets of the last complete savepoint (fixpoint)
 * and reset records.
 *
 * The scan stops silently at the first record that is unknown or does not fit
 * into the remaining bytes, so only records located before the damaged tail are reported.
 * A WAL which does not start with a WBSEP record yields zero for both offsets.
 *
 * @param wal  Unused
 * @param wmm  Start of the WAL image
 * @param fsz  Size of the WAL image in bytes
 * @param fpos [out] Offset of the last WBSAVEPOINT record, zero if none found
 * @param rpos [out] Offset of the last WBRESET record, zero if none found
 */
static void _last_fix_and_reset_points(IWAL *wal, uint8_t *wmm, off_t fsz, off_t *fpos, off_t *rpos) {
  (void) wal;
  uint8_t *rp = wmm;
  *fpos = 0;
  *rpos = 0;

  for (uint32_t i = 0; rp - wmm < fsz; ++i) {
    uint8_t opid;
    // Bytes left starting from the current record, positive due to the loop condition
    const uint64_t avail = (uint64_t) (fsz - (rp - wmm));
    memcpy(&opid, rp, 1);
    if ((i == 0) && (opid != WOP_SEP)) {
      return;
    }
    switch (opid) {
      case WOP_SEP: {
        WBSEP wb;
        if (avail < sizeof(wb)) {
          return;
        }
        memcpy(&wb, rp, sizeof(wb));
        rp += sizeof(wb);
        // The separated chunk follows the header, so the header size is not available to it
        if (wb.len > avail - sizeof(wb)) {
          return;
        }
        break;
      }
      case WOP_SET: {
        if (avail < sizeof(WBSET)) {
          return;
        }
        rp += sizeof(WBSET);
        break;
      }
      case WOP_COPY: {
        if (avail < sizeof(WBCOPY)) {
          return;
        }
        rp += sizeof(WBCOPY);
        break;
      }
      case WOP_WRITE: {
        WBWRITE wb;
        if (avail < sizeof(wb)) {
          return;
        }
        memcpy(&wb, rp, sizeof(wb));
        rp += sizeof(wb);
        // Payload follows the header, so the header size is not available to it
        if (wb.len > avail - sizeof(wb)) {
          return;
        }
        rp += wb.len;
        break;
      }
      case WOP_RESIZE: {
        if (avail < sizeof(WBRESIZE)) {
          return;
        }
        rp += sizeof(WBRESIZE);
        break;
      }
      case WOP_SAVEPOINT: {
        // A truncated savepoint record must not be reported as a fixpoint
        if (avail < sizeof(WBSAVEPOINT)) {
          return;
        }
        *fpos = (rp - wmm);
        rp += sizeof(WBSAVEPOINT);
        break;
      }
      case WOP_RESET: {
        // A truncated reset record must not be reported as a reset point
        if (avail < sizeof(WBRESET)) {
          return;
        }
        *rpos = (rp - wmm);
        rp += sizeof(WBRESET);
        break;
      }
      default: {
        return;
      }
    }
  }
}
388
_rollforward_exl(IWAL * wal,IWFS_EXT * extf,int recover_mode)389 static iwrc _rollforward_exl(IWAL *wal, IWFS_EXT *extf, int recover_mode) {
390 assert(wal->bufpos == 0);
391 off_t fsz = 0;
392 iwrc rc = iwp_lseek(wal->fh, 0, IWP_SEEK_END, &fsz);
393 RCRET(rc);
394 if (!fsz) { // empty wal log
395 return 0;
396 }
397 size_t sp;
398 uint8_t *mm;
399 const bool ccrc = wal->check_cp_crc;
400 off_t fpos = 0; // checkpoint
401 #ifndef _WIN32
402 off_t pfsz = IW_ROUNDUP(fsz, iwp_page_size());
403 uint8_t *wmm = mmap(0, (size_t) pfsz, PROT_READ, MAP_PRIVATE, wal->fh, 0);
404 #if defined(MADV_SEQUENTIAL) || defined(MADV_DONTFORK)
405 int adv = 0;
406 #ifdef MADV_SEQUENTIAL
407 adv |= MADV_SEQUENTIAL;
408 #endif
409 #ifdef MADV_DONTFORK
410 adv |= MADV_DONTFORK;
411 #endif
412 madvise(wmm, (size_t) fsz, adv);
413 #endif
414 #else
415 off_t pfsz = fsz;
416 uint8_t *wmm = mmap(0, 0, PROT_READ, MAP_PRIVATE, wal->fh, 0);
417 #endif
418 if (wmm == MAP_FAILED) {
419 return iwrc_set_errno(IW_ERROR_ERRNO, errno);
420 }
421 // Temporary turn off extf locking
422 wal->applying = true;
423
424 // Remap fsm in MAP_SHARED mode
425 extf->remove_mmap_unsafe(extf, 0);
426 rc = extf->add_mmap_unsafe(extf, 0, SIZE_T_MAX, IWFS_MMAP_SHARED);
427 if (rc) {
428 munmap(wmm, (size_t) pfsz);
429 wal->iwkv->fatalrc = rc;
430 wal->applying = false;
431 return rc;
432 }
433
434 #define _WAL_CORRUPTED(msg_) do { \
435 rc = IWKV_ERROR_CORRUPTED_WAL_FILE; \
436 iwlog_ecode_error2(rc, msg_); \
437 goto finish; \
438 } while (0);
439
440 if (recover_mode) {
441 off_t rpos; // reset point
442 _last_fix_and_reset_points(wal, wmm, fsz, &fpos, &rpos);
443 if (!fpos) {
444 goto finish;
445 }
446 if ((rpos > 0) && (recover_mode == 1)) {
447 // Recover from last known reset point
448 if (fpos < rpos) {
449 goto finish;
450 }
451 // WBSEP__WBRESET
452 // \_rpos
453 rpos -= sizeof(WBSEP);
454 // WBSEP__WBRESET
455 // \_rpos
456 wmm += rpos;
457 fsz -= rpos;
458 }
459 } else if (wal->rollforward_offset > 0) {
460 if (wal->rollforward_offset >= fsz) {
461 _WAL_CORRUPTED("Invalid rollforward offset");
462 }
463 wmm += wal->rollforward_offset;
464 fsz -= wal->rollforward_offset;
465 }
466
467 uint8_t *rp = wmm;
468 for (uint32_t i = 0; rp - wmm < fsz; ++i) {
469 uint8_t opid;
470 off_t avail = fsz - (rp - wmm);
471 memcpy(&opid, rp, 1);
472 if ((i == 0) && (opid != WOP_SEP)) {
473 rc = IWKV_ERROR_CORRUPTED_WAL_FILE;
474 goto finish;
475 }
476 switch (opid) {
477 case WOP_SEP: {
478 WBSEP wb;
479 if (avail < sizeof(wb)) {
480 _WAL_CORRUPTED("Premature end of WAL (WBSEP)");
481 }
482 memcpy(&wb, rp, sizeof(wb));
483 rp += sizeof(wb);
484 if (wb.len > avail) {
485 _WAL_CORRUPTED("Premature end of WAL (WBSEP)");
486 }
487 if (ccrc && wb.crc) {
488 uint32_t crc = iwu_crc32(rp, wb.len, 0);
489 if (crc != wb.crc) {
490 _WAL_CORRUPTED("Invalid CRC32 checksum of WAL segment (WBSEP)");
491 }
492 }
493 break;
494 }
495 case WOP_SET: {
496 WBSET wb;
497 if (avail < sizeof(wb)) {
498 _WAL_CORRUPTED("Premature end of WAL (WBSET)");
499 }
500 memcpy(&wb, rp, sizeof(wb));
501 rp += sizeof(wb);
502 rc = extf->probe_mmap_unsafe(extf, 0, &mm, &sp);
503 RCGO(rc, finish);
504 memset(mm + wb.off, wb.val, (size_t) wb.len);
505 break;
506 }
507 case WOP_COPY: {
508 WBCOPY wb;
509 if (avail < sizeof(wb)) {
510 _WAL_CORRUPTED("Premature end of WAL (WBCOPY)");
511 }
512 memcpy(&wb, rp, sizeof(wb));
513 rp += sizeof(wb);
514 rc = extf->probe_mmap_unsafe(extf, 0, &mm, &sp);
515 RCGO(rc, finish);
516 memmove(mm + wb.noff, mm + wb.off, (size_t) wb.len);
517 break;
518 }
519 case WOP_WRITE: {
520 WBWRITE wb;
521 if (avail < sizeof(wb)) {
522 _WAL_CORRUPTED("Premature end of WAL (WBWRITE)");
523 }
524 memcpy(&wb, rp, sizeof(wb));
525 rp += sizeof(wb);
526 if (avail < wb.len) {
527 _WAL_CORRUPTED("Premature end of WAL (WBWRITE)");
528 }
529 if (ccrc && wb.crc) {
530 uint32_t crc = iwu_crc32(rp, wb.len, 0);
531 if (crc != wb.crc) {
532 _WAL_CORRUPTED("Invalid CRC32 checksum of WAL segment (WBWRITE)");
533 }
534 }
535 rc = extf->probe_mmap_unsafe(extf, 0, &mm, &sp);
536 RCGO(rc, finish);
537 memmove(mm + wb.off, rp, wb.len);
538 rp += wb.len;
539 break;
540 }
541 case WOP_RESIZE: {
542 WBRESIZE wb;
543 if (avail < sizeof(wb)) {
544 _WAL_CORRUPTED("Premature end of WAL (WBRESIZE)");
545 }
546 memcpy(&wb, rp, sizeof(wb));
547 rp += sizeof(wb);
548 rc = extf->truncate_unsafe(extf, wb.nsize);
549 RCGO(rc, finish);
550 break;
551 }
552 case WOP_SAVEPOINT:
553 if (fpos == rp - wmm) { // last fixpoint to
554 WBSAVEPOINT wb;
555 memcpy(&wb, rp, sizeof(wb));
556 iwlog_warn("Database recovered at point of time: %"
557 PRIu64
558 " ms since epoch\n", wb.ts);
559 goto finish;
560 }
561 rp += sizeof(WBSAVEPOINT);
562 break;
563 case WOP_RESET: {
564 rp += sizeof(WBRESET);
565 break;
566 }
567 default: {
568 _WAL_CORRUPTED("Invalid WAL command");
569 break;
570 }
571 }
572 }
573 #undef _WAL_CORRUPTED
574
575 finish:
576 if (!rc) {
577 rc = extf->sync_mmap_unsafe(extf, 0, IWFS_SYNCDEFAULT);
578 }
579 munmap(wmm, (size_t) pfsz);
580 IWRC(extf->remove_mmap_unsafe(extf, 0), rc);
581 IWRC(extf->add_mmap_unsafe(extf, 0, SIZE_T_MAX, IWFS_MMAP_PRIVATE), rc);
582 if (!rc) {
583 int stage = wal->bkp_stage;
584 if ((stage == 0) || (stage == BKP_WAL_CLEANUP)) {
585 rc = _truncate_wl(wal);
586 } else {
587 // Don't truncate WAL during online backup.
588 // Just append the WBRESET mark
589 WBRESET wb = {
590 .id = WOP_RESET
591 };
592 IWRC(_flush_wl(wal, false), rc);
593 // Write: WBSEP + WBRESET
594 IWRC(_write_wl(wal, &wb, sizeof(wb), 0, 0), rc);
595 IWRC(_flush_wl(wal, true), rc);
596 IWRC(iwp_lseek(wal->fh, 0, IWP_SEEK_END, &fsz), rc);
597 if (!rc) {
598 // rollforward_offset points here --> WBSEP __ WBRESET __ EOF
599 wal->rollforward_offset = fsz - (sizeof(WBSEP) + sizeof(WBRESET));
600 }
601 }
602 }
603 if (rc && !wal->iwkv->fatalrc) {
604 wal->iwkv->fatalrc = rc;
605 }
606 wal->synched = true;
607 wal->applying = false;
608 return rc;
609 }
610
_recover_wl(IWKV iwkv,IWAL * wal,IWFS_FSM_OPTS * fsmopts,bool recover_backup)611 static iwrc _recover_wl(IWKV iwkv, IWAL *wal, IWFS_FSM_OPTS *fsmopts, bool recover_backup) {
612 off_t fsz = 0;
613 iwrc rc = iwp_lseek(wal->fh, 0, IWP_SEEK_END, &fsz);
614 RCRET(rc);
615 if (!fsz) { // empty wal log
616 return 0;
617 }
618 IWFS_EXT extf;
619 IWFS_EXT_OPTS extopts;
620 memcpy(&extopts, &fsmopts->exfile, sizeof(extopts));
621 extopts.use_locks = false;
622 extopts.file.omode = IWFS_OCREATE | IWFS_OWRITE;
623 extopts.file.dlsnr = 0;
624 rc = iwfs_exfile_open(&extf, &extopts);
625 RCRET(rc);
626 rc = _rollforward_exl(wal, &extf, recover_backup ? 2 : 1);
627 IWRC(extf.close(&extf), rc);
628 return rc;
629 }
630
_need_checkpoint(IWAL * wal)631 IW_INLINE bool _need_checkpoint(IWAL *wal) {
632 uint64_t mbytes = wal->mbytes;
633 bool force = wal->force_cp;
634 return (force || mbytes >= wal->checkpoint_buffer_sz);
635 }
636
_checkpoint_exl(IWAL * wal,uint64_t * tsp,bool no_fixpoint)637 static iwrc _checkpoint_exl(IWAL *wal, uint64_t *tsp, bool no_fixpoint) {
638 if (tsp) {
639 *tsp = 0;
640 }
641 int stage = wal->bkp_stage;
642 if (stage == BKP_MAIN_COPY) {
643 // No checkpoints during main file copying
644 return 0;
645 }
646 iwrc rc = 0;
647 IWFS_EXT *extf;
648 IWKV iwkv = wal->iwkv;
649 if (!no_fixpoint) {
650 wal->force_cp = false;
651 wal->force_sp = false;
652 WBSAVEPOINT wb = {
653 .id = WOP_SAVEPOINT
654 };
655 rc = iwp_current_time_ms(&wb.ts, false);
656 RCGO(rc, finish);
657 rc = _write_wl(wal, &wb, sizeof(wb), 0, 0);
658 RCGO(rc, finish);
659 }
660 rc = _flush_wl(wal, true);
661 RCGO(rc, finish);
662 rc = iwkv->fsm.extfile(&iwkv->fsm, &extf);
663 RCGO(rc, finish);
664
665 rc = _rollforward_exl(wal, extf, 0);
666 wal->mbytes = 0;
667 wal->synched = true;
668 iwp_current_time_ms(&wal->checkpoint_ts, true);
669 if (tsp) {
670 *tsp = wal->checkpoint_ts;
671 }
672
673 finish:
674 if (rc) {
675 if (iwkv->fatalrc) {
676 iwlog_ecode_error3(rc);
677 } else {
678 iwkv->fatalrc = rc;
679 }
680 }
681 return rc;
682 }
683
684 #ifdef IW_TESTS
685
iwal_test_checkpoint(IWKV iwkv)686 iwrc iwal_test_checkpoint(IWKV iwkv) {
687 if (!iwkv->dlsnr) {
688 return IWKV_ERROR_WAL_MODE_REQUIRED;
689 }
690 IWAL *wal = (IWAL*) iwkv->dlsnr;
691 iwrc rc = _excl_lock(wal);
692 RCRET(rc);
693 rc = _checkpoint_exl(wal, 0, false);
694 IWRC(_excl_unlock(wal), rc);
695 return rc;
696 }
697
698 #endif
699
700 //--------------------------------------- Public API
701
iwal_poke_checkpoint(IWKV iwkv,bool force)702 WUR iwrc iwal_poke_checkpoint(IWKV iwkv, bool force) {
703 IWAL *wal = (IWAL*) iwkv->dlsnr;
704 if (!wal || !(force || _need_checkpoint(wal))) {
705 return 0;
706 }
707 iwrc rc = _lock(wal);
708 RCRET(rc);
709 bool cforce = wal->force_cp;
710 if (cforce) { // Forced already
711 _unlock(wal);
712 return 0;
713 } else if (force) {
714 wal->force_cp = true;
715 } else if (!_need_checkpoint(wal)) {
716 _unlock(wal);
717 return 0;
718 }
719 int rci = pthread_cond_broadcast(wal->cpt_condp);
720 if (rci) {
721 rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
722 }
723 _unlock(wal);
724 return rc;
725 }
726
iwal_poke_savepoint(IWKV iwkv)727 iwrc iwal_poke_savepoint(IWKV iwkv) {
728 IWAL *wal = (IWAL*) iwkv->dlsnr;
729 if (!wal) {
730 return 0;
731 }
732 iwrc rc = _lock(wal);
733 RCRET(rc);
734 bool fsp = wal->force_sp;
735 if (!fsp) {
736 wal->force_sp = true;
737 int rci = pthread_cond_broadcast(wal->cpt_condp);
738 if (rci) {
739 rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
740 }
741 }
742 _unlock(wal);
743 return rc;
744 }
745
_savepoint_exl(IWAL * wal,uint64_t * tsp,bool sync)746 iwrc _savepoint_exl(IWAL *wal, uint64_t *tsp, bool sync) {
747 if (tsp) {
748 *tsp = 0;
749 }
750 wal->force_sp = false;
751 WBSAVEPOINT wbfp = {
752 .id = WOP_SAVEPOINT
753 };
754 iwrc rc = iwp_current_time_ms(&wbfp.ts, false);
755 RCRET(rc);
756 rc = _write_wl(wal, &wbfp, sizeof(wbfp), 0, 0);
757 RCRET(rc);
758 rc = _flush_wl(wal, sync);
759 RCRET(rc);
760 if (sync) {
761 wal->synched = true;
762 }
763 if (tsp) {
764 *tsp = wbfp.ts;
765 }
766 return 0;
767 }
768
/** Returns the `synched` flag of the WAL, or false if the database has no WAL attached. */
bool iwal_synched(IWKV iwkv) {
  IWAL *wal = (IWAL*) iwkv->dlsnr;
  if (wal) {
    return wal->synched;
  }
  return false;
}
776
iwal_savepoint_exl(IWKV iwkv,bool sync)777 iwrc iwal_savepoint_exl(IWKV iwkv, bool sync) {
778 IWAL *wal = (IWAL*) iwkv->dlsnr;
779 if (!wal) {
780 return 0;
781 }
782 return _savepoint_exl(wal, 0, sync);
783 }
784
/**
 * Stops the WAL checkpoint thread.
 *
 * Waits until a running online backup is finished, marks the WAL as closed,
 * wakes up the checkpoint thread and joins it. WAL resources are not released here.
 * Nothing is done if the database has no WAL attached.
 * The function may be called more than once: the thread is joined only the first time.
 */
void iwal_shutdown(IWKV iwkv) {
  IWAL *wal = (IWAL*) iwkv->dlsnr;
  if (!wal) {
    return;
  }
  while (wal->bkp_stage) { // todo: review
    iwp_sleep(50);
  }
  wal->open = false;
  if (wal->mtxp && wal->cpt_condp) {
    // Wake up the worker so it notices that the WAL is closed
    pthread_mutex_lock(wal->mtxp);
    pthread_cond_broadcast(wal->cpt_condp);
    pthread_mutex_unlock(wal->mtxp);
  }
  if (wal->cptp) {
    pthread_join(wal->cpt, 0);
    // Forget the thread: joining the same thread twice is undefined behavior.
    // `pthread_t` is opaque, so the pointer is cleared rather than the handle itself.
    wal->cptp = 0;
  }
}
804
_cpt_worker_fn(void * op)805 static void* _cpt_worker_fn(void *op) {
806 int rci;
807 iwrc rc = 0;
808 IWAL *wal = op;
809 IWKV iwkv = wal->iwkv;
810 uint64_t savepoint_ts = 0;
811
812 while (wal->open) {
813 struct timespec tp;
814 uint64_t tick_ts;
815 bool sp = false, cp = false;
816 rc = _lock(wal);
817 RCBREAK(rc);
818
819 if (_need_checkpoint(wal)) {
820 cp = true;
821 _unlock(wal);
822 goto cprun;
823 } else if (wal->force_sp) {
824 sp = true;
825 _unlock(wal);
826 goto cprun;
827 }
828
829 #if defined(IW_HAVE_CLOCK_MONOTONIC) && defined(IW_HAVE_PTHREAD_CONDATTR_SETCLOCK)
830 rc = iwp_clock_get_time(CLOCK_MONOTONIC, &tp);
831 #else
832 rc = iwp_clock_get_time(CLOCK_REALTIME, &tp);
833 #endif
834 if (rc) {
835 _unlock(wal);
836 break;
837 }
838 tp.tv_sec += 1; // one sec tick
839 tick_ts = tp.tv_sec * 1000 + (uint64_t) round(tp.tv_nsec / 1.0e6);
840 do {
841 rci = pthread_cond_timedwait(wal->cpt_condp, wal->mtxp, &tp);
842 } while (rci == EINTR);
843 if (rci && (rci != ETIMEDOUT)) {
844 rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
845 _unlock(wal);
846 break;
847 }
848 if (!wal->open || iwkv->fatalrc) {
849 _unlock(wal);
850 break;
851 }
852 bool synched = wal->synched;
853 size_t mbytes = wal->mbytes;
854 cp = _need_checkpoint(wal) || ((mbytes && (tick_ts - wal->checkpoint_ts) >= 1000LL * wal->checkpoint_timeout_sec));
855 if (!cp) {
856 sp = !synched && (wal->force_sp || ((tick_ts - savepoint_ts) >= 1000LL * wal->savepoint_timeout_sec));
857 }
858 _unlock(wal);
859
860 cprun:
861 if (cp || sp) {
862 rc = _excl_lock(wal);
863 RCBREAK(rc);
864 if (iwkv->open) {
865 if (cp) {
866 rc = _checkpoint_exl(wal, &savepoint_ts, false);
867 } else {
868 rc = _savepoint_exl(wal, &savepoint_ts, true);
869 }
870 }
871 _excl_unlock(wal);
872 if (rc) {
873 iwlog_ecode_error2(rc, "WAL worker savepoint/checkpoint error\n");
874 rc = 0;
875 }
876 }
877 }
878 if (rc) {
879 iwkv->fatalrc = iwkv->fatalrc ? iwkv->fatalrc : rc;
880 iwlog_ecode_error2(rc, "WAL worker exited with error\n");
881 }
882 return 0;
883 }
884
iwal_online_backup(IWKV iwkv,uint64_t * ts,const char * target_file)885 iwrc iwal_online_backup(IWKV iwkv, uint64_t *ts, const char *target_file) {
886 iwrc rc;
887 size_t sp;
888 uint32_t lv;
889 uint64_t llv;
890 char buf[16384];
891 off_t off = 0, fsize = 0;
892 *ts = 0;
893
894 if (!target_file) {
895 return IW_ERROR_INVALID_ARGS;
896 }
897 IWAL *wal = (IWAL*) iwkv->dlsnr;
898 if (!wal) {
899 return IWKV_ERROR_WAL_MODE_REQUIRED;
900 }
901 rc = _lock(wal);
902 RCRET(rc);
903 if (wal->bkp_stage) {
904 rc = IWKV_ERROR_BACKUP_IN_PROGRESS;
905 } else {
906 wal->bkp_stage = BKP_STARTED;
907 }
908 _unlock(wal);
909
910 #ifndef _WIN32
911 HANDLE fh = open(target_file, O_CREAT | O_WRONLY | O_TRUNC, 00600);
912 if (INVALIDHANDLE(fh)) {
913 rc = iwrc_set_errno(IW_ERROR_IO_ERRNO, errno);
914 goto finish;
915 }
916 #else
917 HANDLE fh = CreateFile(target_file, GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ | FILE_SHARE_WRITE,
918 NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
919 if (INVALIDHANDLE(fh)) {
920 rc = iwrc_set_werror(IW_ERROR_IO_ERRNO, GetLastError());
921 goto finish;
922 }
923 #endif
924
925 // Flush all pending WAL changes
926 rc = _excl_lock(wal);
927 RCGO(rc, finish);
928 wal->bkp_stage = BKP_WAL_CLEANUP;
929 rc = _checkpoint_exl(wal, 0, false);
930 wal->bkp_stage = BKP_MAIN_COPY;
931 _excl_unlock(wal);
932 RCGO(rc, finish);
933
934 // Copy main database file
935 IWFS_FSM_STATE fstate = { 0 };
936 rc = iwkv->fsm.state(&iwkv->fsm, &fstate);
937 RCGO(rc, finish);
938 do {
939 rc = iwp_pread(fstate.exfile.file.fh, off, buf, sizeof(buf), &sp);
940 RCGO(rc, finish);
941 if (sp > 0) {
942 rc = iwp_write(fh, buf, sp);
943 RCGO(rc, finish);
944 off += sp;
945 }
946 } while (sp > 0);
947
948 // Copy most of WAL file content
949 rc = _lock(wal);
950 RCGO(rc, finish);
951 wal->bkp_stage = BKP_WAL_COPY1;
952 rc = _flush_wl(wal, false);
953 _unlock(wal);
954 RCGO(rc, finish);
955
956 fsize = off;
957 off = 0;
958 do {
959 rc = iwp_pread(wal->fh, off, buf, sizeof(buf), &sp);
960 RCGO(rc, finish);
961 if (sp > 0) {
962 rc = iwp_write(fh, buf, sp);
963 RCGO(rc, finish);
964 off += sp;
965 }
966 } while (sp > 0);
967
968
969 // Copy rest of WAL file in exclusive locked mode
970 rc = _excl_lock(wal);
971 RCGO(rc, finish);
972 wal->bkp_stage = BKP_WAL_COPY2;
973 rc = _savepoint_exl(wal, ts, true);
974 RCGO(rc, unlock);
975 do {
976 rc = iwp_pread(wal->fh, off, buf, sizeof(buf), &sp);
977 RCGO(rc, unlock);
978 if (sp > 0) {
979 rc = iwp_write(fh, buf, sp);
980 RCGO(rc, unlock);
981 off += sp;
982 }
983 } while (sp > 0);
984
985 llv = IW_HTOILL(fsize);
986 rc = iwp_write(fh, &llv, sizeof(llv));
987 RCGO(rc, unlock);
988
989 lv = IW_HTOIL(IWKV_BACKUP_MAGIC);
990 rc = iwp_write(fh, &lv, sizeof(lv));
991 RCGO(rc, unlock);
992
993 unlock:
994 wal->bkp_stage = 0;
995 IWRC(_excl_unlock(wal), rc);
996
997 finish:
998 if (rc) {
999 _lock(wal);
1000 wal->bkp_stage = 0;
1001 _unlock(wal);
1002 } else {
1003 rc = iwal_poke_checkpoint(iwkv, true);
1004 }
1005 if (!INVALIDHANDLE(fh)) {
1006 IWRC(iwp_fdatasync(fh), rc);
1007 IWRC(iwp_closefh(fh), rc);
1008 }
1009 return rc;
1010 }
1011
_init_cpt(IWAL * wal)1012 iwrc _init_cpt(IWAL *wal) {
1013 if ( (wal->savepoint_timeout_sec == UINT32_MAX)
1014 && (wal->checkpoint_timeout_sec == UINT32_MAX)) {
1015 // do not start checkpoint thread
1016 return 0;
1017 }
1018 pthread_attr_t pattr;
1019 pthread_condattr_t cattr;
1020 int rci = pthread_condattr_init(&cattr);
1021 if (rci) {
1022 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
1023 }
1024 #if defined(IW_HAVE_CLOCK_MONOTONIC) && defined(IW_HAVE_PTHREAD_CONDATTR_SETCLOCK)
1025 rci = pthread_condattr_setclock(&cattr, CLOCK_MONOTONIC);
1026 if (rci) {
1027 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
1028 }
1029 #endif
1030 rci = pthread_cond_init(&wal->cpt_cond, &cattr);
1031 if (rci) {
1032 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
1033 }
1034 wal->cpt_condp = &wal->cpt_cond;
1035 rci = pthread_attr_init(&pattr);
1036 if (rci) {
1037 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
1038 }
1039 pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_JOINABLE);
1040 rci = pthread_create(&wal->cpt, &pattr, _cpt_worker_fn, wal);
1041 if (rci) {
1042 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
1043 }
1044 wal->cptp = &wal->cpt;
1045 return 0;
1046 }
1047
iwal_create(IWKV iwkv,const IWKV_OPTS * opts,IWFS_FSM_OPTS * fsmopts,bool recover_backup)1048 iwrc iwal_create(IWKV iwkv, const IWKV_OPTS *opts, IWFS_FSM_OPTS *fsmopts, bool recover_backup) {
1049 assert(!iwkv->dlsnr && opts && fsmopts);
1050 if (!opts) {
1051 return IW_ERROR_INVALID_ARGS;
1052 }
1053 if ((opts->oflags & IWKV_RDONLY) || !opts->wal.enabled) {
1054 return 0;
1055 }
1056 iwrc rc = 0;
1057 IWAL *wal = calloc(1, sizeof(*wal));
1058 if (!wal) {
1059 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
1060 }
1061
1062 wal->wal_lock_interceptor = opts->wal.wal_lock_interceptor;
1063 wal->wal_lock_interceptor_opaque = opts->wal.wal_lock_interceptor_opaque;
1064
1065 size_t sz = strlen(opts->path);
1066 char *wpath = malloc(sz + 4 /*-wal*/ + 1 /*\0*/);
1067 if (!wpath) {
1068 free(wal);
1069 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
1070 }
1071 memcpy(wpath, opts->path, sz);
1072 memcpy(wpath + sz, "-wal", 4);
1073 wpath[sz + 4] = '\0';
1074
1075 wal->fh = INVALID_HANDLE_VALUE;
1076 wal->path = wpath;
1077 wal->oflags = opts->oflags;
1078 wal->iwkv = iwkv;
1079 iwp_current_time_ms(&wal->checkpoint_ts, true);
1080
1081 rc = _init_locks(wal);
1082 RCGO(rc, finish);
1083
1084 IWDLSNR *dlsnr = &wal->lsnr;
1085 dlsnr->onopen = _onopen;
1086 dlsnr->onclosing = _onclosing;
1087 dlsnr->onset = _onset;
1088 dlsnr->oncopy = _oncopy;
1089 dlsnr->onwrite = _onwrite;
1090 dlsnr->onresize = _onresize;
1091 dlsnr->onsynced = _onsynced;
1092 iwkv->dlsnr = (IWDLSNR*) wal;
1093
1094 wal->wal_buffer_sz
1095 = opts->wal.wal_buffer_sz > 0
1096 ? opts->wal.wal_buffer_sz :
1097 #if defined __ANDROID__ || defined TARGET_OS_IPHONE
1098 2 * 1024 * 1024; // 2M
1099 #else
1100 8 * 1024 * 1024; // 8M
1101 #endif
1102 if (wal->wal_buffer_sz < 4096) {
1103 wal->wal_buffer_sz = 4096;
1104 }
1105
1106 wal->checkpoint_buffer_sz
1107 = opts->wal.checkpoint_buffer_sz > 0
1108 ? opts->wal.checkpoint_buffer_sz :
1109 #if defined __ANDROID__ || defined TARGET_OS_IPHONE
1110 64ULL * 1024 * 1024; // 64M
1111 #else
1112 1024ULL * 1024 * 1024; // 1G
1113 #endif
1114 if (wal->checkpoint_buffer_sz < 1024 * 1024) { // 1M minimal
1115 wal->checkpoint_buffer_sz = 1024 * 1024;
1116 }
1117
1118 wal->savepoint_timeout_sec
1119 = opts->wal.savepoint_timeout_sec > 0
1120 ? opts->wal.savepoint_timeout_sec : 10; // 10 sec
1121
1122 wal->checkpoint_timeout_sec
1123 = opts->wal.checkpoint_timeout_sec > 0 ?
1124 #if defined __ANDROID__ || defined TARGET_OS_IPHONE
1125 opts->wal.checkpoint_timeout_sec : 60; // 1 min
1126 #else
1127 opts->wal.checkpoint_timeout_sec : 300; // 5 min
1128 #endif
1129
1130 if (wal->checkpoint_timeout_sec < 10) { // 10 sec minimal
1131 wal->checkpoint_timeout_sec = 10;
1132 }
1133 if (wal->savepoint_timeout_sec >= wal->checkpoint_timeout_sec) {
1134 wal->savepoint_timeout_sec = wal->checkpoint_timeout_sec / 2;
1135 }
1136
1137 wal->check_cp_crc = opts->wal.check_crc_on_checkpoint;
1138
1139 wal->buf = malloc(wal->wal_buffer_sz);
1140 if (!wal->buf) {
1141 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
1142 goto finish;
1143 }
1144 wal->buf += sizeof(WBSEP);
1145 wal->bufsz = wal->wal_buffer_sz - sizeof(WBSEP);
1146
1147 // Now open WAL file
1148
1149 #ifndef _WIN32
1150 HANDLE fh = open(wal->path, O_CREAT | O_RDWR | O_CLOEXEC, IWFS_DEFAULT_FILEMODE);
1151 if (INVALIDHANDLE(fh)) {
1152 rc = iwrc_set_errno(IW_ERROR_IO_ERRNO, errno);
1153 goto finish;
1154 }
1155 #else
1156 HANDLE fh = CreateFile(wal->path, GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ,
1157 NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
1158 if (INVALIDHANDLE(fh)) {
1159 rc = iwrc_set_werror(IW_ERROR_IO_ERRNO, GetLastError());
1160 goto finish;
1161 }
1162 #endif
1163
1164 wal->fh = fh;
1165 rc = iwp_flock(wal->fh, IWP_WLOCK);
1166 RCGO(rc, finish);
1167
1168 // Now force all fsm data to be privately mmaped.
1169 // We will apply wal log to main database file
1170 // then re-read our private mmaps
1171 fsmopts->mmap_opts = IWFS_MMAP_PRIVATE;
1172 fsmopts->exfile.file.dlsnr = iwkv->dlsnr;
1173
1174 if (wal->oflags & IWKV_TRUNC) {
1175 rc = _truncate_wl(wal);
1176 RCGO(rc, finish);
1177 } else {
1178 rc = _recover_wl(iwkv, wal, fsmopts, recover_backup);
1179 RCGO(rc, finish);
1180 }
1181
1182 wal->open = true;
1183 // Start checkpoint thread
1184 rc = _init_cpt(wal);
1185
1186 finish:
1187 if (rc) {
1188 iwkv->dlsnr = 0;
1189 iwkv->fatalrc = iwkv->fatalrc ? iwkv->fatalrc : rc;
1190 iwal_shutdown(iwkv);
1191 _destroy(wal);
1192 }
1193 return rc;
1194 }
1195