• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "iwkv_internal.h"
2 #include <sys/types.h>
3 #include <fcntl.h>
4 #include <time.h>
5 
6 #ifdef _WIN32
7 #include "win32/mman/mman.h"
8 #else
9 
10 #include <sys/mman.h>
11 
12 #endif
13 
14 extern atomic_uint_fast64_t g_trigger;
15 
16 #define BKP_STARTED           0x1 /**< Backup started */
17 #define BKP_WAL_CLEANUP       0x2 /**< Do checkpoint and truncate WAL file */
18 #define BKP_MAIN_COPY         0x3 /**< Copy main database file */
19 #define BKP_WAL_COPY1         0x4 /**< Copy most of WAL file content */
20 #define BKP_WAL_COPY2         0x5 /**< Copy rest of WAL file in exclusive locked mode */
21 
/**
 * Write-ahead log (WAL) state bound to a single IWKV instance.
 *
 * The listener callbacks in this file receive a `struct IWDLSNR *` and cast it
 * straight to `IWAL *`, so `lsnr` has to remain the first member.
 */
typedef struct IWAL {
  IWDLSNR lsnr;                     /**< Data-events listener; must stay the first member (see casts in callbacks) */
  atomic_bool applying;             /**< WAL is being applied; listener callbacks ignore events while set */
  atomic_bool open;                 /**< Is WAL in use */
  atomic_bool force_cp;             /**< Next checkpoint scheduled */
  atomic_bool synched;              /**< WAL is synched or WBFIXPOINT is the last write operation */
  bool force_sp;                    /**< Next savepoint scheduled */
  bool check_cp_crc;                /**< Check CRC32 sum of data blocks during checkpoint. Default: false  */
  iwkv_openflags oflags;            /**< File open flags */
  atomic_int bkp_stage;             /**< Online backup stage (one of BKP_*), zero when no backup is running */
  size_t wal_buffer_sz;             /**< WAL file intermediate buffer size. */
  size_t checkpoint_buffer_sz;      /**< Checkpoint buffer size in bytes. */
  uint32_t bufpos;                  /**< Current position in buffer */
  uint32_t bufsz;                   /**< Size of buffer */
  HANDLE fh;                        /**< File handle */
  uint8_t *buf;                     /**< File buffer; sizeof(WBSEP) bytes are reserved right before it for the segment header */
  char *path;                       /**< WAL file path */
  pthread_mutex_t *mtxp;            /**< Points to `mtx` once it is initialized, NULL otherwise */
  pthread_cond_t *cpt_condp;        /**< Points to `cpt_cond` once it is initialized, NULL otherwise */
  pthread_t *cptp;                  /**< Points to `cpt` once the checkpoint thread is started, NULL otherwise */
  iwrc(*wal_lock_interceptor)(bool, void *);
  /**< Optional function called
       - before acquiring
       - after releasing
       exclusive database lock by WAL checkpoint thread.
       In the case of `before lock` first argument will be set to true */
  void *wal_lock_interceptor_opaque;/**< Opaque data for `wal_lock_interceptor` */
  uint32_t savepoint_timeout_sec;   /**< Savepoint timeout seconds */
  uint32_t checkpoint_timeout_sec;  /**< Checkpoint timeout seconds */
  atomic_size_t mbytes;             /**< Estimated size of modified private mmapped memory, in bytes */
  off_t rollforward_offset;         /**< Rollforward offset during online backup */
  uint64_t checkpoint_ts;           /**< Last checkpoint timestamp milliseconds */
  pthread_mutex_t mtx;              /**< Global WAL mutex */
  pthread_cond_t cpt_cond;          /**< Checkpoint thread cond variable */
  pthread_t cpt;                    /**< Checkpoint thread */
  IWKV iwkv;                        /**< Owning IWKV instance */
} IWAL;
59 
60 static iwrc _checkpoint_exl(IWAL *wal, uint64_t *tsp, bool no_fixpoint);
61 
_lock(IWAL * wal)62 IW_INLINE iwrc _lock(IWAL *wal) {
63   int rci = pthread_mutex_lock(wal->mtxp);
64   return (rci ? iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci) : 0);
65 }
66 
_unlock(IWAL * wal)67 IW_INLINE iwrc _unlock(IWAL *wal) {
68   int rci = pthread_mutex_unlock(wal->mtxp);
69   return (rci ? iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci) : 0);
70 }
71 
_excl_lock(IWAL * wal)72 static iwrc _excl_lock(IWAL *wal) {
73   iwrc rc = 0;
74   if (wal->wal_lock_interceptor) {
75     rc = wal->wal_lock_interceptor(true, wal->wal_lock_interceptor_opaque);
76     RCRET(rc);
77   }
78   rc = iwkv_exclusive_lock(wal->iwkv);
79   if (rc) {
80     if (wal->wal_lock_interceptor) {
81       IWRC(wal->wal_lock_interceptor(false, wal->wal_lock_interceptor_opaque), rc);
82     }
83     return rc;
84   }
85   rc = _lock(wal);
86   if (rc) {
87     IWRC(iwkv_exclusive_unlock(wal->iwkv), rc);
88     if (wal->wal_lock_interceptor) {
89       IWRC(wal->wal_lock_interceptor(false, wal->wal_lock_interceptor_opaque), rc);
90     }
91   }
92   return rc;
93 }
94 
_excl_unlock(IWAL * wal)95 static iwrc _excl_unlock(IWAL *wal) {
96   iwrc rc = _unlock(wal);
97   IWRC(iwkv_exclusive_unlock(wal->iwkv), rc);
98   if (wal->wal_lock_interceptor) {
99     IWRC(wal->wal_lock_interceptor(false, wal->wal_lock_interceptor_opaque), rc);
100   }
101   return rc;
102 }
103 
_init_locks(IWAL * wal)104 static iwrc _init_locks(IWAL *wal) {
105   int rci = pthread_mutex_init(&wal->mtx, 0);
106   if (rci) {
107     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
108   }
109   wal->mtxp = &wal->mtx;
110   return 0;
111 }
112 
/**
 * Releases every resource owned by `wal` and frees the structure itself.
 * Accepts NULL, in which case nothing happens.
 */
static void _destroy(IWAL *wal) {
  if (!wal) {
    return;
  }
  wal->open = false;
  if (!INVALIDHANDLE(wal->fh)) {
    iwp_unlock(wal->fh);
    iwp_closefh(wal->fh);
  }
  if (wal->cpt_condp) {
    pthread_cond_destroy(wal->cpt_condp);
    wal->cpt_condp = 0;
  }
  if (wal->mtxp) {
    pthread_mutex_destroy(wal->mtxp);
    wal->mtxp = 0;
  }
  free(wal->path);
  if (wal->buf) {
    // `buf` points just past a WBSEP-sized prefix; step back by that prefix before freeing
    free(wal->buf - sizeof(WBSEP));
  }
  free(wal);
}
136 
_flush_wl(IWAL * wal,bool sync)137 static iwrc _flush_wl(IWAL *wal, bool sync) {
138   iwrc rc = 0;
139   if (wal->bufpos) {
140     uint32_t crc = wal->check_cp_crc ? iwu_crc32(wal->buf, wal->bufpos, 0) : 0;
141     WBSEP sep = {
142       .id = WOP_SEP,
143       .crc = crc,
144       .len = wal->bufpos
145     };
146     size_t wz = wal->bufpos + sizeof(WBSEP);
147     uint8_t *wp = wal->buf - sizeof(WBSEP);
148     memcpy(wp, &sep, sizeof(WBSEP));
149     rc = iwp_write(wal->fh, wp, wz);
150     RCRET(rc);
151     wal->bufpos = 0;
152   }
153   if (sync) {
154     rc = iwp_fsync(wal->fh);
155   }
156   return rc;
157 }
158 
_truncate_wl(IWAL * wal)159 IW_INLINE iwrc _truncate_wl(IWAL *wal) {
160   iwrc rc = iwp_ftruncate(wal->fh, 0);
161   RCRET(rc);
162   wal->rollforward_offset = 0;
163   rc = iwp_lseek(wal->fh, 0, IWP_SEEK_SET, 0);
164   RCRET(rc);
165   rc = iwp_fsync(wal->fh);
166   return rc;
167 }
168 
_write_wl(IWAL * wal,const void * op,off_t oplen,const uint8_t * data,off_t len)169 static iwrc _write_wl(IWAL *wal, const void *op, off_t oplen, const uint8_t *data, off_t len) {
170   iwrc rc = 0;
171   const off_t bufsz = wal->bufsz;
172   wal->synched = false;
173   if (bufsz - wal->bufpos < oplen) {
174     rc = _flush_wl(wal, false);
175     RCRET(rc);
176   }
177   assert(bufsz - wal->bufpos >= oplen);
178   memcpy(wal->buf + wal->bufpos, op, (size_t) oplen);
179   wal->bufpos += oplen;
180   if (bufsz - wal->bufpos < len) {
181     rc = _flush_wl(wal, false);
182     RCRET(rc);
183     rc = iwp_write(wal->fh, data, (size_t) len);
184     RCRET(rc);
185   } else {
186     assert(bufsz - wal->bufpos >= len);
187     memcpy(wal->buf + wal->bufpos, data, (size_t) len);
188     wal->bufpos += len;
189   }
190   return rc;
191 }
192 
_write_op(IWAL * wal,const void * op,off_t oplen,const uint8_t * data,off_t len)193 IW_INLINE iwrc _write_op(IWAL *wal, const void *op, off_t oplen, const uint8_t *data, off_t len) {
194   iwrc rc = _lock(wal);
195   RCRET(rc);
196   rc = _write_wl(wal, op, oplen, data, len);
197   IWRC(_unlock(wal), rc);
198   return rc;
199 }
200 
iwal_sync(IWKV iwkv)201 iwrc iwal_sync(IWKV iwkv) {
202   IWAL *wal = (IWAL *) iwkv->dlsnr;
203   iwrc rc = _lock(wal);
204   RCRET(rc);
205   rc = _flush_wl(wal, true);
206   IWRC(_unlock(wal), rc);
207   return rc;
208 }
209 
_onopen(struct IWDLSNR * self,const char * path,int mode)210 static iwrc _onopen(struct IWDLSNR *self, const char *path, int mode) {
211   return 0;
212 }
213 
_onclosing(struct IWDLSNR * self)214 static iwrc _onclosing(struct IWDLSNR *self) {
215   IWAL *wal = (IWAL *) self;
216 #ifdef IW_TESTS
217   uint64_t tv = g_trigger;
218   if (tv & IWKVD_WAL_NO_CHECKPOINT_ON_CLOSE) {
219     _destroy(wal);
220     return 0;
221   }
222 #endif
223   iwrc rc = _checkpoint_exl(wal, 0, false);
224   _destroy(wal);
225   return rc;
226 }
227 
_onset(struct IWDLSNR * self,off_t off,uint8_t val,off_t len,int flags)228 static iwrc _onset(struct IWDLSNR *self, off_t off, uint8_t val, off_t len, int flags) {
229   IWAL *wal = (IWAL *) self;
230   if (wal->applying) {
231     return 0;
232   }
233   WBSET wb = {
234     .id = WOP_SET,
235     .val = val,
236     .off = off,
237     .len = len
238   };
239   wal->mbytes += len;
240   return _write_op((IWAL *) self, &wb, sizeof(wb), 0, 0);
241 }
242 
_oncopy(struct IWDLSNR * self,off_t off,off_t len,off_t noff,int flags)243 static iwrc _oncopy(struct IWDLSNR *self, off_t off, off_t len, off_t noff, int flags) {
244   IWAL *wal = (IWAL *) self;
245   if (wal->applying) {
246     return 0;
247   }
248   WBCOPY wb = {
249     .id = WOP_COPY,
250     .off = off,
251     .len = len,
252     .noff = noff
253   };
254   wal->mbytes += len;
255   return _write_op(wal, &wb, sizeof(wb), 0, 0);
256 }
257 
_onwrite(struct IWDLSNR * self,off_t off,const void * buf,off_t len,int flags)258 static iwrc _onwrite(struct IWDLSNR *self, off_t off, const void *buf, off_t len, int flags) {
259   assert(len <= (size_t)(-1));
260   IWAL *wal = (IWAL *) self;
261   if (wal->applying) {
262     return 0;
263   }
264   WBWRITE wb = {
265     .id = WOP_WRITE,
266     .crc = wal->check_cp_crc ? iwu_crc32(buf, len, 0) : 0,
267     .len = len,
268     .off = off
269   };
270   wal->mbytes += len;
271   return _write_op(wal, &wb, sizeof(wb), buf, len);
272 }
273 
_onresize(struct IWDLSNR * self,off_t osize,off_t nsize,int flags,bool * handled)274 static iwrc _onresize(struct IWDLSNR *self, off_t osize, off_t nsize, int flags, bool *handled) {
275   IWAL *wal = (IWAL *) self;
276   if (wal->applying) {
277     *handled = false;
278     return 0;
279   }
280   *handled = true;
281   WBRESIZE wb = {
282     .id = WOP_RESIZE,
283     .osize = osize,
284     .nsize = nsize
285   };
286   iwrc rc = _lock(wal);
287   RCRET(rc);
288   rc = _write_wl(wal, &wb, sizeof(wb), 0, 0);
289   RCGO(rc, finish);
290   rc = _checkpoint_exl(wal, 0, true);
291 finish:
292   IWRC(_unlock(wal), rc);
293   return rc;
294 }
295 
_onsynced(struct IWDLSNR * self,int flags)296 static iwrc _onsynced(struct IWDLSNR *self, int flags) {
297   IWAL *wal = (IWAL *) self;
298   if (wal->applying) {
299     return 0;
300   }
301   iwrc rc = _lock(wal);
302   RCRET(rc);
303   rc = _flush_wl(wal, true);
304   IWRC(_unlock(wal), rc);
305   return rc;
306 }
307 
/**
 * Scans a mapped WAL image and reports the offsets of the last complete
 * `WOP_FIXPOINT` and `WOP_RESET` records.
 *
 * The scan stops silently at the first unknown, truncated or otherwise
 * inconsistent record, so the reported offsets always refer to records that
 * are fully contained in the first `fsz` bytes.
 *
 * @param wal  Unused.
 * @param wmm  Start of the WAL image.
 * @param fsz  Number of valid bytes at `wmm`.
 * @param fpos Out: offset of the last fixpoint record, `0` if none was found.
 * @param rpos Out: offset of the last reset record, `0` if none was found.
 */
static void _last_fix_and_reset_points(IWAL *wal, uint8_t *wmm, off_t fsz, off_t *fpos, off_t *rpos) {
  uint8_t *rp = wmm;
  *fpos = 0;
  *rpos = 0;

  for (uint32_t i = 0; rp - wmm < fsz; ++i) {
    uint8_t opid;
    off_t avail = fsz - (rp - wmm);
    memcpy(&opid, rp, 1);
    if (i == 0 && opid != WOP_SEP) {
      // A valid WAL always starts with a segment separator
      return;
    }
    switch (opid) {
      case WOP_SEP: {
        WBSEP wb;
        if (avail < sizeof(wb)) {
          return;
        }
        memcpy(&wb, rp, sizeof(wb));
        rp += sizeof(wb);
        // The segment payload must fit into what is left after the header
        if ((off_t) wb.len > avail - (off_t) sizeof(wb)) {
          return;
        }
        break;
      }
      case WOP_SET: {
        if (avail < sizeof(WBSET)) {
          return;
        }
        rp += sizeof(WBSET);
        break;
      }
      case WOP_COPY: {
        if (avail < sizeof(WBCOPY)) {
          return;
        }
        rp += sizeof(WBCOPY);
        break;
      }
      case WOP_WRITE: {
        WBWRITE wb;
        if (avail < sizeof(wb)) {
          return;
        }
        memcpy(&wb, rp, sizeof(wb));
        rp += sizeof(wb);
        // The written data must fit into what is left after the header
        if ((off_t) wb.len > avail - (off_t) sizeof(wb)) {
          return;
        }
        rp += wb.len;
        break;
      }
      case WOP_RESIZE: {
        if (avail < sizeof(WBRESIZE)) {
          return;
        }
        rp += sizeof(WBRESIZE);
        break;
      }
      case WOP_FIXPOINT: {
        // Do not report a fixpoint whose record is cut off by the end of file
        if (avail < sizeof(WBFIXPOINT)) {
          return;
        }
        *fpos = (rp - wmm);
        rp += sizeof(WBFIXPOINT);
        break;
      }
      case WOP_RESET: {
        // Do not report a reset mark whose record is cut off by the end of file
        if (avail < sizeof(WBRESET)) {
          return;
        }
        *rpos = (rp - wmm);
        rp += sizeof(WBRESET);
        break;
      }
      default: {
        return;
      }
    }
  }
}
384 
_rollforward_exl(IWAL * wal,IWFS_EXT * extf,int recover_mode)385 static iwrc _rollforward_exl(IWAL *wal, IWFS_EXT *extf, int recover_mode) {
386   assert(wal->bufpos == 0);
387   off_t fsz = 0;
388   iwrc rc = iwp_lseek(wal->fh, 0, IWP_SEEK_END, &fsz);
389   RCRET(rc);
390   if (!fsz) { // empty wal log
391     return 0;
392   }
393   size_t sp;
394   uint8_t *mm;
395   const bool ccrc = wal->check_cp_crc;
396   off_t fpos = 0; // checkpoint
397 #ifndef _WIN32
398   off_t pfsz = IW_ROUNDUP(fsz, iwp_page_size());
399   uint8_t *wmm = mmap(0, (size_t) pfsz, PROT_READ, MAP_PRIVATE, wal->fh, 0);
400   madvise(wmm, (size_t) fsz, MADV_SEQUENTIAL);
401 #else
402   off_t pfsz = fsz;
403   uint8_t *wmm = mmap(0, 0, PROT_READ, MAP_PRIVATE, wal->fh, 0);
404 #endif
405   if (wmm == MAP_FAILED) {
406     return iwrc_set_errno(IW_ERROR_ERRNO, errno);
407   }
408   // Temporary turn off extf locking
409   wal->applying = true;
410 
411   // Remap fsm in MAP_SHARED mode
412   extf->remove_mmap_unsafe(extf, 0);
413   rc = extf->add_mmap_unsafe(extf, 0, SIZE_T_MAX, IWFS_MMAP_SHARED);
414   if (rc) {
415     munmap(wmm, (size_t) pfsz);
416     wal->iwkv->fatalrc = rc;
417     wal->applying = false;
418     return rc;
419   }
420 
421 #define _WAL_CORRUPTED(msg_) do { \
422     rc = IWKV_ERROR_CORRUPTED_WAL_FILE; \
423     iwlog_ecode_error2(rc, msg_); \
424     goto finish; \
425   } while(0);
426 
427   if (recover_mode) {
428     off_t rpos; // reset point
429     _last_fix_and_reset_points(wal, wmm, fsz, &fpos, &rpos);
430     if (!fpos) {
431       goto finish;
432     }
433     if (rpos > 0 && recover_mode == 1) {
434       // Recover from last known reset point
435       if (fpos < rpos) {
436         goto finish;
437       }
438       // WBSEP__WBRESET
439       //        \_rpos
440       rpos -= sizeof(WBSEP);
441       // WBSEP__WBRESET
442       // \_rpos
443       wmm += rpos;
444       fsz -= rpos;
445     }
446   } else if (wal->rollforward_offset > 0) {
447     if (wal->rollforward_offset >= fsz) {
448       _WAL_CORRUPTED("Invalid rollforward offset");
449     }
450     wmm += wal->rollforward_offset;
451     fsz -= wal->rollforward_offset;
452   }
453 
454   uint8_t *rp = wmm;
455   for (uint32_t i = 0; rp - wmm < fsz; ++i) {
456     uint8_t opid;
457     off_t avail = fsz - (rp - wmm);
458     memcpy(&opid, rp, 1);
459     if (i == 0 && opid != WOP_SEP) {
460       rc = IWKV_ERROR_CORRUPTED_WAL_FILE;
461       goto finish;
462     }
463     switch (opid) {
464       case WOP_SEP: {
465         WBSEP wb;
466         if (avail < sizeof(wb)) _WAL_CORRUPTED("Premature end of WAL (WBSEP)");
467         memcpy(&wb, rp, sizeof(wb));
468         rp += sizeof(wb);
469         if (wb.len > avail) _WAL_CORRUPTED("Premature end of WAL (WBSEP)");
470         if (ccrc && wb.crc) {
471           uint32_t crc = iwu_crc32(rp, wb.len, 0);
472           if (crc != wb.crc) {
473             _WAL_CORRUPTED("Invalid CRC32 checksum of WAL segment (WBSEP)");
474           }
475         }
476         break;
477       }
478       case WOP_SET: {
479         WBSET wb;
480         if (avail < sizeof(wb)) _WAL_CORRUPTED("Premature end of WAL (WBSET)");
481         memcpy(&wb, rp, sizeof(wb));
482         rp += sizeof(wb);
483         rc = extf->probe_mmap_unsafe(extf, 0, &mm, &sp);
484         RCGO(rc, finish);
485         memset(mm + wb.off, wb.val, (size_t) wb.len);
486         break;
487       }
488       case WOP_COPY: {
489         WBCOPY wb;
490         if (avail < sizeof(wb)) _WAL_CORRUPTED("Premature end of WAL (WBCOPY)");
491         memcpy(&wb, rp, sizeof(wb));
492         rp += sizeof(wb);
493         rc = extf->probe_mmap_unsafe(extf, 0, &mm, &sp);
494         RCGO(rc, finish);
495         memmove(mm + wb.noff, mm + wb.off, (size_t) wb.len);
496         break;
497       }
498       case WOP_WRITE: {
499         WBWRITE wb;
500         if (avail < sizeof(wb)) _WAL_CORRUPTED("Premature end of WAL (WBWRITE)");
501         memcpy(&wb, rp, sizeof(wb));
502         rp += sizeof(wb);
503         if (avail < wb.len) _WAL_CORRUPTED("Premature end of WAL (WBWRITE)");
504         if (ccrc && wb.crc) {
505           uint32_t crc = iwu_crc32(rp, wb.len, 0);
506           if (crc != wb.crc) {
507             _WAL_CORRUPTED("Invalid CRC32 checksum of WAL segment (WBWRITE)");
508           }
509         }
510         rc = extf->probe_mmap_unsafe(extf, 0, &mm, &sp);
511         RCGO(rc, finish);
512         memmove(mm + wb.off, rp, wb.len);
513         rp += wb.len;
514         break;
515       }
516       case WOP_RESIZE: {
517         WBRESIZE wb;
518         if (avail < sizeof(wb)) _WAL_CORRUPTED("Premature end of WAL (WBRESIZE)");
519         memcpy(&wb, rp, sizeof(wb));
520         rp += sizeof(wb);
521         rc = extf->truncate_unsafe(extf, wb.nsize);
522         RCGO(rc, finish);
523         break;
524       }
525       case WOP_FIXPOINT:
526         if (fpos == rp - wmm) { // last fixpoint to
527           WBFIXPOINT wb;
528           memcpy(&wb, rp, sizeof(wb));
529           iwlog_warn("Database recovered at point of time: %"
530                      PRIu64
531                      " ms since epoch\n", wb.ts);
532           goto finish;
533         }
534         rp += sizeof(WBFIXPOINT);
535         break;
536       case WOP_RESET: {
537         rp += sizeof(WBRESET);
538         break;
539       }
540       default: {
541         _WAL_CORRUPTED("Invalid WAL command");
542         break;
543       }
544     }
545   }
546 #undef _WAL_CORRUPTED
547 
548 finish:
549   if (!rc) {
550     rc = extf->sync_mmap_unsafe(extf, 0, IWFS_SYNCDEFAULT);
551   }
552   munmap(wmm, (size_t) pfsz);
553   IWRC(extf->remove_mmap_unsafe(extf, 0), rc);
554   IWRC(extf->add_mmap_unsafe(extf, 0, SIZE_T_MAX, IWFS_MMAP_PRIVATE), rc);
555   if (!rc) {
556     int stage = wal->bkp_stage;
557     if (stage == 0 || stage == BKP_WAL_CLEANUP) {
558       rc = _truncate_wl(wal);
559     } else {
560       // Don't truncate WAL during online backup.
561       // Just append the WBRESET mark
562       WBRESET wb = {
563         .id = WOP_RESET
564       };
565       IWRC(_flush_wl(wal, false), rc);
566       // Write: WBSEP + WBRESET
567       IWRC(_write_wl(wal, &wb, sizeof(wb), 0, 0), rc);
568       IWRC(_flush_wl(wal, true), rc);
569       IWRC(iwp_lseek(wal->fh, 0, IWP_SEEK_END, &fsz), rc);
570       if (!rc) {
571         // rollforward_offset points here --> WBSEP __ WBRESET __ EOF
572         wal->rollforward_offset = fsz - (sizeof(WBSEP) + sizeof(WBRESET));
573       }
574     }
575   }
576   if (rc && !wal->iwkv->fatalrc) {
577     wal->iwkv->fatalrc = rc;
578   }
579   wal->synched = true;
580   wal->applying = false;
581   return rc;
582 }
583 
_recover_wl(IWKV iwkv,IWAL * wal,IWFS_FSM_OPTS * fsmopts,bool recover_backup)584 static iwrc _recover_wl(IWKV iwkv, IWAL *wal, IWFS_FSM_OPTS *fsmopts, bool recover_backup) {
585   off_t fsz = 0;
586   iwrc rc = iwp_lseek(wal->fh, 0, IWP_SEEK_END, &fsz);
587   RCRET(rc);
588   if (!fsz) { // empty wal log
589     return 0;
590   }
591   IWFS_EXT extf;
592   IWFS_EXT_OPTS extopts;
593   memcpy(&extopts, &fsmopts->exfile, sizeof(extopts));
594   extopts.use_locks = false;
595   extopts.file.omode = IWFS_OCREATE | IWFS_OWRITE;
596   extopts.file.dlsnr = 0;
597   rc = iwfs_exfile_open(&extf, &extopts);
598   RCRET(rc);
599   rc = _rollforward_exl(wal, &extf, recover_backup ? 2 : 1);
600   IWRC(extf.close(&extf), rc);
601   return rc;
602 }
603 
_need_checkpoint(IWAL * wal)604 IW_INLINE bool _need_checkpoint(IWAL *wal) {
605   uint64_t mbytes = wal->mbytes;
606   bool force = wal->force_cp;
607   return (force || mbytes >= wal->checkpoint_buffer_sz);
608 }
609 
_checkpoint_exl(IWAL * wal,uint64_t * tsp,bool no_fixpoint)610 static iwrc _checkpoint_exl(IWAL *wal, uint64_t *tsp, bool no_fixpoint) {
611   if (tsp) {
612     *tsp = 0;
613   }
614   int stage = wal->bkp_stage;
615   if (stage == BKP_MAIN_COPY) {
616     // No checkpoints during main file copying
617     return 0;
618   }
619   iwrc rc = 0;
620   IWFS_EXT *extf;
621   IWKV iwkv = wal->iwkv;
622   if (!no_fixpoint) {
623     wal->force_cp = false;
624     wal->force_sp = false;
625     WBFIXPOINT wb = {
626       .id = WOP_FIXPOINT
627     };
628     rc = iwp_current_time_ms(&wb.ts, false);
629     RCGO(rc, finish);
630     rc = _write_wl(wal, &wb, sizeof(wb), 0, 0);
631     RCGO(rc, finish);
632   }
633   rc = _flush_wl(wal, true);
634   RCGO(rc, finish);
635   rc = iwkv->fsm.extfile(&iwkv->fsm, &extf);
636   RCGO(rc, finish);
637 
638   rc = _rollforward_exl(wal, extf, 0);
639   wal->mbytes = 0;
640   wal->synched = true;
641   iwp_current_time_ms(&wal->checkpoint_ts, true);
642   if (tsp) {
643     *tsp = wal->checkpoint_ts;
644   }
645 
646 finish:
647   if (rc) {
648     if (iwkv->fatalrc) {
649       iwlog_ecode_error3(rc);
650     } else {
651       iwkv->fatalrc = rc;
652     }
653   }
654   return rc;
655 }
656 
657 #ifdef IW_TESTS
658 
iwal_test_checkpoint(IWKV iwkv)659 iwrc iwal_test_checkpoint(IWKV iwkv) {
660   if (!iwkv->dlsnr) {
661     return IWKV_ERROR_WAL_MODE_REQUIRED;
662   }
663   IWAL *wal = (IWAL *) iwkv->dlsnr;
664   iwrc rc = _excl_lock(wal);
665   RCRET(rc);
666   rc = _checkpoint_exl(wal, 0, false);
667   IWRC(_excl_unlock(wal), rc);
668   return rc;
669 }
670 
671 #endif
672 
673 //--------------------------------------- Public API
674 
iwal_poke_checkpoint(IWKV iwkv,bool force)675 WUR iwrc iwal_poke_checkpoint(IWKV iwkv, bool force) {
676   IWAL *wal = (IWAL *) iwkv->dlsnr;
677   if (!wal || !(force || _need_checkpoint(wal))) {
678     return 0;
679   }
680   iwrc rc = _lock(wal);
681   RCRET(rc);
682   bool cforce = wal->force_cp;
683   if (cforce) { // Forced already
684     _unlock(wal);
685     return 0;
686   } else if (force) {
687     wal->force_cp = true;
688   } else if (!_need_checkpoint(wal)) {
689     _unlock(wal);
690     return 0;
691   }
692   int rci = pthread_cond_broadcast(wal->cpt_condp);
693   if (rci) {
694     rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
695   }
696   _unlock(wal);
697   return rc;
698 }
699 
iwal_poke_savepoint(IWKV iwkv)700 iwrc iwal_poke_savepoint(IWKV iwkv) {
701   IWAL *wal = (IWAL *) iwkv->dlsnr;
702   if (!wal) {
703     return 0;
704   }
705   iwrc rc = _lock(wal);
706   RCRET(rc);
707   bool fsp = wal->force_sp;
708   if (!fsp) {
709     wal->force_sp = true;
710     int rci = pthread_cond_broadcast(wal->cpt_condp);
711     if (rci) {
712       rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
713     }
714   }
715   _unlock(wal);
716   return rc;
717 }
718 
_savepoint_exl(IWAL * wal,uint64_t * tsp,bool sync)719 iwrc _savepoint_exl(IWAL *wal, uint64_t *tsp, bool sync) {
720   if (tsp) {
721     *tsp = 0;
722   }
723   wal->force_sp = false;
724   WBFIXPOINT wbfp = {
725     .id = WOP_FIXPOINT
726   };
727   iwrc rc = iwp_current_time_ms(&wbfp.ts, false);
728   RCRET(rc);
729   rc = _write_wl(wal, &wbfp, sizeof(wbfp), 0, 0);
730   RCRET(rc);
731   rc = _flush_wl(wal, sync);
732   RCRET(rc);
733   if (sync) {
734     wal->synched = true;
735   }
736   if (tsp) {
737     *tsp = wbfp.ts;
738   }
739   return 0;
740 }
741 
/**
 * Returns the current `synched` flag of the WAL, or `false` when no WAL is
 * attached to `iwkv`.
 */
bool iwal_synched(IWKV iwkv) {
  const IWAL *wal = (IWAL *) iwkv->dlsnr;
  return wal ? wal->synched : false;
}
749 
iwal_savepoint_exl(IWKV iwkv,bool sync)750 iwrc iwal_savepoint_exl(IWKV iwkv, bool sync) {
751   IWAL *wal = (IWAL *) iwkv->dlsnr;
752   if (!wal) {
753     return 0;
754   }
755   return _savepoint_exl(wal, 0, sync);
756 }
757 
/**
 * Stops the checkpoint thread of the WAL attached to `iwkv`, if any.
 *
 * Waits (polling every 50 ms) until no online backup is in progress, marks
 * the WAL as closed, wakes the checkpoint thread up and joins it.
 *
 * The thread pointer is cleared after the join so that calling this function
 * again does not join the same thread twice.
 */
void iwal_shutdown(IWKV iwkv) {
  IWAL *wal = (IWAL *) iwkv->dlsnr;
  if (!wal) {
    return;
  }
  while (wal->bkp_stage) { // todo: review
    iwp_sleep(50);
  }
  wal->open = false;
  if (wal->mtxp && wal->cpt_condp) {
    pthread_mutex_lock(wal->mtxp);
    pthread_cond_broadcast(wal->cpt_condp);
    pthread_mutex_unlock(wal->mtxp);
  }
  if (wal->cptp) {
    pthread_join(wal->cpt, 0);
    wal->cpt = 0;
    wal->cptp = 0;
  }
}
777 
/**
 * Body of the checkpoint thread.
 *
 * Loops while `wal->open` is set. Each round either runs immediately (a
 * checkpoint is due or a savepoint was requested) or waits on the checkpoint
 * condition variable for up to one second and then decides, based on the
 * elapsed time, whether a checkpoint or a savepoint has to be performed.
 *
 * @param op The `IWAL` instance.
 * @return Always `0`; a terminating error is reported through
 *         `iwkv->fatalrc` and the log.
 */
static void *_cpt_worker_fn(void *op) {
  int rci;
  iwrc rc = 0;
  IWAL *wal = op;
  IWKV iwkv = wal->iwkv;
  uint64_t savepoint_ts = 0; // Time of the last savepoint/checkpoint made by this thread

  while (wal->open) {
    struct timespec tp;
    uint64_t tick_ts;
    bool sp = false, cp = false;
    rc = _lock(wal);
    RCBREAK(rc);

    // Pending work is handled right away, without waiting for the next tick
    if (_need_checkpoint(wal)) {
      cp = true;
      _unlock(wal);
      goto cprun;
    } else if (wal->force_sp) {
      sp = true;
      _unlock(wal);
      goto cprun;
    }

#if defined(IW_HAVE_CLOCK_MONOTONIC) && defined(IW_HAVE_PTHREAD_CONDATTR_SETCLOCK)
    rc = iwp_clock_get_time(CLOCK_MONOTONIC, &tp);
#else
    rc = iwp_clock_get_time(CLOCK_REALTIME, &tp);
#endif
    if (rc) {
      _unlock(wal);
      break;
    }
    tp.tv_sec += 1; // one sec tick
    // Wait deadline expressed in milliseconds
    tick_ts = tp.tv_sec * 1000 + (uint64_t) round(tp.tv_nsec / 1.0e6);
    rci = pthread_cond_timedwait(wal->cpt_condp, wal->mtxp, &tp);
    if (rci && rci != ETIMEDOUT) {
      rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
      _unlock(wal);
      break;
    }
    if (!wal->open || iwkv->fatalrc) {
      _unlock(wal);
      break;
    }
    bool synched = wal->synched;
    size_t mbytes = wal->mbytes;
    // Checkpoint when it is due, or when there are modifications and the checkpoint timeout elapsed
    cp = _need_checkpoint(wal) || ((mbytes && (tick_ts - wal->checkpoint_ts) >= 1000LL * wal->checkpoint_timeout_sec));
    if (!cp) {
      // Savepoint only if the WAL is not synched and either one was requested or the savepoint timeout elapsed
      sp = !synched && (wal->force_sp || ((tick_ts - savepoint_ts) >= 1000LL * wal->savepoint_timeout_sec));
    }
    _unlock(wal);

cprun:
    if (cp || sp) {
      rc = _excl_lock(wal);
      RCBREAK(rc);
      if (iwkv->open) {
        if (cp) {
          rc = _checkpoint_exl(wal, &savepoint_ts, false);
        } else {
          rc = _savepoint_exl(wal, &savepoint_ts, true);
        }
      }
      _excl_unlock(wal);
      if (rc) {
        // Errors of a single round are logged and cleared; the loop keeps running
        iwlog_ecode_error2(rc, "WAL worker savepoint/checkpoint error\n");
        rc = 0;
      }
    }
  }
  if (rc) {
    // Keep an earlier fatal error if there is one
    iwkv->fatalrc = iwkv->fatalrc ? iwkv->fatalrc : rc;
    iwlog_ecode_error2(rc, "WAL worker exited with error\n");
  }
  return 0;
}
855 
iwal_online_backup(IWKV iwkv,uint64_t * ts,const char * target_file)856 iwrc iwal_online_backup(IWKV iwkv, uint64_t *ts, const char *target_file) {
857   iwrc rc;
858   size_t sp;
859   uint32_t lv;
860   uint64_t llv;
861   char buf[16384];
862   off_t off = 0, fsize = 0;
863   *ts = 0;
864 
865   if (!target_file) {
866     return IW_ERROR_INVALID_ARGS;
867   }
868   IWAL *wal = (IWAL *) iwkv->dlsnr;
869   if (!wal) {
870     return IWKV_ERROR_WAL_MODE_REQUIRED;
871   }
872   rc = _lock(wal);
873   RCRET(rc);
874   if (wal->bkp_stage) {
875     rc = IWKV_ERROR_BACKUP_IN_PROGRESS;
876   } else {
877     wal->bkp_stage = BKP_STARTED;
878   }
879   _unlock(wal);
880 
881 #ifndef _WIN32
882   HANDLE fh = open(target_file, O_CREAT | O_WRONLY | O_TRUNC, 00600);
883   if (INVALIDHANDLE(fh)) {
884     rc = iwrc_set_errno(IW_ERROR_IO_ERRNO, errno);
885     goto finish;
886   }
887 #else
888   HANDLE fh = CreateFile(target_file, GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ | FILE_SHARE_WRITE,
889                          NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
890   if (INVALIDHANDLE(fh)) {
891     rc = iwrc_set_werror(IW_ERROR_IO_ERRNO, GetLastError());
892     goto finish;
893   }
894 #endif
895 
896   // Flush all pending WAL changes
897   rc = _excl_lock(wal);
898   RCGO(rc, finish);
899   wal->bkp_stage = BKP_WAL_CLEANUP;
900   rc = _checkpoint_exl(wal, 0, false);
901   wal->bkp_stage = BKP_MAIN_COPY;
902   _excl_unlock(wal);
903   RCGO(rc, finish);
904 
905   // Copy main database file
906   IWFS_FSM_STATE fstate = {0};
907   rc = iwkv->fsm.state(&iwkv->fsm, &fstate);
908   RCGO(rc, finish);
909   do {
910     rc = iwp_pread(fstate.exfile.file.fh, off, buf, sizeof(buf), &sp);
911     RCGO(rc, finish);
912     if (sp > 0) {
913       rc = iwp_write(fh, buf, sp);
914       RCGO(rc, finish);
915       off += sp;
916     }
917   } while (sp > 0);
918 
919   // Copy most of WAL file content
920   rc = _lock(wal);
921   RCGO(rc, finish);
922   wal->bkp_stage = BKP_WAL_COPY1;
923   rc = _flush_wl(wal, false);
924   _unlock(wal);
925   RCGO(rc, finish);
926 
927   fsize = off;
928   off = 0;
929   do {
930     rc = iwp_pread(wal->fh, off, buf, sizeof(buf), &sp);
931     RCGO(rc, finish);
932     if (sp > 0) {
933       rc = iwp_write(fh, buf, sp);
934       RCGO(rc, finish);
935       off += sp;
936     }
937   } while (sp > 0);
938 
939 
940   // Copy rest of WAL file in exclusive locked mode
941   rc = _excl_lock(wal);
942   RCGO(rc, finish);
943   wal->bkp_stage = BKP_WAL_COPY2;
944   rc = _savepoint_exl(wal, ts, true);
945   RCGO(rc, unlock);
946   do {
947     rc = iwp_pread(wal->fh, off, buf, sizeof(buf), &sp);
948     RCGO(rc, unlock);
949     if (sp > 0) {
950       rc = iwp_write(fh, buf, sp);
951       RCGO(rc, unlock);
952       off += sp;
953     }
954   } while (sp > 0);
955 
956   llv = IW_HTOILL(fsize);
957   rc = iwp_write(fh, &llv, sizeof(llv));
958   RCGO(rc, unlock);
959 
960   lv = IW_HTOIL(IWKV_BACKUP_MAGIC);
961   rc = iwp_write(fh, &lv, sizeof(lv));
962   RCGO(rc, unlock);
963 
964 unlock:
965   wal->bkp_stage = 0;
966   IWRC(_excl_unlock(wal), rc);
967 
968 finish:
969   if (rc) {
970     _lock(wal);
971     wal->bkp_stage = 0;
972     _unlock(wal);
973   } else {
974     rc = iwal_poke_checkpoint(iwkv, true);
975   }
976   if (!INVALIDHANDLE(fh)) {
977     IWRC(iwp_fdatasync(fh), rc);
978     IWRC(iwp_closefh(fh), rc);
979   }
980   return rc;
981 }
982 
_init_cpt(IWAL * wal)983 iwrc _init_cpt(IWAL *wal) {
984   if (wal->savepoint_timeout_sec == UINT32_MAX
985       && wal->checkpoint_timeout_sec == UINT32_MAX) {
986     // do not start checkpoint thread
987     return 0;
988   }
989   pthread_attr_t pattr;
990   pthread_condattr_t cattr;
991   int rci = pthread_condattr_init(&cattr);
992   if (rci) {
993     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
994   }
995 #if defined(IW_HAVE_CLOCK_MONOTONIC) && defined(IW_HAVE_PTHREAD_CONDATTR_SETCLOCK)
996   rci = pthread_condattr_setclock(&cattr, CLOCK_MONOTONIC);
997   if (rci) {
998     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
999   }
1000 #endif
1001   rci = pthread_cond_init(&wal->cpt_cond, &cattr);
1002   if (rci) {
1003     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
1004   }
1005   wal->cpt_condp = &wal->cpt_cond;
1006   rci = pthread_attr_init(&pattr);
1007   if (rci) {
1008     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
1009   }
1010   pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_JOINABLE);
1011   rci = pthread_create(&wal->cpt, &pattr, _cpt_worker_fn, wal);
1012   if (rci) {
1013     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
1014   }
1015   wal->cptp = &wal->cpt;
1016   return 0;
1017 }
1018 
iwal_create(IWKV iwkv,const IWKV_OPTS * opts,IWFS_FSM_OPTS * fsmopts,bool recover_backup)1019 iwrc iwal_create(IWKV iwkv, const IWKV_OPTS *opts, IWFS_FSM_OPTS *fsmopts, bool recover_backup) {
1020   assert(!iwkv->dlsnr && opts && fsmopts);
1021   if (!opts) {
1022     return IW_ERROR_INVALID_ARGS;
1023   }
1024   if ((opts->oflags & IWKV_RDONLY) || !opts->wal.enabled) {
1025     return 0;
1026   }
1027   iwrc rc = 0;
1028   IWAL *wal = calloc(1, sizeof(*wal));
1029   if (!wal) {
1030     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
1031   }
1032 
1033   wal->wal_lock_interceptor = opts->wal.wal_lock_interceptor;
1034   wal->wal_lock_interceptor_opaque = opts->wal.wal_lock_interceptor_opaque;
1035 
1036   size_t sz = strlen(opts->path);
1037   char *wpath = malloc(sz + 4 /*-wal*/ + 1 /*\0*/);
1038   if (!wpath) {
1039     free(wal);
1040     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
1041   }
1042   memcpy(wpath, opts->path, sz);
1043   memcpy(wpath + sz, "-wal", 4);
1044   wpath[sz + 4] = '\0';
1045 
1046   wal->fh = INVALID_HANDLE_VALUE;
1047   wal->path = wpath;
1048   wal->oflags = opts->oflags;
1049   wal->iwkv = iwkv;
1050   iwp_current_time_ms(&wal->checkpoint_ts, true);
1051 
1052   rc = _init_locks(wal);
1053   RCGO(rc, finish);
1054 
1055   IWDLSNR *dlsnr = &wal->lsnr;
1056   dlsnr->onopen = _onopen;
1057   dlsnr->onclosing = _onclosing;
1058   dlsnr->onset = _onset;
1059   dlsnr->oncopy = _oncopy;
1060   dlsnr->onwrite = _onwrite;
1061   dlsnr->onresize = _onresize;
1062   dlsnr->onsynced = _onsynced;
1063   iwkv->dlsnr = (IWDLSNR *) wal;
1064 
1065   wal->wal_buffer_sz =
1066     opts->wal.wal_buffer_sz > 0 ?
1067     opts->wal.wal_buffer_sz :
1068 #if defined __ANDROID__ || defined TARGET_OS_IPHONE
1069     2 * 1024 * 1024; // 2M
1070 #else
1071     8 * 1024 * 1024; // 8M
1072 #endif
1073   if (wal->wal_buffer_sz < 4096) {
1074     wal->wal_buffer_sz = 4096;
1075   }
1076 
1077   wal->checkpoint_buffer_sz
1078     = opts->wal.checkpoint_buffer_sz > 0 ?
1079       opts->wal.checkpoint_buffer_sz :
1080 #if defined __ANDROID__ || defined TARGET_OS_IPHONE
1081       64ULL * 1024 * 1024; // 64M
1082 #else
1083       1024ULL * 1024 * 1024; // 1G
1084 #endif
1085   if (wal->checkpoint_buffer_sz < 1024 * 1024) { // 1M minimal
1086     wal->checkpoint_buffer_sz = 1024 * 1024;
1087   }
1088 
1089   wal->savepoint_timeout_sec
1090     = opts->wal.savepoint_timeout_sec > 0 ?
1091       opts->wal.savepoint_timeout_sec : 10; // 10 sec
1092 
1093   wal->checkpoint_timeout_sec
1094     = opts->wal.checkpoint_timeout_sec > 0 ?
1095 #if defined __ANDROID__ || defined TARGET_OS_IPHONE
1096       opts->wal.checkpoint_timeout_sec : 60; // 1 min
1097 #else
1098       opts->wal.checkpoint_timeout_sec : 300; // 5 min
1099 #endif
1100 
1101   if (wal->checkpoint_timeout_sec < 10) { // 10 sec minimal
1102     wal->checkpoint_timeout_sec = 10;
1103   }
1104   if (wal->savepoint_timeout_sec >= wal->checkpoint_timeout_sec) {
1105     wal->savepoint_timeout_sec = wal->checkpoint_timeout_sec / 2;
1106   }
1107 
1108   wal->check_cp_crc = opts->wal.check_crc_on_checkpoint;
1109 
1110   wal->buf = malloc(wal->wal_buffer_sz);
1111   if (!wal->buf) {
1112     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
1113     goto finish;
1114   }
1115   wal->buf += sizeof(WBSEP);
1116   wal->bufsz = wal->wal_buffer_sz - sizeof(WBSEP);
1117 
1118   // Now open WAL file
1119 
1120 #ifndef _WIN32
1121   HANDLE fh = open(wal->path, O_CREAT | O_RDWR, IWFS_DEFAULT_FILEMODE);
1122   if (INVALIDHANDLE(fh)) {
1123     rc = iwrc_set_errno(IW_ERROR_IO_ERRNO, errno);
1124     goto finish;
1125   }
1126 #else
1127   HANDLE fh = CreateFile(wal->path, GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ,
1128                          NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
1129   if (INVALIDHANDLE(fh)) {
1130     rc = iwrc_set_werror(IW_ERROR_IO_ERRNO, GetLastError());
1131     goto finish;
1132   }
1133 #endif
1134 
1135   wal->fh = fh;
1136   rc = iwp_flock(wal->fh, IWP_WLOCK);
1137   RCGO(rc, finish);
1138 
1139   // Now force all fsm data to be privately mmaped.
1140   // We will apply wal log to main database file
1141   // then re-read our private mmaps
1142   fsmopts->mmap_opts = IWFS_MMAP_PRIVATE;
1143   fsmopts->exfile.file.dlsnr = iwkv->dlsnr;
1144 
1145   if (wal->oflags & IWKV_TRUNC) {
1146     rc = _truncate_wl(wal);
1147     RCGO(rc, finish);
1148   } else {
1149     rc = _recover_wl(iwkv, wal, fsmopts, recover_backup);
1150     RCGO(rc, finish);
1151   }
1152 
1153   wal->open = true;
1154   // Start checkpoint thread
1155   rc = _init_cpt(wal);
1156 
1157 finish:
1158   if (rc) {
1159     iwkv->dlsnr = 0;
1160     iwkv->fatalrc = iwkv->fatalrc ? iwkv->fatalrc : rc;
1161     iwal_shutdown(iwkv);
1162     _destroy(wal);
1163   }
1164   return rc;
1165 }
1166