1 #include "iwkv_internal.h"
2 #include <sys/types.h>
3 #include <fcntl.h>
4 #include <time.h>
5
6 #ifdef _WIN32
7 #include "win32/mman/mman.h"
8 #else
9
10 #include <sys/mman.h>
11
12 #endif
13
extern atomic_uint_fast64_t g_trigger;

/**
 * Online backup stages stored in `IWAL::bkp_stage`.
 * A stage value of 0 means that no online backup is running.
 */
enum {
  BKP_STARTED     = 0x1, /**< Backup started */
  BKP_WAL_CLEANUP = 0x2, /**< Do checkpoint and truncate WAL file */
  BKP_MAIN_COPY   = 0x3, /**< Copy main database file */
  BKP_WAL_COPY1   = 0x4, /**< Copy most of WAL file content */
  BKP_WAL_COPY2   = 0x5, /**< Copy rest of WAL file in exclusive locked mode */
};
21
/**
 * Write-ahead log (WAL) state.
 *
 * An IWAL is installed as the database data listener (`iwkv->dlsnr` in iwal_create).
 * The listener callbacks cast `struct IWDLSNR *` back to `IWAL *`, so `lsnr` must
 * remain the first member of this structure.
 */
typedef struct IWAL {
IWDLSNR lsnr; /**< Listener callbacks; must stay the first member (see casts in the _on* callbacks) */
atomic_bool applying; /**< Set while WAL records are being applied; listener callbacks ignore changes then */
atomic_bool open; /**< Is WAL in use */
atomic_bool force_cp; /**< Next checkpoint scheduled */
atomic_bool synched; /**< WAL is synched or WBFIXPOINT is the last write operation */
bool force_sp; /**< Next savepoint scheduled */
bool check_cp_crc; /**< Check CRC32 sum of data blocks during checkpoint. Default: false */
iwkv_openflags oflags; /**< File open flags */
atomic_int bkp_stage; /**< Online backup stage: 0 (no backup) or one of the BKP_* values */
size_t wal_buffer_sz; /**< WAL file intermediate buffer size. */
size_t checkpoint_buffer_sz; /**< Checkpoint buffer size in bytes. */
uint32_t bufpos; /**< Current position in buffer */
uint32_t bufsz; /**< Usable size of buffer (allocation size minus the reserved WBSEP header) */
HANDLE fh; /**< File handle */
uint8_t *buf; /**< File buffer; sizeof(WBSEP) bytes right before it are reserved for the segment header */
char *path; /**< WAL file path */
pthread_mutex_t *mtxp; /**< Global WAL mutex (points to `mtx` once initialized) */
pthread_cond_t *cpt_condp; /**< Checkpoint thread cond variable (points to `cpt_cond` once initialized) */
pthread_t *cptp; /**< Checkpoint thread (points to `cpt` once started) */
iwrc(*wal_lock_interceptor)(bool, void *);
/**< Optional function called
     - before acquiring
     - after releasing
     the exclusive database lock by the WAL checkpoint thread.
     In the `before lock` case the first argument is set to true */
void *wal_lock_interceptor_opaque;/**< Opaque data for `wal_lock_interceptor` */
uint32_t savepoint_timeout_sec; /**< Savepoint timeout seconds */
uint32_t checkpoint_timeout_sec; /**< Checkpoint timeout seconds */
atomic_size_t mbytes; /**< Estimated size of modified private mmaped memory bytes */
off_t rollforward_offset; /**< Rollforward offset during online backup */
uint64_t checkpoint_ts; /**< Last checkpoint timestamp milliseconds */
pthread_mutex_t mtx; /**< Global WAL mutex */
pthread_cond_t cpt_cond; /**< Checkpoint thread cond variable */
pthread_t cpt; /**< Checkpoint thread */
IWKV iwkv; /**< Owning database */
} IWAL;
59
/** Forward declaration: _onclosing() and _onresize() call the checkpoint routine defined further below. */
static iwrc _checkpoint_exl(IWAL *wal, uint64_t *tsp, bool no_fixpoint);
61
_lock(IWAL * wal)62 IW_INLINE iwrc _lock(IWAL *wal) {
63 int rci = pthread_mutex_lock(wal->mtxp);
64 return (rci ? iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci) : 0);
65 }
66
_unlock(IWAL * wal)67 IW_INLINE iwrc _unlock(IWAL *wal) {
68 int rci = pthread_mutex_unlock(wal->mtxp);
69 return (rci ? iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci) : 0);
70 }
71
_excl_lock(IWAL * wal)72 static iwrc _excl_lock(IWAL *wal) {
73 iwrc rc = 0;
74 if (wal->wal_lock_interceptor) {
75 rc = wal->wal_lock_interceptor(true, wal->wal_lock_interceptor_opaque);
76 RCRET(rc);
77 }
78 rc = iwkv_exclusive_lock(wal->iwkv);
79 if (rc) {
80 if (wal->wal_lock_interceptor) {
81 IWRC(wal->wal_lock_interceptor(false, wal->wal_lock_interceptor_opaque), rc);
82 }
83 return rc;
84 }
85 rc = _lock(wal);
86 if (rc) {
87 IWRC(iwkv_exclusive_unlock(wal->iwkv), rc);
88 if (wal->wal_lock_interceptor) {
89 IWRC(wal->wal_lock_interceptor(false, wal->wal_lock_interceptor_opaque), rc);
90 }
91 }
92 return rc;
93 }
94
_excl_unlock(IWAL * wal)95 static iwrc _excl_unlock(IWAL *wal) {
96 iwrc rc = _unlock(wal);
97 IWRC(iwkv_exclusive_unlock(wal->iwkv), rc);
98 if (wal->wal_lock_interceptor) {
99 IWRC(wal->wal_lock_interceptor(false, wal->wal_lock_interceptor_opaque), rc);
100 }
101 return rc;
102 }
103
_init_locks(IWAL * wal)104 static iwrc _init_locks(IWAL *wal) {
105 int rci = pthread_mutex_init(&wal->mtx, 0);
106 if (rci) {
107 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
108 }
109 wal->mtxp = &wal->mtx;
110 return 0;
111 }
112
/**
 * Releases every resource owned by `wal` and frees the structure itself.
 * Safe to call with NULL and on a partially initialized instance: each resource
 * is released only if its handle/pointer was set.
 */
static void _destroy(IWAL *wal) {
  if (!wal) {
    return;
  }
  wal->open = false;
  if (!INVALIDHANDLE(wal->fh)) {
    iwp_unlock(wal->fh);
    iwp_closefh(wal->fh);
  }
  if (wal->cpt_condp) {
    pthread_cond_destroy(wal->cpt_condp);
    wal->cpt_condp = 0;
  }
  if (wal->mtxp) {
    pthread_mutex_destroy(wal->mtxp);
    wal->mtxp = 0;
  }
  free(wal->path);
  if (wal->buf) {
    // `buf` points just past the WBSEP header reserved in front of the allocation.
    free(wal->buf - sizeof(WBSEP));
  }
  free(wal);
}
136
_flush_wl(IWAL * wal,bool sync)137 static iwrc _flush_wl(IWAL *wal, bool sync) {
138 iwrc rc = 0;
139 if (wal->bufpos) {
140 uint32_t crc = wal->check_cp_crc ? iwu_crc32(wal->buf, wal->bufpos, 0) : 0;
141 WBSEP sep = {
142 .id = WOP_SEP,
143 .crc = crc,
144 .len = wal->bufpos
145 };
146 size_t wz = wal->bufpos + sizeof(WBSEP);
147 uint8_t *wp = wal->buf - sizeof(WBSEP);
148 memcpy(wp, &sep, sizeof(WBSEP));
149 rc = iwp_write(wal->fh, wp, wz);
150 RCRET(rc);
151 wal->bufpos = 0;
152 }
153 if (sync) {
154 rc = iwp_fsync(wal->fh);
155 }
156 return rc;
157 }
158
_truncate_wl(IWAL * wal)159 IW_INLINE iwrc _truncate_wl(IWAL *wal) {
160 iwrc rc = iwp_ftruncate(wal->fh, 0);
161 RCRET(rc);
162 wal->rollforward_offset = 0;
163 rc = iwp_lseek(wal->fh, 0, IWP_SEEK_SET, 0);
164 RCRET(rc);
165 rc = iwp_fsync(wal->fh);
166 return rc;
167 }
168
_write_wl(IWAL * wal,const void * op,off_t oplen,const uint8_t * data,off_t len)169 static iwrc _write_wl(IWAL *wal, const void *op, off_t oplen, const uint8_t *data, off_t len) {
170 iwrc rc = 0;
171 const off_t bufsz = wal->bufsz;
172 wal->synched = false;
173 if (bufsz - wal->bufpos < oplen) {
174 rc = _flush_wl(wal, false);
175 RCRET(rc);
176 }
177 assert(bufsz - wal->bufpos >= oplen);
178 memcpy(wal->buf + wal->bufpos, op, (size_t) oplen);
179 wal->bufpos += oplen;
180 if (bufsz - wal->bufpos < len) {
181 rc = _flush_wl(wal, false);
182 RCRET(rc);
183 rc = iwp_write(wal->fh, data, (size_t) len);
184 RCRET(rc);
185 } else {
186 assert(bufsz - wal->bufpos >= len);
187 memcpy(wal->buf + wal->bufpos, data, (size_t) len);
188 wal->bufpos += len;
189 }
190 return rc;
191 }
192
_write_op(IWAL * wal,const void * op,off_t oplen,const uint8_t * data,off_t len)193 IW_INLINE iwrc _write_op(IWAL *wal, const void *op, off_t oplen, const uint8_t *data, off_t len) {
194 iwrc rc = _lock(wal);
195 RCRET(rc);
196 rc = _write_wl(wal, op, oplen, data, len);
197 IWRC(_unlock(wal), rc);
198 return rc;
199 }
200
iwal_sync(IWKV iwkv)201 iwrc iwal_sync(IWKV iwkv) {
202 IWAL *wal = (IWAL *) iwkv->dlsnr;
203 iwrc rc = _lock(wal);
204 RCRET(rc);
205 rc = _flush_wl(wal, true);
206 IWRC(_unlock(wal), rc);
207 return rc;
208 }
209
_onopen(struct IWDLSNR * self,const char * path,int mode)210 static iwrc _onopen(struct IWDLSNR *self, const char *path, int mode) {
211 return 0;
212 }
213
_onclosing(struct IWDLSNR * self)214 static iwrc _onclosing(struct IWDLSNR *self) {
215 IWAL *wal = (IWAL *) self;
216 #ifdef IW_TESTS
217 uint64_t tv = g_trigger;
218 if (tv & IWKVD_WAL_NO_CHECKPOINT_ON_CLOSE) {
219 _destroy(wal);
220 return 0;
221 }
222 #endif
223 iwrc rc = _checkpoint_exl(wal, 0, false);
224 _destroy(wal);
225 return rc;
226 }
227
_onset(struct IWDLSNR * self,off_t off,uint8_t val,off_t len,int flags)228 static iwrc _onset(struct IWDLSNR *self, off_t off, uint8_t val, off_t len, int flags) {
229 IWAL *wal = (IWAL *) self;
230 if (wal->applying) {
231 return 0;
232 }
233 WBSET wb = {
234 .id = WOP_SET,
235 .val = val,
236 .off = off,
237 .len = len
238 };
239 wal->mbytes += len;
240 return _write_op((IWAL *) self, &wb, sizeof(wb), 0, 0);
241 }
242
_oncopy(struct IWDLSNR * self,off_t off,off_t len,off_t noff,int flags)243 static iwrc _oncopy(struct IWDLSNR *self, off_t off, off_t len, off_t noff, int flags) {
244 IWAL *wal = (IWAL *) self;
245 if (wal->applying) {
246 return 0;
247 }
248 WBCOPY wb = {
249 .id = WOP_COPY,
250 .off = off,
251 .len = len,
252 .noff = noff
253 };
254 wal->mbytes += len;
255 return _write_op(wal, &wb, sizeof(wb), 0, 0);
256 }
257
_onwrite(struct IWDLSNR * self,off_t off,const void * buf,off_t len,int flags)258 static iwrc _onwrite(struct IWDLSNR *self, off_t off, const void *buf, off_t len, int flags) {
259 assert(len <= (size_t)(-1));
260 IWAL *wal = (IWAL *) self;
261 if (wal->applying) {
262 return 0;
263 }
264 WBWRITE wb = {
265 .id = WOP_WRITE,
266 .crc = wal->check_cp_crc ? iwu_crc32(buf, len, 0) : 0,
267 .len = len,
268 .off = off
269 };
270 wal->mbytes += len;
271 return _write_op(wal, &wb, sizeof(wb), buf, len);
272 }
273
_onresize(struct IWDLSNR * self,off_t osize,off_t nsize,int flags,bool * handled)274 static iwrc _onresize(struct IWDLSNR *self, off_t osize, off_t nsize, int flags, bool *handled) {
275 IWAL *wal = (IWAL *) self;
276 if (wal->applying) {
277 *handled = false;
278 return 0;
279 }
280 *handled = true;
281 WBRESIZE wb = {
282 .id = WOP_RESIZE,
283 .osize = osize,
284 .nsize = nsize
285 };
286 iwrc rc = _lock(wal);
287 RCRET(rc);
288 rc = _write_wl(wal, &wb, sizeof(wb), 0, 0);
289 RCGO(rc, finish);
290 rc = _checkpoint_exl(wal, 0, true);
291 finish:
292 IWRC(_unlock(wal), rc);
293 return rc;
294 }
295
_onsynced(struct IWDLSNR * self,int flags)296 static iwrc _onsynced(struct IWDLSNR *self, int flags) {
297 IWAL *wal = (IWAL *) self;
298 if (wal->applying) {
299 return 0;
300 }
301 iwrc rc = _lock(wal);
302 RCRET(rc);
303 rc = _flush_wl(wal, true);
304 IWRC(_unlock(wal), rc);
305 return rc;
306 }
307
/**
 * Scans a mapped WAL image and finds the last complete WBFIXPOINT and WBRESET records.
 *
 * Scanning stops at the first record that is incomplete (file ends inside it) or unknown.
 * The image must start with a WBSEP record, otherwise nothing is reported.
 *
 * @param wal  WAL instance (unused).
 * @param wmm  Start of the mapped WAL data.
 * @param fsz  Number of valid bytes at `wmm`.
 * @param fpos [out] Offset (relative to `wmm`) of the last complete fixpoint, or 0 if none.
 * @param rpos [out] Offset (relative to `wmm`) of the last complete reset mark, or 0 if none.
 */
static void _last_fix_and_reset_points(IWAL *wal, uint8_t *wmm, off_t fsz, off_t *fpos, off_t *rpos) {
  (void) wal;
  uint8_t *rp = wmm;
  *fpos = 0;
  *rpos = 0;

  for (uint32_t i = 0; rp - wmm < fsz; ++i) {
    uint8_t opid;
    off_t avail = fsz - (rp - wmm); // bytes left, including the current record
    memcpy(&opid, rp, 1);
    if (i == 0 && opid != WOP_SEP) {
      return;
    }
    switch (opid) {
      case WOP_SEP: {
        WBSEP wb;
        if (avail < (off_t) sizeof(wb)) {
          return;
        }
        memcpy(&wb, rp, sizeof(wb));
        rp += sizeof(wb);
        avail -= (off_t) sizeof(wb);
        // The segment payload follows the header, so it must fit into what is left after it.
        if ((uint64_t) wb.len > (uint64_t) avail) {
          return;
        }
        break;
      }
      case WOP_SET: {
        if (avail < (off_t) sizeof(WBSET)) {
          return;
        }
        rp += sizeof(WBSET);
        break;
      }
      case WOP_COPY: {
        if (avail < (off_t) sizeof(WBCOPY)) {
          return;
        }
        rp += sizeof(WBCOPY);
        break;
      }
      case WOP_WRITE: {
        WBWRITE wb;
        if (avail < (off_t) sizeof(wb)) {
          return;
        }
        memcpy(&wb, rp, sizeof(wb));
        rp += sizeof(wb);
        avail -= (off_t) sizeof(wb);
        // Payload follows the header; a truncated payload means the record is incomplete.
        if ((uint64_t) wb.len > (uint64_t) avail) {
          return;
        }
        rp += wb.len;
        break;
      }
      case WOP_RESIZE: {
        if (avail < (off_t) sizeof(WBRESIZE)) {
          return;
        }
        rp += sizeof(WBRESIZE);
        break;
      }
      case WOP_FIXPOINT: {
        // A partially written fixpoint must not become the recovery target.
        if (avail < (off_t) sizeof(WBFIXPOINT)) {
          return;
        }
        *fpos = (rp - wmm);
        rp += sizeof(WBFIXPOINT);
        break;
      }
      case WOP_RESET: {
        if (avail < (off_t) sizeof(WBRESET)) {
          return;
        }
        *rpos = (rp - wmm);
        rp += sizeof(WBRESET);
        break;
      }
      default: {
        return;
      }
    }
  }
}
384
_rollforward_exl(IWAL * wal,IWFS_EXT * extf,int recover_mode)385 static iwrc _rollforward_exl(IWAL *wal, IWFS_EXT *extf, int recover_mode) {
386 assert(wal->bufpos == 0);
387 off_t fsz = 0;
388 iwrc rc = iwp_lseek(wal->fh, 0, IWP_SEEK_END, &fsz);
389 RCRET(rc);
390 if (!fsz) { // empty wal log
391 return 0;
392 }
393 size_t sp;
394 uint8_t *mm;
395 const bool ccrc = wal->check_cp_crc;
396 off_t fpos = 0; // checkpoint
397 #ifndef _WIN32
398 off_t pfsz = IW_ROUNDUP(fsz, iwp_page_size());
399 uint8_t *wmm = mmap(0, (size_t) pfsz, PROT_READ, MAP_PRIVATE, wal->fh, 0);
400 madvise(wmm, (size_t) fsz, MADV_SEQUENTIAL);
401 #else
402 off_t pfsz = fsz;
403 uint8_t *wmm = mmap(0, 0, PROT_READ, MAP_PRIVATE, wal->fh, 0);
404 #endif
405 if (wmm == MAP_FAILED) {
406 return iwrc_set_errno(IW_ERROR_ERRNO, errno);
407 }
408 // Temporary turn off extf locking
409 wal->applying = true;
410
411 // Remap fsm in MAP_SHARED mode
412 extf->remove_mmap_unsafe(extf, 0);
413 rc = extf->add_mmap_unsafe(extf, 0, SIZE_T_MAX, IWFS_MMAP_SHARED);
414 if (rc) {
415 munmap(wmm, (size_t) pfsz);
416 wal->iwkv->fatalrc = rc;
417 wal->applying = false;
418 return rc;
419 }
420
421 #define _WAL_CORRUPTED(msg_) do { \
422 rc = IWKV_ERROR_CORRUPTED_WAL_FILE; \
423 iwlog_ecode_error2(rc, msg_); \
424 goto finish; \
425 } while(0);
426
427 if (recover_mode) {
428 off_t rpos; // reset point
429 _last_fix_and_reset_points(wal, wmm, fsz, &fpos, &rpos);
430 if (!fpos) {
431 goto finish;
432 }
433 if (rpos > 0 && recover_mode == 1) {
434 // Recover from last known reset point
435 if (fpos < rpos) {
436 goto finish;
437 }
438 // WBSEP__WBRESET
439 // \_rpos
440 rpos -= sizeof(WBSEP);
441 // WBSEP__WBRESET
442 // \_rpos
443 wmm += rpos;
444 fsz -= rpos;
445 }
446 } else if (wal->rollforward_offset > 0) {
447 if (wal->rollforward_offset >= fsz) {
448 _WAL_CORRUPTED("Invalid rollforward offset");
449 }
450 wmm += wal->rollforward_offset;
451 fsz -= wal->rollforward_offset;
452 }
453
454 uint8_t *rp = wmm;
455 for (uint32_t i = 0; rp - wmm < fsz; ++i) {
456 uint8_t opid;
457 off_t avail = fsz - (rp - wmm);
458 memcpy(&opid, rp, 1);
459 if (i == 0 && opid != WOP_SEP) {
460 rc = IWKV_ERROR_CORRUPTED_WAL_FILE;
461 goto finish;
462 }
463 switch (opid) {
464 case WOP_SEP: {
465 WBSEP wb;
466 if (avail < sizeof(wb)) _WAL_CORRUPTED("Premature end of WAL (WBSEP)");
467 memcpy(&wb, rp, sizeof(wb));
468 rp += sizeof(wb);
469 if (wb.len > avail) _WAL_CORRUPTED("Premature end of WAL (WBSEP)");
470 if (ccrc && wb.crc) {
471 uint32_t crc = iwu_crc32(rp, wb.len, 0);
472 if (crc != wb.crc) {
473 _WAL_CORRUPTED("Invalid CRC32 checksum of WAL segment (WBSEP)");
474 }
475 }
476 break;
477 }
478 case WOP_SET: {
479 WBSET wb;
480 if (avail < sizeof(wb)) _WAL_CORRUPTED("Premature end of WAL (WBSET)");
481 memcpy(&wb, rp, sizeof(wb));
482 rp += sizeof(wb);
483 rc = extf->probe_mmap_unsafe(extf, 0, &mm, &sp);
484 RCGO(rc, finish);
485 memset(mm + wb.off, wb.val, (size_t) wb.len);
486 break;
487 }
488 case WOP_COPY: {
489 WBCOPY wb;
490 if (avail < sizeof(wb)) _WAL_CORRUPTED("Premature end of WAL (WBCOPY)");
491 memcpy(&wb, rp, sizeof(wb));
492 rp += sizeof(wb);
493 rc = extf->probe_mmap_unsafe(extf, 0, &mm, &sp);
494 RCGO(rc, finish);
495 memmove(mm + wb.noff, mm + wb.off, (size_t) wb.len);
496 break;
497 }
498 case WOP_WRITE: {
499 WBWRITE wb;
500 if (avail < sizeof(wb)) _WAL_CORRUPTED("Premature end of WAL (WBWRITE)");
501 memcpy(&wb, rp, sizeof(wb));
502 rp += sizeof(wb);
503 if (avail < wb.len) _WAL_CORRUPTED("Premature end of WAL (WBWRITE)");
504 if (ccrc && wb.crc) {
505 uint32_t crc = iwu_crc32(rp, wb.len, 0);
506 if (crc != wb.crc) {
507 _WAL_CORRUPTED("Invalid CRC32 checksum of WAL segment (WBWRITE)");
508 }
509 }
510 rc = extf->probe_mmap_unsafe(extf, 0, &mm, &sp);
511 RCGO(rc, finish);
512 memmove(mm + wb.off, rp, wb.len);
513 rp += wb.len;
514 break;
515 }
516 case WOP_RESIZE: {
517 WBRESIZE wb;
518 if (avail < sizeof(wb)) _WAL_CORRUPTED("Premature end of WAL (WBRESIZE)");
519 memcpy(&wb, rp, sizeof(wb));
520 rp += sizeof(wb);
521 rc = extf->truncate_unsafe(extf, wb.nsize);
522 RCGO(rc, finish);
523 break;
524 }
525 case WOP_FIXPOINT:
526 if (fpos == rp - wmm) { // last fixpoint to
527 WBFIXPOINT wb;
528 memcpy(&wb, rp, sizeof(wb));
529 iwlog_warn("Database recovered at point of time: %"
530 PRIu64
531 " ms since epoch\n", wb.ts);
532 goto finish;
533 }
534 rp += sizeof(WBFIXPOINT);
535 break;
536 case WOP_RESET: {
537 rp += sizeof(WBRESET);
538 break;
539 }
540 default: {
541 _WAL_CORRUPTED("Invalid WAL command");
542 break;
543 }
544 }
545 }
546 #undef _WAL_CORRUPTED
547
548 finish:
549 if (!rc) {
550 rc = extf->sync_mmap_unsafe(extf, 0, IWFS_SYNCDEFAULT);
551 }
552 munmap(wmm, (size_t) pfsz);
553 IWRC(extf->remove_mmap_unsafe(extf, 0), rc);
554 IWRC(extf->add_mmap_unsafe(extf, 0, SIZE_T_MAX, IWFS_MMAP_PRIVATE), rc);
555 if (!rc) {
556 int stage = wal->bkp_stage;
557 if (stage == 0 || stage == BKP_WAL_CLEANUP) {
558 rc = _truncate_wl(wal);
559 } else {
560 // Don't truncate WAL during online backup.
561 // Just append the WBRESET mark
562 WBRESET wb = {
563 .id = WOP_RESET
564 };
565 IWRC(_flush_wl(wal, false), rc);
566 // Write: WBSEP + WBRESET
567 IWRC(_write_wl(wal, &wb, sizeof(wb), 0, 0), rc);
568 IWRC(_flush_wl(wal, true), rc);
569 IWRC(iwp_lseek(wal->fh, 0, IWP_SEEK_END, &fsz), rc);
570 if (!rc) {
571 // rollforward_offset points here --> WBSEP __ WBRESET __ EOF
572 wal->rollforward_offset = fsz - (sizeof(WBSEP) + sizeof(WBRESET));
573 }
574 }
575 }
576 if (rc && !wal->iwkv->fatalrc) {
577 wal->iwkv->fatalrc = rc;
578 }
579 wal->synched = true;
580 wal->applying = false;
581 return rc;
582 }
583
_recover_wl(IWKV iwkv,IWAL * wal,IWFS_FSM_OPTS * fsmopts,bool recover_backup)584 static iwrc _recover_wl(IWKV iwkv, IWAL *wal, IWFS_FSM_OPTS *fsmopts, bool recover_backup) {
585 off_t fsz = 0;
586 iwrc rc = iwp_lseek(wal->fh, 0, IWP_SEEK_END, &fsz);
587 RCRET(rc);
588 if (!fsz) { // empty wal log
589 return 0;
590 }
591 IWFS_EXT extf;
592 IWFS_EXT_OPTS extopts;
593 memcpy(&extopts, &fsmopts->exfile, sizeof(extopts));
594 extopts.use_locks = false;
595 extopts.file.omode = IWFS_OCREATE | IWFS_OWRITE;
596 extopts.file.dlsnr = 0;
597 rc = iwfs_exfile_open(&extf, &extopts);
598 RCRET(rc);
599 rc = _rollforward_exl(wal, &extf, recover_backup ? 2 : 1);
600 IWRC(extf.close(&extf), rc);
601 return rc;
602 }
603
_need_checkpoint(IWAL * wal)604 IW_INLINE bool _need_checkpoint(IWAL *wal) {
605 uint64_t mbytes = wal->mbytes;
606 bool force = wal->force_cp;
607 return (force || mbytes >= wal->checkpoint_buffer_sz);
608 }
609
_checkpoint_exl(IWAL * wal,uint64_t * tsp,bool no_fixpoint)610 static iwrc _checkpoint_exl(IWAL *wal, uint64_t *tsp, bool no_fixpoint) {
611 if (tsp) {
612 *tsp = 0;
613 }
614 int stage = wal->bkp_stage;
615 if (stage == BKP_MAIN_COPY) {
616 // No checkpoints during main file copying
617 return 0;
618 }
619 iwrc rc = 0;
620 IWFS_EXT *extf;
621 IWKV iwkv = wal->iwkv;
622 if (!no_fixpoint) {
623 wal->force_cp = false;
624 wal->force_sp = false;
625 WBFIXPOINT wb = {
626 .id = WOP_FIXPOINT
627 };
628 rc = iwp_current_time_ms(&wb.ts, false);
629 RCGO(rc, finish);
630 rc = _write_wl(wal, &wb, sizeof(wb), 0, 0);
631 RCGO(rc, finish);
632 }
633 rc = _flush_wl(wal, true);
634 RCGO(rc, finish);
635 rc = iwkv->fsm.extfile(&iwkv->fsm, &extf);
636 RCGO(rc, finish);
637
638 rc = _rollforward_exl(wal, extf, 0);
639 wal->mbytes = 0;
640 wal->synched = true;
641 iwp_current_time_ms(&wal->checkpoint_ts, true);
642 if (tsp) {
643 *tsp = wal->checkpoint_ts;
644 }
645
646 finish:
647 if (rc) {
648 if (iwkv->fatalrc) {
649 iwlog_ecode_error3(rc);
650 } else {
651 iwkv->fatalrc = rc;
652 }
653 }
654 return rc;
655 }
656
#ifdef IW_TESTS

/**
 * Test helper: runs a full checkpoint (with fixpoint) under the exclusive lock.
 * @return IWKV_ERROR_WAL_MODE_REQUIRED when the database has no WAL.
 */
iwrc iwal_test_checkpoint(IWKV iwkv) {
  IWAL *wal = (IWAL *) iwkv->dlsnr;
  if (!wal) {
    return IWKV_ERROR_WAL_MODE_REQUIRED;
  }
  iwrc rc = _excl_lock(wal);
  if (!rc) {
    rc = _checkpoint_exl(wal, 0, false);
    IWRC(_excl_unlock(wal), rc);
  }
  return rc;
}

#endif
672
673 //--------------------------------------- Public API
674
iwal_poke_checkpoint(IWKV iwkv,bool force)675 WUR iwrc iwal_poke_checkpoint(IWKV iwkv, bool force) {
676 IWAL *wal = (IWAL *) iwkv->dlsnr;
677 if (!wal || !(force || _need_checkpoint(wal))) {
678 return 0;
679 }
680 iwrc rc = _lock(wal);
681 RCRET(rc);
682 bool cforce = wal->force_cp;
683 if (cforce) { // Forced already
684 _unlock(wal);
685 return 0;
686 } else if (force) {
687 wal->force_cp = true;
688 } else if (!_need_checkpoint(wal)) {
689 _unlock(wal);
690 return 0;
691 }
692 int rci = pthread_cond_broadcast(wal->cpt_condp);
693 if (rci) {
694 rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
695 }
696 _unlock(wal);
697 return rc;
698 }
699
iwal_poke_savepoint(IWKV iwkv)700 iwrc iwal_poke_savepoint(IWKV iwkv) {
701 IWAL *wal = (IWAL *) iwkv->dlsnr;
702 if (!wal) {
703 return 0;
704 }
705 iwrc rc = _lock(wal);
706 RCRET(rc);
707 bool fsp = wal->force_sp;
708 if (!fsp) {
709 wal->force_sp = true;
710 int rci = pthread_cond_broadcast(wal->cpt_condp);
711 if (rci) {
712 rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
713 }
714 }
715 _unlock(wal);
716 return rc;
717 }
718
_savepoint_exl(IWAL * wal,uint64_t * tsp,bool sync)719 iwrc _savepoint_exl(IWAL *wal, uint64_t *tsp, bool sync) {
720 if (tsp) {
721 *tsp = 0;
722 }
723 wal->force_sp = false;
724 WBFIXPOINT wbfp = {
725 .id = WOP_FIXPOINT
726 };
727 iwrc rc = iwp_current_time_ms(&wbfp.ts, false);
728 RCRET(rc);
729 rc = _write_wl(wal, &wbfp, sizeof(wbfp), 0, 0);
730 RCRET(rc);
731 rc = _flush_wl(wal, sync);
732 RCRET(rc);
733 if (sync) {
734 wal->synched = true;
735 }
736 if (tsp) {
737 *tsp = wbfp.ts;
738 }
739 return 0;
740 }
741
/** Reports the WAL `synched` flag; false when the database has no WAL. */
bool iwal_synched(IWKV iwkv) {
  IWAL *wal = (IWAL *) iwkv->dlsnr;
  return wal ? (bool) wal->synched : false;
}
749
iwal_savepoint_exl(IWKV iwkv,bool sync)750 iwrc iwal_savepoint_exl(IWKV iwkv, bool sync) {
751 IWAL *wal = (IWAL *) iwkv->dlsnr;
752 if (!wal) {
753 return 0;
754 }
755 return _savepoint_exl(wal, 0, sync);
756 }
757
/**
 * Stops WAL background activity.
 *
 * Waits until any online backup has finished (polling `bkp_stage`), marks the WAL closed,
 * wakes the checkpoint thread and joins it. `cptp` is cleared after the join, so a repeated
 * call does not join an already-joined thread, which POSIX leaves undefined.
 */
void iwal_shutdown(IWKV iwkv) {
  IWAL *wal = (IWAL *) iwkv->dlsnr;
  if (!wal) {
    return;
  }
  while (wal->bkp_stage) { // todo: review
    iwp_sleep(50);
  }
  wal->open = false;
  if (wal->mtxp && wal->cpt_condp) {
    pthread_mutex_lock(wal->mtxp);
    pthread_cond_broadcast(wal->cpt_condp);
    pthread_mutex_unlock(wal->mtxp);
  }
  if (wal->cptp) {
    pthread_join(*wal->cptp, 0);
    wal->cptp = 0;
  }
}
777
_cpt_worker_fn(void * op)778 static void *_cpt_worker_fn(void *op) {
779 int rci;
780 iwrc rc = 0;
781 IWAL *wal = op;
782 IWKV iwkv = wal->iwkv;
783 uint64_t savepoint_ts = 0;
784
785 while (wal->open) {
786 struct timespec tp;
787 uint64_t tick_ts;
788 bool sp = false, cp = false;
789 rc = _lock(wal);
790 RCBREAK(rc);
791
792 if (_need_checkpoint(wal)) {
793 cp = true;
794 _unlock(wal);
795 goto cprun;
796 } else if (wal->force_sp) {
797 sp = true;
798 _unlock(wal);
799 goto cprun;
800 }
801
802 #if defined(IW_HAVE_CLOCK_MONOTONIC) && defined(IW_HAVE_PTHREAD_CONDATTR_SETCLOCK)
803 rc = iwp_clock_get_time(CLOCK_MONOTONIC, &tp);
804 #else
805 rc = iwp_clock_get_time(CLOCK_REALTIME, &tp);
806 #endif
807 if (rc) {
808 _unlock(wal);
809 break;
810 }
811 tp.tv_sec += 1; // one sec tick
812 tick_ts = tp.tv_sec * 1000 + (uint64_t) round(tp.tv_nsec / 1.0e6);
813 rci = pthread_cond_timedwait(wal->cpt_condp, wal->mtxp, &tp);
814 if (rci && rci != ETIMEDOUT) {
815 rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
816 _unlock(wal);
817 break;
818 }
819 if (!wal->open || iwkv->fatalrc) {
820 _unlock(wal);
821 break;
822 }
823 bool synched = wal->synched;
824 size_t mbytes = wal->mbytes;
825 cp = _need_checkpoint(wal) || ((mbytes && (tick_ts - wal->checkpoint_ts) >= 1000LL * wal->checkpoint_timeout_sec));
826 if (!cp) {
827 sp = !synched && (wal->force_sp || ((tick_ts - savepoint_ts) >= 1000LL * wal->savepoint_timeout_sec));
828 }
829 _unlock(wal);
830
831 cprun:
832 if (cp || sp) {
833 rc = _excl_lock(wal);
834 RCBREAK(rc);
835 if (iwkv->open) {
836 if (cp) {
837 rc = _checkpoint_exl(wal, &savepoint_ts, false);
838 } else {
839 rc = _savepoint_exl(wal, &savepoint_ts, true);
840 }
841 }
842 _excl_unlock(wal);
843 if (rc) {
844 iwlog_ecode_error2(rc, "WAL worker savepoint/checkpoint error\n");
845 rc = 0;
846 }
847 }
848 }
849 if (rc) {
850 iwkv->fatalrc = iwkv->fatalrc ? iwkv->fatalrc : rc;
851 iwlog_ecode_error2(rc, "WAL worker exited with error\n");
852 }
853 return 0;
854 }
855
iwal_online_backup(IWKV iwkv,uint64_t * ts,const char * target_file)856 iwrc iwal_online_backup(IWKV iwkv, uint64_t *ts, const char *target_file) {
857 iwrc rc;
858 size_t sp;
859 uint32_t lv;
860 uint64_t llv;
861 char buf[16384];
862 off_t off = 0, fsize = 0;
863 *ts = 0;
864
865 if (!target_file) {
866 return IW_ERROR_INVALID_ARGS;
867 }
868 IWAL *wal = (IWAL *) iwkv->dlsnr;
869 if (!wal) {
870 return IWKV_ERROR_WAL_MODE_REQUIRED;
871 }
872 rc = _lock(wal);
873 RCRET(rc);
874 if (wal->bkp_stage) {
875 rc = IWKV_ERROR_BACKUP_IN_PROGRESS;
876 } else {
877 wal->bkp_stage = BKP_STARTED;
878 }
879 _unlock(wal);
880
881 #ifndef _WIN32
882 HANDLE fh = open(target_file, O_CREAT | O_WRONLY | O_TRUNC, 00600);
883 if (INVALIDHANDLE(fh)) {
884 rc = iwrc_set_errno(IW_ERROR_IO_ERRNO, errno);
885 goto finish;
886 }
887 #else
888 HANDLE fh = CreateFile(target_file, GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ | FILE_SHARE_WRITE,
889 NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
890 if (INVALIDHANDLE(fh)) {
891 rc = iwrc_set_werror(IW_ERROR_IO_ERRNO, GetLastError());
892 goto finish;
893 }
894 #endif
895
896 // Flush all pending WAL changes
897 rc = _excl_lock(wal);
898 RCGO(rc, finish);
899 wal->bkp_stage = BKP_WAL_CLEANUP;
900 rc = _checkpoint_exl(wal, 0, false);
901 wal->bkp_stage = BKP_MAIN_COPY;
902 _excl_unlock(wal);
903 RCGO(rc, finish);
904
905 // Copy main database file
906 IWFS_FSM_STATE fstate = {0};
907 rc = iwkv->fsm.state(&iwkv->fsm, &fstate);
908 RCGO(rc, finish);
909 do {
910 rc = iwp_pread(fstate.exfile.file.fh, off, buf, sizeof(buf), &sp);
911 RCGO(rc, finish);
912 if (sp > 0) {
913 rc = iwp_write(fh, buf, sp);
914 RCGO(rc, finish);
915 off += sp;
916 }
917 } while (sp > 0);
918
919 // Copy most of WAL file content
920 rc = _lock(wal);
921 RCGO(rc, finish);
922 wal->bkp_stage = BKP_WAL_COPY1;
923 rc = _flush_wl(wal, false);
924 _unlock(wal);
925 RCGO(rc, finish);
926
927 fsize = off;
928 off = 0;
929 do {
930 rc = iwp_pread(wal->fh, off, buf, sizeof(buf), &sp);
931 RCGO(rc, finish);
932 if (sp > 0) {
933 rc = iwp_write(fh, buf, sp);
934 RCGO(rc, finish);
935 off += sp;
936 }
937 } while (sp > 0);
938
939
940 // Copy rest of WAL file in exclusive locked mode
941 rc = _excl_lock(wal);
942 RCGO(rc, finish);
943 wal->bkp_stage = BKP_WAL_COPY2;
944 rc = _savepoint_exl(wal, ts, true);
945 RCGO(rc, unlock);
946 do {
947 rc = iwp_pread(wal->fh, off, buf, sizeof(buf), &sp);
948 RCGO(rc, unlock);
949 if (sp > 0) {
950 rc = iwp_write(fh, buf, sp);
951 RCGO(rc, unlock);
952 off += sp;
953 }
954 } while (sp > 0);
955
956 llv = IW_HTOILL(fsize);
957 rc = iwp_write(fh, &llv, sizeof(llv));
958 RCGO(rc, unlock);
959
960 lv = IW_HTOIL(IWKV_BACKUP_MAGIC);
961 rc = iwp_write(fh, &lv, sizeof(lv));
962 RCGO(rc, unlock);
963
964 unlock:
965 wal->bkp_stage = 0;
966 IWRC(_excl_unlock(wal), rc);
967
968 finish:
969 if (rc) {
970 _lock(wal);
971 wal->bkp_stage = 0;
972 _unlock(wal);
973 } else {
974 rc = iwal_poke_checkpoint(iwkv, true);
975 }
976 if (!INVALIDHANDLE(fh)) {
977 IWRC(iwp_fdatasync(fh), rc);
978 IWRC(iwp_closefh(fh), rc);
979 }
980 return rc;
981 }
982
_init_cpt(IWAL * wal)983 iwrc _init_cpt(IWAL *wal) {
984 if (wal->savepoint_timeout_sec == UINT32_MAX
985 && wal->checkpoint_timeout_sec == UINT32_MAX) {
986 // do not start checkpoint thread
987 return 0;
988 }
989 pthread_attr_t pattr;
990 pthread_condattr_t cattr;
991 int rci = pthread_condattr_init(&cattr);
992 if (rci) {
993 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
994 }
995 #if defined(IW_HAVE_CLOCK_MONOTONIC) && defined(IW_HAVE_PTHREAD_CONDATTR_SETCLOCK)
996 rci = pthread_condattr_setclock(&cattr, CLOCK_MONOTONIC);
997 if (rci) {
998 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
999 }
1000 #endif
1001 rci = pthread_cond_init(&wal->cpt_cond, &cattr);
1002 if (rci) {
1003 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
1004 }
1005 wal->cpt_condp = &wal->cpt_cond;
1006 rci = pthread_attr_init(&pattr);
1007 if (rci) {
1008 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
1009 }
1010 pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_JOINABLE);
1011 rci = pthread_create(&wal->cpt, &pattr, _cpt_worker_fn, wal);
1012 if (rci) {
1013 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
1014 }
1015 wal->cptp = &wal->cpt;
1016 return 0;
1017 }
1018
iwal_create(IWKV iwkv,const IWKV_OPTS * opts,IWFS_FSM_OPTS * fsmopts,bool recover_backup)1019 iwrc iwal_create(IWKV iwkv, const IWKV_OPTS *opts, IWFS_FSM_OPTS *fsmopts, bool recover_backup) {
1020 assert(!iwkv->dlsnr && opts && fsmopts);
1021 if (!opts) {
1022 return IW_ERROR_INVALID_ARGS;
1023 }
1024 if ((opts->oflags & IWKV_RDONLY) || !opts->wal.enabled) {
1025 return 0;
1026 }
1027 iwrc rc = 0;
1028 IWAL *wal = calloc(1, sizeof(*wal));
1029 if (!wal) {
1030 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
1031 }
1032
1033 wal->wal_lock_interceptor = opts->wal.wal_lock_interceptor;
1034 wal->wal_lock_interceptor_opaque = opts->wal.wal_lock_interceptor_opaque;
1035
1036 size_t sz = strlen(opts->path);
1037 char *wpath = malloc(sz + 4 /*-wal*/ + 1 /*\0*/);
1038 if (!wpath) {
1039 free(wal);
1040 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
1041 }
1042 memcpy(wpath, opts->path, sz);
1043 memcpy(wpath + sz, "-wal", 4);
1044 wpath[sz + 4] = '\0';
1045
1046 wal->fh = INVALID_HANDLE_VALUE;
1047 wal->path = wpath;
1048 wal->oflags = opts->oflags;
1049 wal->iwkv = iwkv;
1050 iwp_current_time_ms(&wal->checkpoint_ts, true);
1051
1052 rc = _init_locks(wal);
1053 RCGO(rc, finish);
1054
1055 IWDLSNR *dlsnr = &wal->lsnr;
1056 dlsnr->onopen = _onopen;
1057 dlsnr->onclosing = _onclosing;
1058 dlsnr->onset = _onset;
1059 dlsnr->oncopy = _oncopy;
1060 dlsnr->onwrite = _onwrite;
1061 dlsnr->onresize = _onresize;
1062 dlsnr->onsynced = _onsynced;
1063 iwkv->dlsnr = (IWDLSNR *) wal;
1064
1065 wal->wal_buffer_sz =
1066 opts->wal.wal_buffer_sz > 0 ?
1067 opts->wal.wal_buffer_sz :
1068 #if defined __ANDROID__ || defined TARGET_OS_IPHONE
1069 2 * 1024 * 1024; // 2M
1070 #else
1071 8 * 1024 * 1024; // 8M
1072 #endif
1073 if (wal->wal_buffer_sz < 4096) {
1074 wal->wal_buffer_sz = 4096;
1075 }
1076
1077 wal->checkpoint_buffer_sz
1078 = opts->wal.checkpoint_buffer_sz > 0 ?
1079 opts->wal.checkpoint_buffer_sz :
1080 #if defined __ANDROID__ || defined TARGET_OS_IPHONE
1081 64ULL * 1024 * 1024; // 64M
1082 #else
1083 1024ULL * 1024 * 1024; // 1G
1084 #endif
1085 if (wal->checkpoint_buffer_sz < 1024 * 1024) { // 1M minimal
1086 wal->checkpoint_buffer_sz = 1024 * 1024;
1087 }
1088
1089 wal->savepoint_timeout_sec
1090 = opts->wal.savepoint_timeout_sec > 0 ?
1091 opts->wal.savepoint_timeout_sec : 10; // 10 sec
1092
1093 wal->checkpoint_timeout_sec
1094 = opts->wal.checkpoint_timeout_sec > 0 ?
1095 #if defined __ANDROID__ || defined TARGET_OS_IPHONE
1096 opts->wal.checkpoint_timeout_sec : 60; // 1 min
1097 #else
1098 opts->wal.checkpoint_timeout_sec : 300; // 5 min
1099 #endif
1100
1101 if (wal->checkpoint_timeout_sec < 10) { // 10 sec minimal
1102 wal->checkpoint_timeout_sec = 10;
1103 }
1104 if (wal->savepoint_timeout_sec >= wal->checkpoint_timeout_sec) {
1105 wal->savepoint_timeout_sec = wal->checkpoint_timeout_sec / 2;
1106 }
1107
1108 wal->check_cp_crc = opts->wal.check_crc_on_checkpoint;
1109
1110 wal->buf = malloc(wal->wal_buffer_sz);
1111 if (!wal->buf) {
1112 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
1113 goto finish;
1114 }
1115 wal->buf += sizeof(WBSEP);
1116 wal->bufsz = wal->wal_buffer_sz - sizeof(WBSEP);
1117
1118 // Now open WAL file
1119
1120 #ifndef _WIN32
1121 HANDLE fh = open(wal->path, O_CREAT | O_RDWR, IWFS_DEFAULT_FILEMODE);
1122 if (INVALIDHANDLE(fh)) {
1123 rc = iwrc_set_errno(IW_ERROR_IO_ERRNO, errno);
1124 goto finish;
1125 }
1126 #else
1127 HANDLE fh = CreateFile(wal->path, GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ,
1128 NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
1129 if (INVALIDHANDLE(fh)) {
1130 rc = iwrc_set_werror(IW_ERROR_IO_ERRNO, GetLastError());
1131 goto finish;
1132 }
1133 #endif
1134
1135 wal->fh = fh;
1136 rc = iwp_flock(wal->fh, IWP_WLOCK);
1137 RCGO(rc, finish);
1138
1139 // Now force all fsm data to be privately mmaped.
1140 // We will apply wal log to main database file
1141 // then re-read our private mmaps
1142 fsmopts->mmap_opts = IWFS_MMAP_PRIVATE;
1143 fsmopts->exfile.file.dlsnr = iwkv->dlsnr;
1144
1145 if (wal->oflags & IWKV_TRUNC) {
1146 rc = _truncate_wl(wal);
1147 RCGO(rc, finish);
1148 } else {
1149 rc = _recover_wl(iwkv, wal, fsmopts, recover_backup);
1150 RCGO(rc, finish);
1151 }
1152
1153 wal->open = true;
1154 // Start checkpoint thread
1155 rc = _init_cpt(wal);
1156
1157 finish:
1158 if (rc) {
1159 iwkv->dlsnr = 0;
1160 iwkv->fatalrc = iwkv->fatalrc ? iwkv->fatalrc : rc;
1161 iwal_shutdown(iwkv);
1162 _destroy(wal);
1163 }
1164 return rc;
1165 }
1166