Lines Matching refs:wal
60 static iwrc _checkpoint_exl(IWAL *wal, uint64_t *tsp, bool no_fixpoint);
62 IW_INLINE iwrc _lock(IWAL *wal) { in _lock() argument
63 int rci = pthread_mutex_lock(wal->mtxp); in _lock()
67 IW_INLINE iwrc _unlock(IWAL *wal) { in _unlock() argument
68 int rci = pthread_mutex_unlock(wal->mtxp); in _unlock()
72 static iwrc _excl_lock(IWAL *wal) { in _excl_lock() argument
74 if (wal->wal_lock_interceptor) { in _excl_lock()
75 rc = wal->wal_lock_interceptor(true, wal->wal_lock_interceptor_opaque); in _excl_lock()
78 rc = iwkv_exclusive_lock(wal->iwkv); in _excl_lock()
80 if (wal->wal_lock_interceptor) { in _excl_lock()
81 IWRC(wal->wal_lock_interceptor(false, wal->wal_lock_interceptor_opaque), rc); in _excl_lock()
85 rc = _lock(wal); in _excl_lock()
87 IWRC(iwkv_exclusive_unlock(wal->iwkv), rc); in _excl_lock()
88 if (wal->wal_lock_interceptor) { in _excl_lock()
89 IWRC(wal->wal_lock_interceptor(false, wal->wal_lock_interceptor_opaque), rc); in _excl_lock()
95 static iwrc _excl_unlock(IWAL *wal) { in _excl_unlock() argument
96 iwrc rc = _unlock(wal); in _excl_unlock()
97 IWRC(iwkv_exclusive_unlock(wal->iwkv), rc); in _excl_unlock()
98 if (wal->wal_lock_interceptor) { in _excl_unlock()
99 IWRC(wal->wal_lock_interceptor(false, wal->wal_lock_interceptor_opaque), rc); in _excl_unlock()
104 static iwrc _init_locks(IWAL *wal) { in _init_locks() argument
105 int rci = pthread_mutex_init(&wal->mtx, 0); in _init_locks()
109 wal->mtxp = &wal->mtx; in _init_locks()
113 static void _destroy(IWAL *wal) { in _destroy() argument
114 if (wal) { in _destroy()
115 wal->open = false; in _destroy()
116 if (!INVALIDHANDLE(wal->fh)) { in _destroy()
117 iwp_unlock(wal->fh); in _destroy()
118 iwp_closefh(wal->fh); in _destroy()
120 if (wal->cpt_condp) { in _destroy()
121 pthread_cond_destroy(wal->cpt_condp); in _destroy()
122 wal->cpt_condp = 0; in _destroy()
124 if (wal->mtxp) { in _destroy()
125 pthread_mutex_destroy(wal->mtxp); in _destroy()
126 wal->mtxp = 0; in _destroy()
128 free(wal->path); in _destroy()
129 if (wal->buf) { in _destroy()
130 wal->buf -= sizeof(WBSEP); in _destroy()
131 free(wal->buf); in _destroy()
133 free(wal); in _destroy()
137 static iwrc _flush_wl(IWAL *wal, bool sync) { in _flush_wl() argument
139 if (wal->bufpos) { in _flush_wl()
140 uint32_t crc = wal->check_cp_crc ? iwu_crc32(wal->buf, wal->bufpos, 0) : 0; in _flush_wl()
144 .len = wal->bufpos in _flush_wl()
146 size_t wz = wal->bufpos + sizeof(WBSEP); in _flush_wl()
147 uint8_t *wp = wal->buf - sizeof(WBSEP); in _flush_wl()
149 rc = iwp_write(wal->fh, wp, wz); in _flush_wl()
151 wal->bufpos = 0; in _flush_wl()
154 rc = iwp_fsync(wal->fh); in _flush_wl()
159 IW_INLINE iwrc _truncate_wl(IWAL *wal) { in _truncate_wl() argument
160 iwrc rc = iwp_ftruncate(wal->fh, 0); in _truncate_wl()
162 wal->rollforward_offset = 0; in _truncate_wl()
163 rc = iwp_lseek(wal->fh, 0, IWP_SEEK_SET, 0); in _truncate_wl()
165 rc = iwp_fsync(wal->fh); in _truncate_wl()
169 static iwrc _write_wl(IWAL *wal, const void *op, off_t oplen, const uint8_t *data, off_t len) { in _write_wl() argument
171 const off_t bufsz = wal->bufsz; in _write_wl()
172 wal->synched = false; in _write_wl()
173 if (bufsz - wal->bufpos < oplen) { in _write_wl()
174 rc = _flush_wl(wal, false); in _write_wl()
177 assert(bufsz - wal->bufpos >= oplen); in _write_wl()
178 memcpy(wal->buf + wal->bufpos, op, (size_t) oplen); in _write_wl()
179 wal->bufpos += oplen; in _write_wl()
180 if (bufsz - wal->bufpos < len) { in _write_wl()
181 rc = _flush_wl(wal, false); in _write_wl()
183 rc = iwp_write(wal->fh, data, (size_t) len); in _write_wl()
186 assert(bufsz - wal->bufpos >= len); in _write_wl()
187 memcpy(wal->buf + wal->bufpos, data, (size_t) len); in _write_wl()
188 wal->bufpos += len; in _write_wl()
193 IW_INLINE iwrc _write_op(IWAL *wal, const void *op, off_t oplen, const uint8_t *data, off_t len) { in _write_op() argument
194 iwrc rc = _lock(wal); in _write_op()
196 rc = _write_wl(wal, op, oplen, data, len); in _write_op()
197 IWRC(_unlock(wal), rc); in _write_op()
202 IWAL *wal = (IWAL *) iwkv->dlsnr; in iwal_sync() local
203 iwrc rc = _lock(wal); in iwal_sync()
205 rc = _flush_wl(wal, true); in iwal_sync()
206 IWRC(_unlock(wal), rc); in iwal_sync()
215 IWAL *wal = (IWAL *) self; in _onclosing() local
219 _destroy(wal); in _onclosing()
223 iwrc rc = _checkpoint_exl(wal, 0, false); in _onclosing()
224 _destroy(wal); in _onclosing()
229 IWAL *wal = (IWAL *) self; in _onset() local
230 if (wal->applying) { in _onset()
239 wal->mbytes += len; in _onset()
244 IWAL *wal = (IWAL *) self; in _oncopy() local
245 if (wal->applying) { in _oncopy()
254 wal->mbytes += len; in _oncopy()
255 return _write_op(wal, &wb, sizeof(wb), 0, 0); in _oncopy()
260 IWAL *wal = (IWAL *) self; in _onwrite() local
261 if (wal->applying) { in _onwrite()
266 .crc = wal->check_cp_crc ? iwu_crc32(buf, len, 0) : 0, in _onwrite()
270 wal->mbytes += len; in _onwrite()
271 return _write_op(wal, &wb, sizeof(wb), buf, len); in _onwrite()
275 IWAL *wal = (IWAL *) self; in _onresize() local
276 if (wal->applying) { in _onresize()
286 iwrc rc = _lock(wal); in _onresize()
288 rc = _write_wl(wal, &wb, sizeof(wb), 0, 0); in _onresize()
290 rc = _checkpoint_exl(wal, 0, true); in _onresize()
292 IWRC(_unlock(wal), rc); in _onresize()
297 IWAL *wal = (IWAL *) self; in _onsynced() local
298 if (wal->applying) { in _onsynced()
301 iwrc rc = _lock(wal); in _onsynced()
303 rc = _flush_wl(wal, true); in _onsynced()
304 IWRC(_unlock(wal), rc); in _onsynced()
308 static void _last_fix_and_reset_points(IWAL *wal, uint8_t *wmm, off_t fsz, off_t *fpos, off_t *rpos… in _last_fix_and_reset_points() argument
385 static iwrc _rollforward_exl(IWAL *wal, IWFS_EXT *extf, int recover_mode) { in _rollforward_exl() argument
386 assert(wal->bufpos == 0); in _rollforward_exl()
388 iwrc rc = iwp_lseek(wal->fh, 0, IWP_SEEK_END, &fsz); in _rollforward_exl()
395 const bool ccrc = wal->check_cp_crc; in _rollforward_exl()
399 uint8_t *wmm = mmap(0, (size_t) pfsz, PROT_READ, MAP_PRIVATE, wal->fh, 0); in _rollforward_exl()
403 uint8_t *wmm = mmap(0, 0, PROT_READ, MAP_PRIVATE, wal->fh, 0); in _rollforward_exl()
409 wal->applying = true; in _rollforward_exl()
416 wal->iwkv->fatalrc = rc; in _rollforward_exl()
417 wal->applying = false; in _rollforward_exl()
429 _last_fix_and_reset_points(wal, wmm, fsz, &fpos, &rpos); in _rollforward_exl()
446 } else if (wal->rollforward_offset > 0) { in _rollforward_exl()
447 if (wal->rollforward_offset >= fsz) { in _rollforward_exl()
450 wmm += wal->rollforward_offset; in _rollforward_exl()
451 fsz -= wal->rollforward_offset; in _rollforward_exl()
556 int stage = wal->bkp_stage; in _rollforward_exl()
558 rc = _truncate_wl(wal); in _rollforward_exl()
565 IWRC(_flush_wl(wal, false), rc); in _rollforward_exl()
567 IWRC(_write_wl(wal, &wb, sizeof(wb), 0, 0), rc); in _rollforward_exl()
568 IWRC(_flush_wl(wal, true), rc); in _rollforward_exl()
569 IWRC(iwp_lseek(wal->fh, 0, IWP_SEEK_END, &fsz), rc); in _rollforward_exl()
572 wal->rollforward_offset = fsz - (sizeof(WBSEP) + sizeof(WBRESET)); in _rollforward_exl()
576 if (rc && !wal->iwkv->fatalrc) { in _rollforward_exl()
577 wal->iwkv->fatalrc = rc; in _rollforward_exl()
579 wal->synched = true; in _rollforward_exl()
580 wal->applying = false; in _rollforward_exl()
584 static iwrc _recover_wl(IWKV iwkv, IWAL *wal, IWFS_FSM_OPTS *fsmopts, bool recover_backup) { in _recover_wl() argument
586 iwrc rc = iwp_lseek(wal->fh, 0, IWP_SEEK_END, &fsz); in _recover_wl()
599 rc = _rollforward_exl(wal, &extf, recover_backup ? 2 : 1); in _recover_wl()
604 IW_INLINE bool _need_checkpoint(IWAL *wal) { in _need_checkpoint() argument
605 uint64_t mbytes = wal->mbytes; in _need_checkpoint()
606 bool force = wal->force_cp; in _need_checkpoint()
607 return (force || mbytes >= wal->checkpoint_buffer_sz); in _need_checkpoint()
610 static iwrc _checkpoint_exl(IWAL *wal, uint64_t *tsp, bool no_fixpoint) { in _checkpoint_exl() argument
614 int stage = wal->bkp_stage; in _checkpoint_exl()
621 IWKV iwkv = wal->iwkv; in _checkpoint_exl()
623 wal->force_cp = false; in _checkpoint_exl()
624 wal->force_sp = false; in _checkpoint_exl()
630 rc = _write_wl(wal, &wb, sizeof(wb), 0, 0); in _checkpoint_exl()
633 rc = _flush_wl(wal, true); in _checkpoint_exl()
638 rc = _rollforward_exl(wal, extf, 0); in _checkpoint_exl()
639 wal->mbytes = 0; in _checkpoint_exl()
640 wal->synched = true; in _checkpoint_exl()
641 iwp_current_time_ms(&wal->checkpoint_ts, true); in _checkpoint_exl()
643 *tsp = wal->checkpoint_ts; in _checkpoint_exl()
663 IWAL *wal = (IWAL *) iwkv->dlsnr; in iwal_test_checkpoint() local
664 iwrc rc = _excl_lock(wal); in iwal_test_checkpoint()
666 rc = _checkpoint_exl(wal, 0, false); in iwal_test_checkpoint()
667 IWRC(_excl_unlock(wal), rc); in iwal_test_checkpoint()
676 IWAL *wal = (IWAL *) iwkv->dlsnr; in iwal_poke_checkpoint() local
677 if (!wal || !(force || _need_checkpoint(wal))) { in iwal_poke_checkpoint()
680 iwrc rc = _lock(wal); in iwal_poke_checkpoint()
682 bool cforce = wal->force_cp; in iwal_poke_checkpoint()
684 _unlock(wal); in iwal_poke_checkpoint()
687 wal->force_cp = true; in iwal_poke_checkpoint()
688 } else if (!_need_checkpoint(wal)) { in iwal_poke_checkpoint()
689 _unlock(wal); in iwal_poke_checkpoint()
692 int rci = pthread_cond_broadcast(wal->cpt_condp); in iwal_poke_checkpoint()
696 _unlock(wal); in iwal_poke_checkpoint()
701 IWAL *wal = (IWAL *) iwkv->dlsnr; in iwal_poke_savepoint() local
702 if (!wal) { in iwal_poke_savepoint()
705 iwrc rc = _lock(wal); in iwal_poke_savepoint()
707 bool fsp = wal->force_sp; in iwal_poke_savepoint()
709 wal->force_sp = true; in iwal_poke_savepoint()
710 int rci = pthread_cond_broadcast(wal->cpt_condp); in iwal_poke_savepoint()
715 _unlock(wal); in iwal_poke_savepoint()
719 iwrc _savepoint_exl(IWAL *wal, uint64_t *tsp, bool sync) { in _savepoint_exl() argument
723 wal->force_sp = false; in _savepoint_exl()
729 rc = _write_wl(wal, &wbfp, sizeof(wbfp), 0, 0); in _savepoint_exl()
731 rc = _flush_wl(wal, sync); in _savepoint_exl()
734 wal->synched = true; in _savepoint_exl()
743 IWAL *wal = (IWAL *) iwkv->dlsnr; in iwal_synched() local
744 if (!wal) { in iwal_synched()
747 return wal->synched; in iwal_synched()
751 IWAL *wal = (IWAL *) iwkv->dlsnr; in iwal_savepoint_exl() local
752 if (!wal) { in iwal_savepoint_exl()
755 return _savepoint_exl(wal, 0, sync); in iwal_savepoint_exl()
759 IWAL *wal = (IWAL *) iwkv->dlsnr; in iwal_shutdown() local
760 if (!wal) { in iwal_shutdown()
763 while (wal->bkp_stage) { // todo: review in iwal_shutdown()
766 wal->open = false; in iwal_shutdown()
767 if (wal->mtxp && wal->cpt_condp) { in iwal_shutdown()
768 pthread_mutex_lock(wal->mtxp); in iwal_shutdown()
769 pthread_cond_broadcast(wal->cpt_condp); in iwal_shutdown()
770 pthread_mutex_unlock(wal->mtxp); in iwal_shutdown()
772 if (wal->cptp) { in iwal_shutdown()
773 pthread_join(wal->cpt, 0); in iwal_shutdown()
774 wal->cpt = 0; in iwal_shutdown()
781 IWAL *wal = op; in _cpt_worker_fn() local
782 IWKV iwkv = wal->iwkv; in _cpt_worker_fn()
785 while (wal->open) { in _cpt_worker_fn()
789 rc = _lock(wal); in _cpt_worker_fn()
792 if (_need_checkpoint(wal)) { in _cpt_worker_fn()
794 _unlock(wal); in _cpt_worker_fn()
796 } else if (wal->force_sp) { in _cpt_worker_fn()
798 _unlock(wal); in _cpt_worker_fn()
808 _unlock(wal); in _cpt_worker_fn()
813 rci = pthread_cond_timedwait(wal->cpt_condp, wal->mtxp, &tp); in _cpt_worker_fn()
816 _unlock(wal); in _cpt_worker_fn()
819 if (!wal->open || iwkv->fatalrc) { in _cpt_worker_fn()
820 _unlock(wal); in _cpt_worker_fn()
823 bool synched = wal->synched; in _cpt_worker_fn()
824 size_t mbytes = wal->mbytes; in _cpt_worker_fn()
825 …cp = _need_checkpoint(wal) || ((mbytes && (tick_ts - wal->checkpoint_ts) >= 1000LL * wal->checkpoi… in _cpt_worker_fn()
827 …sp = !synched && (wal->force_sp || ((tick_ts - savepoint_ts) >= 1000LL * wal->savepoint_timeout_se… in _cpt_worker_fn()
829 _unlock(wal); in _cpt_worker_fn()
833 rc = _excl_lock(wal); in _cpt_worker_fn()
837 rc = _checkpoint_exl(wal, &savepoint_ts, false); in _cpt_worker_fn()
839 rc = _savepoint_exl(wal, &savepoint_ts, true); in _cpt_worker_fn()
842 _excl_unlock(wal); in _cpt_worker_fn()
868 IWAL *wal = (IWAL *) iwkv->dlsnr; in iwal_online_backup() local
869 if (!wal) { in iwal_online_backup()
872 rc = _lock(wal); in iwal_online_backup()
874 if (wal->bkp_stage) { in iwal_online_backup()
877 wal->bkp_stage = BKP_STARTED; in iwal_online_backup()
879 _unlock(wal); in iwal_online_backup()
897 rc = _excl_lock(wal); in iwal_online_backup()
899 wal->bkp_stage = BKP_WAL_CLEANUP; in iwal_online_backup()
900 rc = _checkpoint_exl(wal, 0, false); in iwal_online_backup()
901 wal->bkp_stage = BKP_MAIN_COPY; in iwal_online_backup()
902 _excl_unlock(wal); in iwal_online_backup()
920 rc = _lock(wal); in iwal_online_backup()
922 wal->bkp_stage = BKP_WAL_COPY1; in iwal_online_backup()
923 rc = _flush_wl(wal, false); in iwal_online_backup()
924 _unlock(wal); in iwal_online_backup()
930 rc = iwp_pread(wal->fh, off, buf, sizeof(buf), &sp); in iwal_online_backup()
941 rc = _excl_lock(wal); in iwal_online_backup()
943 wal->bkp_stage = BKP_WAL_COPY2; in iwal_online_backup()
944 rc = _savepoint_exl(wal, ts, true); in iwal_online_backup()
947 rc = iwp_pread(wal->fh, off, buf, sizeof(buf), &sp); in iwal_online_backup()
965 wal->bkp_stage = 0; in iwal_online_backup()
966 IWRC(_excl_unlock(wal), rc); in iwal_online_backup()
970 _lock(wal); in iwal_online_backup()
971 wal->bkp_stage = 0; in iwal_online_backup()
972 _unlock(wal); in iwal_online_backup()
983 iwrc _init_cpt(IWAL *wal) { in _init_cpt() argument
984 if (wal->savepoint_timeout_sec == UINT32_MAX in _init_cpt()
985 && wal->checkpoint_timeout_sec == UINT32_MAX) { in _init_cpt()
1001 rci = pthread_cond_init(&wal->cpt_cond, &cattr); in _init_cpt()
1005 wal->cpt_condp = &wal->cpt_cond; in _init_cpt()
1011 rci = pthread_create(&wal->cpt, &pattr, _cpt_worker_fn, wal); in _init_cpt()
1015 wal->cptp = &wal->cpt; in _init_cpt()
1024 if ((opts->oflags & IWKV_RDONLY) || !opts->wal.enabled) { in iwal_create()
1028 IWAL *wal = calloc(1, sizeof(*wal)); in iwal_create() local
1029 if (!wal) { in iwal_create()
1033 wal->wal_lock_interceptor = opts->wal.wal_lock_interceptor; in iwal_create()
1034 wal->wal_lock_interceptor_opaque = opts->wal.wal_lock_interceptor_opaque; in iwal_create()
1039 free(wal); in iwal_create()
1046 wal->fh = INVALID_HANDLE_VALUE; in iwal_create()
1047 wal->path = wpath; in iwal_create()
1048 wal->oflags = opts->oflags; in iwal_create()
1049 wal->iwkv = iwkv; in iwal_create()
1050 iwp_current_time_ms(&wal->checkpoint_ts, true); in iwal_create()
1052 rc = _init_locks(wal); in iwal_create()
1055 IWDLSNR *dlsnr = &wal->lsnr; in iwal_create()
1063 iwkv->dlsnr = (IWDLSNR *) wal; in iwal_create()
1065 wal->wal_buffer_sz = in iwal_create()
1066 opts->wal.wal_buffer_sz > 0 ? in iwal_create()
1067 opts->wal.wal_buffer_sz : in iwal_create()
1073 if (wal->wal_buffer_sz < 4096) { in iwal_create()
1074 wal->wal_buffer_sz = 4096; in iwal_create()
1077 wal->checkpoint_buffer_sz in iwal_create()
1078 = opts->wal.checkpoint_buffer_sz > 0 ? in iwal_create()
1079 opts->wal.checkpoint_buffer_sz : in iwal_create()
1085 if (wal->checkpoint_buffer_sz < 1024 * 1024) { // 1M minimal in iwal_create()
1086 wal->checkpoint_buffer_sz = 1024 * 1024; in iwal_create()
1089 wal->savepoint_timeout_sec in iwal_create()
1090 = opts->wal.savepoint_timeout_sec > 0 ? in iwal_create()
1091 opts->wal.savepoint_timeout_sec : 10; // 10 sec in iwal_create()
1093 wal->checkpoint_timeout_sec in iwal_create()
1094 = opts->wal.checkpoint_timeout_sec > 0 ? in iwal_create()
1096 opts->wal.checkpoint_timeout_sec : 60; // 1 min in iwal_create()
1098 opts->wal.checkpoint_timeout_sec : 300; // 5 min in iwal_create()
1101 if (wal->checkpoint_timeout_sec < 10) { // 10 sec minimal in iwal_create()
1102 wal->checkpoint_timeout_sec = 10; in iwal_create()
1104 if (wal->savepoint_timeout_sec >= wal->checkpoint_timeout_sec) { in iwal_create()
1105 wal->savepoint_timeout_sec = wal->checkpoint_timeout_sec / 2; in iwal_create()
1108 wal->check_cp_crc = opts->wal.check_crc_on_checkpoint; in iwal_create()
1110 wal->buf = malloc(wal->wal_buffer_sz); in iwal_create()
1111 if (!wal->buf) { in iwal_create()
1115 wal->buf += sizeof(WBSEP); in iwal_create()
1116 wal->bufsz = wal->wal_buffer_sz - sizeof(WBSEP); in iwal_create()
1121 HANDLE fh = open(wal->path, O_CREAT | O_RDWR, IWFS_DEFAULT_FILEMODE); in iwal_create()
1127 HANDLE fh = CreateFile(wal->path, GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, in iwal_create()
1135 wal->fh = fh; in iwal_create()
1136 rc = iwp_flock(wal->fh, IWP_WLOCK); in iwal_create()
1145 if (wal->oflags & IWKV_TRUNC) { in iwal_create()
1146 rc = _truncate_wl(wal); in iwal_create()
1149 rc = _recover_wl(iwkv, wal, fsmopts, recover_backup); in iwal_create()
1153 wal->open = true; in iwal_create()
1155 rc = _init_cpt(wal); in iwal_create()
1162 _destroy(wal); in iwal_create()