1 //! `feature = "session"` [Session Extension](https://sqlite.org/sessionintro.html)
2 #![allow(non_camel_case_types)]
3
4 use std::ffi::CStr;
5 use std::io::{Read, Write};
6 use std::marker::PhantomData;
7 use std::os::raw::{c_char, c_int, c_uchar, c_void};
8 use std::panic::{catch_unwind, RefUnwindSafe};
9 use std::ptr;
10 use std::slice::{from_raw_parts, from_raw_parts_mut};
11
12 use fallible_streaming_iterator::FallibleStreamingIterator;
13
14 use crate::error::error_from_sqlite_code;
15 use crate::ffi;
16 use crate::hooks::Action;
17 use crate::types::ValueRef;
18 use crate::{errmsg_to_string, str_to_cstring, Connection, DatabaseName, Result};
19
20 // https://sqlite.org/session.html
21
22 /// `feature = "session"` An instance of this object is a session that can be
23 /// used to record changes to a database.
24 pub struct Session<'conn> {
25 phantom: PhantomData<&'conn Connection>,
26 s: *mut ffi::sqlite3_session,
27 filter: Option<Box<dyn Fn(&str) -> bool>>,
28 }
29
30 impl Session<'_> {
31 /// Create a new session object
new<'conn>(db: &'conn Connection) -> Result<Session<'conn>>32 pub fn new<'conn>(db: &'conn Connection) -> Result<Session<'conn>> {
33 Session::new_with_name(db, DatabaseName::Main)
34 }
35
36 /// Create a new session object
new_with_name<'conn>( db: &'conn Connection, name: DatabaseName<'_>, ) -> Result<Session<'conn>>37 pub fn new_with_name<'conn>(
38 db: &'conn Connection,
39 name: DatabaseName<'_>,
40 ) -> Result<Session<'conn>> {
41 let name = name.to_cstring()?;
42
43 let db = db.db.borrow_mut().db;
44
45 let mut s: *mut ffi::sqlite3_session = ptr::null_mut();
46 check!(unsafe { ffi::sqlite3session_create(db, name.as_ptr(), &mut s) });
47
48 Ok(Session {
49 phantom: PhantomData,
50 s,
51 filter: None,
52 })
53 }
54
55 /// Set a table filter
table_filter<F>(&mut self, filter: Option<F>) where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,56 pub fn table_filter<F>(&mut self, filter: Option<F>)
57 where
58 F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
59 {
60 unsafe extern "C" fn call_boxed_closure<F>(
61 p_arg: *mut c_void,
62 tbl_str: *const c_char,
63 ) -> c_int
64 where
65 F: Fn(&str) -> bool + RefUnwindSafe,
66 {
67 use std::str;
68
69 let boxed_filter: *mut F = p_arg as *mut F;
70 let tbl_name = {
71 let c_slice = CStr::from_ptr(tbl_str).to_bytes();
72 str::from_utf8(c_slice)
73 };
74 if let Ok(true) =
75 catch_unwind(|| (*boxed_filter)(tbl_name.expect("non-utf8 table name")))
76 {
77 1
78 } else {
79 0
80 }
81 }
82
83 match filter {
84 Some(filter) => {
85 let boxed_filter = Box::new(filter);
86 unsafe {
87 ffi::sqlite3session_table_filter(
88 self.s,
89 Some(call_boxed_closure::<F>),
90 &*boxed_filter as *const F as *mut _,
91 );
92 }
93 self.filter = Some(boxed_filter);
94 }
95 _ => {
96 unsafe { ffi::sqlite3session_table_filter(self.s, None, ptr::null_mut()) }
97 self.filter = None;
98 }
99 };
100 }
101
102 /// Attach a table. `None` means all tables.
attach(&mut self, table: Option<&str>) -> Result<()>103 pub fn attach(&mut self, table: Option<&str>) -> Result<()> {
104 let table = if let Some(table) = table {
105 Some(str_to_cstring(table)?)
106 } else {
107 None
108 };
109 let table = table.as_ref().map(|s| s.as_ptr()).unwrap_or(ptr::null());
110 unsafe { check!(ffi::sqlite3session_attach(self.s, table)) };
111 Ok(())
112 }
113
114 /// Generate a Changeset
changeset(&mut self) -> Result<Changeset>115 pub fn changeset(&mut self) -> Result<Changeset> {
116 let mut n = 0;
117 let mut cs: *mut c_void = ptr::null_mut();
118 check!(unsafe { ffi::sqlite3session_changeset(self.s, &mut n, &mut cs) });
119 Ok(Changeset { cs, n })
120 }
121
122 /// Write the set of changes represented by this session to `output`.
changeset_strm(&mut self, output: &mut dyn Write) -> Result<()>123 pub fn changeset_strm(&mut self, output: &mut dyn Write) -> Result<()> {
124 let output_ref = &output;
125 check!(unsafe {
126 ffi::sqlite3session_changeset_strm(
127 self.s,
128 Some(x_output),
129 output_ref as *const &mut dyn Write as *mut c_void,
130 )
131 });
132 Ok(())
133 }
134
135 /// Generate a Patchset
patchset(&mut self) -> Result<Changeset>136 pub fn patchset(&mut self) -> Result<Changeset> {
137 let mut n = 0;
138 let mut ps: *mut c_void = ptr::null_mut();
139 check!(unsafe { ffi::sqlite3session_patchset(self.s, &mut n, &mut ps) });
140 // TODO Validate: same struct
141 Ok(Changeset { cs: ps, n })
142 }
143
144 /// Write the set of patches represented by this session to `output`.
patchset_strm(&mut self, output: &mut dyn Write) -> Result<()>145 pub fn patchset_strm(&mut self, output: &mut dyn Write) -> Result<()> {
146 let output_ref = &output;
147 check!(unsafe {
148 ffi::sqlite3session_patchset_strm(
149 self.s,
150 Some(x_output),
151 output_ref as *const &mut dyn Write as *mut c_void,
152 )
153 });
154 Ok(())
155 }
156
157 /// Load the difference between tables.
diff(&mut self, from: DatabaseName<'_>, table: &str) -> Result<()>158 pub fn diff(&mut self, from: DatabaseName<'_>, table: &str) -> Result<()> {
159 let from = from.to_cstring()?;
160 let table = str_to_cstring(table)?;
161 let table = table.as_ptr();
162 unsafe {
163 let mut errmsg = ptr::null_mut();
164 let r =
165 ffi::sqlite3session_diff(self.s, from.as_ptr(), table, &mut errmsg as *mut *mut _);
166 if r != ffi::SQLITE_OK {
167 let errmsg: *mut c_char = errmsg;
168 let message = errmsg_to_string(&*errmsg);
169 ffi::sqlite3_free(errmsg as *mut ::std::os::raw::c_void);
170 return Err(error_from_sqlite_code(r, Some(message)));
171 }
172 }
173 Ok(())
174 }
175
176 /// Test if a changeset has recorded any changes
is_empty(&self) -> bool177 pub fn is_empty(&self) -> bool {
178 unsafe { ffi::sqlite3session_isempty(self.s) != 0 }
179 }
180
181 /// Query the current state of the session
is_enabled(&self) -> bool182 pub fn is_enabled(&self) -> bool {
183 unsafe { ffi::sqlite3session_enable(self.s, -1) != 0 }
184 }
185
186 /// Enable or disable the recording of changes
set_enabled(&mut self, enabled: bool)187 pub fn set_enabled(&mut self, enabled: bool) {
188 unsafe {
189 ffi::sqlite3session_enable(self.s, if enabled { 1 } else { 0 });
190 }
191 }
192
193 /// Query the current state of the indirect flag
is_indirect(&self) -> bool194 pub fn is_indirect(&self) -> bool {
195 unsafe { ffi::sqlite3session_indirect(self.s, -1) != 0 }
196 }
197
198 /// Set or clear the indirect change flag
set_indirect(&mut self, indirect: bool)199 pub fn set_indirect(&mut self, indirect: bool) {
200 unsafe {
201 ffi::sqlite3session_indirect(self.s, if indirect { 1 } else { 0 });
202 }
203 }
204 }
205
206 impl Drop for Session<'_> {
drop(&mut self)207 fn drop(&mut self) {
208 if self.filter.is_some() {
209 self.table_filter(None::<fn(&str) -> bool>);
210 }
211 unsafe { ffi::sqlite3session_delete(self.s) };
212 }
213 }
214
215 /// `feature = "session"` Invert a changeset
invert_strm(input: &mut dyn Read, output: &mut dyn Write) -> Result<()>216 pub fn invert_strm(input: &mut dyn Read, output: &mut dyn Write) -> Result<()> {
217 let input_ref = &input;
218 let output_ref = &output;
219 check!(unsafe {
220 ffi::sqlite3changeset_invert_strm(
221 Some(x_input),
222 input_ref as *const &mut dyn Read as *mut c_void,
223 Some(x_output),
224 output_ref as *const &mut dyn Write as *mut c_void,
225 )
226 });
227 Ok(())
228 }
229
230 /// `feature = "session"` Combine two changesets
concat_strm( input_a: &mut dyn Read, input_b: &mut dyn Read, output: &mut dyn Write, ) -> Result<()>231 pub fn concat_strm(
232 input_a: &mut dyn Read,
233 input_b: &mut dyn Read,
234 output: &mut dyn Write,
235 ) -> Result<()> {
236 let input_a_ref = &input_a;
237 let input_b_ref = &input_b;
238 let output_ref = &output;
239 check!(unsafe {
240 ffi::sqlite3changeset_concat_strm(
241 Some(x_input),
242 input_a_ref as *const &mut dyn Read as *mut c_void,
243 Some(x_input),
244 input_b_ref as *const &mut dyn Read as *mut c_void,
245 Some(x_output),
246 output_ref as *const &mut dyn Write as *mut c_void,
247 )
248 });
249 Ok(())
250 }
251
252 /// `feature = "session"` Changeset or Patchset
253 pub struct Changeset {
254 cs: *mut c_void,
255 n: c_int,
256 }
257
258 impl Changeset {
259 /// Invert a changeset
invert(&self) -> Result<Changeset>260 pub fn invert(&self) -> Result<Changeset> {
261 let mut n = 0;
262 let mut cs = ptr::null_mut();
263 check!(unsafe {
264 ffi::sqlite3changeset_invert(self.n, self.cs, &mut n, &mut cs as *mut *mut _)
265 });
266 Ok(Changeset { cs, n })
267 }
268
269 /// Create an iterator to traverse a changeset
iter(&self) -> Result<ChangesetIter<'_>>270 pub fn iter(&self) -> Result<ChangesetIter<'_>> {
271 let mut it = ptr::null_mut();
272 check!(unsafe { ffi::sqlite3changeset_start(&mut it as *mut *mut _, self.n, self.cs) });
273 Ok(ChangesetIter {
274 phantom: PhantomData,
275 it,
276 item: None,
277 })
278 }
279
280 /// Concatenate two changeset objects
concat(a: &Changeset, b: &Changeset) -> Result<Changeset>281 pub fn concat(a: &Changeset, b: &Changeset) -> Result<Changeset> {
282 let mut n = 0;
283 let mut cs = ptr::null_mut();
284 check!(unsafe {
285 ffi::sqlite3changeset_concat(a.n, a.cs, b.n, b.cs, &mut n, &mut cs as *mut *mut _)
286 });
287 Ok(Changeset { cs, n })
288 }
289 }
290
291 impl Drop for Changeset {
drop(&mut self)292 fn drop(&mut self) {
293 unsafe {
294 ffi::sqlite3_free(self.cs);
295 }
296 }
297 }
298
299 /// `feature = "session"` Cursor for iterating over the elements of a changeset
300 /// or patchset.
301 pub struct ChangesetIter<'changeset> {
302 phantom: PhantomData<&'changeset Changeset>,
303 it: *mut ffi::sqlite3_changeset_iter,
304 item: Option<ChangesetItem>,
305 }
306
307 impl ChangesetIter<'_> {
308 /// Create an iterator on `input`
start_strm<'input>(input: &&'input mut dyn Read) -> Result<ChangesetIter<'input>>309 pub fn start_strm<'input>(input: &&'input mut dyn Read) -> Result<ChangesetIter<'input>> {
310 let mut it = ptr::null_mut();
311 check!(unsafe {
312 ffi::sqlite3changeset_start_strm(
313 &mut it as *mut *mut _,
314 Some(x_input),
315 input as *const &mut dyn Read as *mut c_void,
316 )
317 });
318 Ok(ChangesetIter {
319 phantom: PhantomData,
320 it,
321 item: None,
322 })
323 }
324 }
325
326 impl FallibleStreamingIterator for ChangesetIter<'_> {
327 type Error = crate::error::Error;
328 type Item = ChangesetItem;
329
advance(&mut self) -> Result<()>330 fn advance(&mut self) -> Result<()> {
331 let rc = unsafe { ffi::sqlite3changeset_next(self.it) };
332 match rc {
333 ffi::SQLITE_ROW => {
334 self.item = Some(ChangesetItem { it: self.it });
335 Ok(())
336 }
337 ffi::SQLITE_DONE => {
338 self.item = None;
339 Ok(())
340 }
341 code => Err(error_from_sqlite_code(code, None)),
342 }
343 }
344
get(&self) -> Option<&ChangesetItem>345 fn get(&self) -> Option<&ChangesetItem> {
346 self.item.as_ref()
347 }
348 }
349
350 /// `feature = "session"`
351 pub struct Operation<'item> {
352 table_name: &'item str,
353 number_of_columns: i32,
354 code: Action,
355 indirect: bool,
356 }
357
358 impl Operation<'_> {
359 /// Returns the table name.
table_name(&self) -> &str360 pub fn table_name(&self) -> &str {
361 self.table_name
362 }
363
364 /// Returns the number of columns in table
number_of_columns(&self) -> i32365 pub fn number_of_columns(&self) -> i32 {
366 self.number_of_columns
367 }
368
369 /// Returns the action code.
code(&self) -> Action370 pub fn code(&self) -> Action {
371 self.code
372 }
373
374 /// Returns `true` for an 'indirect' change.
indirect(&self) -> bool375 pub fn indirect(&self) -> bool {
376 self.indirect
377 }
378 }
379
380 impl Drop for ChangesetIter<'_> {
drop(&mut self)381 fn drop(&mut self) {
382 unsafe {
383 ffi::sqlite3changeset_finalize(self.it);
384 }
385 }
386 }
387
388 /// `feature = "session"` An item passed to a conflict-handler by
389 /// `Connection::apply`, or an item generated by `ChangesetIter::next`.
390 // TODO enum ? Delete, Insert, Update, ...
391 pub struct ChangesetItem {
392 it: *mut ffi::sqlite3_changeset_iter,
393 }
394
395 impl ChangesetItem {
396 /// Obtain conflicting row values
397 ///
398 /// May only be called with an `SQLITE_CHANGESET_DATA` or
399 /// `SQLITE_CHANGESET_CONFLICT` conflict handler callback.
conflict(&self, col: usize) -> Result<ValueRef<'_>>400 pub fn conflict(&self, col: usize) -> Result<ValueRef<'_>> {
401 unsafe {
402 let mut p_value: *mut ffi::sqlite3_value = ptr::null_mut();
403 check!(ffi::sqlite3changeset_conflict(
404 self.it,
405 col as i32,
406 &mut p_value,
407 ));
408 Ok(ValueRef::from_value(p_value))
409 }
410 }
411
412 /// Determine the number of foreign key constraint violations
413 ///
414 /// May only be called with an `SQLITE_CHANGESET_FOREIGN_KEY` conflict
415 /// handler callback.
fk_conflicts(&self) -> Result<i32>416 pub fn fk_conflicts(&self) -> Result<i32> {
417 unsafe {
418 let mut p_out = 0;
419 check!(ffi::sqlite3changeset_fk_conflicts(self.it, &mut p_out));
420 Ok(p_out)
421 }
422 }
423
424 /// Obtain new.* Values
425 ///
426 /// May only be called if the type of change is either `SQLITE_UPDATE` or
427 /// `SQLITE_INSERT`.
new_value(&self, col: usize) -> Result<ValueRef<'_>>428 pub fn new_value(&self, col: usize) -> Result<ValueRef<'_>> {
429 unsafe {
430 let mut p_value: *mut ffi::sqlite3_value = ptr::null_mut();
431 check!(ffi::sqlite3changeset_new(self.it, col as i32, &mut p_value,));
432 Ok(ValueRef::from_value(p_value))
433 }
434 }
435
436 /// Obtain old.* Values
437 ///
438 /// May only be called if the type of change is either `SQLITE_DELETE` or
439 /// `SQLITE_UPDATE`.
old_value(&self, col: usize) -> Result<ValueRef<'_>>440 pub fn old_value(&self, col: usize) -> Result<ValueRef<'_>> {
441 unsafe {
442 let mut p_value: *mut ffi::sqlite3_value = ptr::null_mut();
443 check!(ffi::sqlite3changeset_old(self.it, col as i32, &mut p_value,));
444 Ok(ValueRef::from_value(p_value))
445 }
446 }
447
448 /// Obtain the current operation
op(&self) -> Result<Operation<'_>>449 pub fn op(&self) -> Result<Operation<'_>> {
450 let mut number_of_columns = 0;
451 let mut code = 0;
452 let mut indirect = 0;
453 let tab = unsafe {
454 let mut pz_tab: *const c_char = ptr::null();
455 check!(ffi::sqlite3changeset_op(
456 self.it,
457 &mut pz_tab,
458 &mut number_of_columns,
459 &mut code,
460 &mut indirect
461 ));
462 CStr::from_ptr(pz_tab)
463 };
464 let table_name = tab.to_str()?;
465 Ok(Operation {
466 table_name,
467 number_of_columns,
468 code: Action::from(code),
469 indirect: indirect != 0,
470 })
471 }
472
473 /// Obtain the primary key definition of a table
pk(&self) -> Result<&[u8]>474 pub fn pk(&self) -> Result<&[u8]> {
475 let mut number_of_columns = 0;
476 unsafe {
477 let mut pks: *mut c_uchar = ptr::null_mut();
478 check!(ffi::sqlite3changeset_pk(
479 self.it,
480 &mut pks,
481 &mut number_of_columns
482 ));
483 Ok(from_raw_parts(pks, number_of_columns as usize))
484 }
485 }
486 }
487
488 /// `feature = "session"` Used to combine two or more changesets or
489 /// patchsets
490 pub struct Changegroup {
491 cg: *mut ffi::sqlite3_changegroup,
492 }
493
494 impl Changegroup {
495 /// Create a new change group.
new() -> Result<Self>496 pub fn new() -> Result<Self> {
497 let mut cg = ptr::null_mut();
498 check!(unsafe { ffi::sqlite3changegroup_new(&mut cg) });
499 Ok(Changegroup { cg })
500 }
501
502 /// Add a changeset
add(&mut self, cs: &Changeset) -> Result<()>503 pub fn add(&mut self, cs: &Changeset) -> Result<()> {
504 check!(unsafe { ffi::sqlite3changegroup_add(self.cg, cs.n, cs.cs) });
505 Ok(())
506 }
507
508 /// Add a changeset read from `input` to this change group.
add_stream(&mut self, input: &mut dyn Read) -> Result<()>509 pub fn add_stream(&mut self, input: &mut dyn Read) -> Result<()> {
510 let input_ref = &input;
511 check!(unsafe {
512 ffi::sqlite3changegroup_add_strm(
513 self.cg,
514 Some(x_input),
515 input_ref as *const &mut dyn Read as *mut c_void,
516 )
517 });
518 Ok(())
519 }
520
521 /// Obtain a composite Changeset
output(&mut self) -> Result<Changeset>522 pub fn output(&mut self) -> Result<Changeset> {
523 let mut n = 0;
524 let mut output: *mut c_void = ptr::null_mut();
525 check!(unsafe { ffi::sqlite3changegroup_output(self.cg, &mut n, &mut output) });
526 Ok(Changeset { cs: output, n })
527 }
528
529 /// Write the combined set of changes to `output`.
output_strm(&mut self, output: &mut dyn Write) -> Result<()>530 pub fn output_strm(&mut self, output: &mut dyn Write) -> Result<()> {
531 let output_ref = &output;
532 check!(unsafe {
533 ffi::sqlite3changegroup_output_strm(
534 self.cg,
535 Some(x_output),
536 output_ref as *const &mut dyn Write as *mut c_void,
537 )
538 });
539 Ok(())
540 }
541 }
542
543 impl Drop for Changegroup {
drop(&mut self)544 fn drop(&mut self) {
545 unsafe {
546 ffi::sqlite3changegroup_delete(self.cg);
547 }
548 }
549 }
550
551 impl Connection {
552 /// `feature = "session"` Apply a changeset to a database
apply<F, C>(&self, cs: &Changeset, filter: Option<F>, conflict: C) -> Result<()> where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,553 pub fn apply<F, C>(&self, cs: &Changeset, filter: Option<F>, conflict: C) -> Result<()>
554 where
555 F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
556 C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
557 {
558 let db = self.db.borrow_mut().db;
559
560 let filtered = filter.is_some();
561 let tuple = &mut (filter, conflict);
562 check!(unsafe {
563 if filtered {
564 ffi::sqlite3changeset_apply(
565 db,
566 cs.n,
567 cs.cs,
568 Some(call_filter::<F, C>),
569 Some(call_conflict::<F, C>),
570 tuple as *mut (Option<F>, C) as *mut c_void,
571 )
572 } else {
573 ffi::sqlite3changeset_apply(
574 db,
575 cs.n,
576 cs.cs,
577 None,
578 Some(call_conflict::<F, C>),
579 tuple as *mut (Option<F>, C) as *mut c_void,
580 )
581 }
582 });
583 Ok(())
584 }
585
586 /// `feature = "session"` Apply a changeset to a database
apply_strm<F, C>( &self, input: &mut dyn Read, filter: Option<F>, conflict: C, ) -> Result<()> where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,587 pub fn apply_strm<F, C>(
588 &self,
589 input: &mut dyn Read,
590 filter: Option<F>,
591 conflict: C,
592 ) -> Result<()>
593 where
594 F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
595 C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
596 {
597 let input_ref = &input;
598 let db = self.db.borrow_mut().db;
599
600 let filtered = filter.is_some();
601 let tuple = &mut (filter, conflict);
602 check!(unsafe {
603 if filtered {
604 ffi::sqlite3changeset_apply_strm(
605 db,
606 Some(x_input),
607 input_ref as *const &mut dyn Read as *mut c_void,
608 Some(call_filter::<F, C>),
609 Some(call_conflict::<F, C>),
610 tuple as *mut (Option<F>, C) as *mut c_void,
611 )
612 } else {
613 ffi::sqlite3changeset_apply_strm(
614 db,
615 Some(x_input),
616 input_ref as *const &mut dyn Read as *mut c_void,
617 None,
618 Some(call_conflict::<F, C>),
619 tuple as *mut (Option<F>, C) as *mut c_void,
620 )
621 }
622 });
623 Ok(())
624 }
625 }
626
627 /// `feature = "session"` Constants passed to the conflict handler
628 /// See [here](https://sqlite.org/session.html#SQLITE_CHANGESET_CONFLICT) for details.
629 #[allow(missing_docs)]
630 #[repr(i32)]
631 #[derive(Debug, PartialEq)]
632 #[non_exhaustive]
633 pub enum ConflictType {
634 UNKNOWN = -1,
635 SQLITE_CHANGESET_DATA = ffi::SQLITE_CHANGESET_DATA,
636 SQLITE_CHANGESET_NOTFOUND = ffi::SQLITE_CHANGESET_NOTFOUND,
637 SQLITE_CHANGESET_CONFLICT = ffi::SQLITE_CHANGESET_CONFLICT,
638 SQLITE_CHANGESET_CONSTRAINT = ffi::SQLITE_CHANGESET_CONSTRAINT,
639 SQLITE_CHANGESET_FOREIGN_KEY = ffi::SQLITE_CHANGESET_FOREIGN_KEY,
640 }
641 impl From<i32> for ConflictType {
from(code: i32) -> ConflictType642 fn from(code: i32) -> ConflictType {
643 match code {
644 ffi::SQLITE_CHANGESET_DATA => ConflictType::SQLITE_CHANGESET_DATA,
645 ffi::SQLITE_CHANGESET_NOTFOUND => ConflictType::SQLITE_CHANGESET_NOTFOUND,
646 ffi::SQLITE_CHANGESET_CONFLICT => ConflictType::SQLITE_CHANGESET_CONFLICT,
647 ffi::SQLITE_CHANGESET_CONSTRAINT => ConflictType::SQLITE_CHANGESET_CONSTRAINT,
648 ffi::SQLITE_CHANGESET_FOREIGN_KEY => ConflictType::SQLITE_CHANGESET_FOREIGN_KEY,
649 _ => ConflictType::UNKNOWN,
650 }
651 }
652 }
653
654 /// `feature = "session"` Constants returned by the conflict handler
655 /// See [here](https://sqlite.org/session.html#SQLITE_CHANGESET_ABORT) for details.
656 #[allow(missing_docs)]
657 #[repr(i32)]
658 #[derive(Debug, PartialEq)]
659 #[non_exhaustive]
660 pub enum ConflictAction {
661 SQLITE_CHANGESET_OMIT = ffi::SQLITE_CHANGESET_OMIT,
662 SQLITE_CHANGESET_REPLACE = ffi::SQLITE_CHANGESET_REPLACE,
663 SQLITE_CHANGESET_ABORT = ffi::SQLITE_CHANGESET_ABORT,
664 }
665
call_filter<F, C>(p_ctx: *mut c_void, tbl_str: *const c_char) -> c_int where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,666 unsafe extern "C" fn call_filter<F, C>(p_ctx: *mut c_void, tbl_str: *const c_char) -> c_int
667 where
668 F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
669 C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
670 {
671 use std::str;
672
673 let tuple: *mut (Option<F>, C) = p_ctx as *mut (Option<F>, C);
674 let tbl_name = {
675 let c_slice = CStr::from_ptr(tbl_str).to_bytes();
676 str::from_utf8(c_slice)
677 };
678 match *tuple {
679 (Some(ref filter), _) => {
680 if let Ok(true) = catch_unwind(|| filter(tbl_name.expect("illegal table name"))) {
681 1
682 } else {
683 0
684 }
685 }
686 _ => unimplemented!(),
687 }
688 }
689
call_conflict<F, C>( p_ctx: *mut c_void, e_conflict: c_int, p: *mut ffi::sqlite3_changeset_iter, ) -> c_int where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,690 unsafe extern "C" fn call_conflict<F, C>(
691 p_ctx: *mut c_void,
692 e_conflict: c_int,
693 p: *mut ffi::sqlite3_changeset_iter,
694 ) -> c_int
695 where
696 F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
697 C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
698 {
699 let tuple: *mut (Option<F>, C) = p_ctx as *mut (Option<F>, C);
700 let conflict_type = ConflictType::from(e_conflict);
701 let item = ChangesetItem { it: p };
702 if let Ok(action) = catch_unwind(|| (*tuple).1(conflict_type, item)) {
703 action as c_int
704 } else {
705 ffi::SQLITE_CHANGESET_ABORT
706 }
707 }
708
x_input(p_in: *mut c_void, data: *mut c_void, len: *mut c_int) -> c_int709 unsafe extern "C" fn x_input(p_in: *mut c_void, data: *mut c_void, len: *mut c_int) -> c_int {
710 if p_in.is_null() {
711 return ffi::SQLITE_MISUSE;
712 }
713 let bytes: &mut [u8] = from_raw_parts_mut(data as *mut u8, *len as usize);
714 let input = p_in as *mut &mut dyn Read;
715 match (*input).read(bytes) {
716 Ok(n) => {
717 *len = n as i32; // TODO Validate: n = 0 may not mean the reader will always no longer be able to
718 // produce bytes.
719 ffi::SQLITE_OK
720 }
721 Err(_) => ffi::SQLITE_IOERR_READ, // TODO check if err is a (ru)sqlite Error => propagate
722 }
723 }
724
x_output(p_out: *mut c_void, data: *const c_void, len: c_int) -> c_int725 unsafe extern "C" fn x_output(p_out: *mut c_void, data: *const c_void, len: c_int) -> c_int {
726 if p_out.is_null() {
727 return ffi::SQLITE_MISUSE;
728 }
729 // The sessions module never invokes an xOutput callback with the third
730 // parameter set to a value less than or equal to zero.
731 let bytes: &[u8] = from_raw_parts(data as *const u8, len as usize);
732 let output = p_out as *mut &mut dyn Write;
733 match (*output).write_all(bytes) {
734 Ok(_) => ffi::SQLITE_OK,
735 Err(_) => ffi::SQLITE_IOERR_WRITE, // TODO check if err is a (ru)sqlite Error => propagate
736 }
737 }
738
#[cfg(test)]
mod test {
    use fallible_streaming_iterator::FallibleStreamingIterator;
    use std::io::Read;
    use std::sync::atomic::{AtomicBool, Ordering};

    use super::{Changeset, ChangesetIter, ConflictAction, ConflictType, Session};
    use crate::hooks::Action;
    use crate::Connection;

    /// Opens an in-memory database that contains only the empty `foo` table.
    fn foo_db() -> Connection {
        let db = Connection::open_in_memory().unwrap();
        db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")
            .unwrap();
        db
    }

    /// Records a single `INSERT` of `'bar'` into `foo` with a fresh session
    /// and hands that session to `extract` while the database is still alive.
    fn with_recorded_insert<T>(extract: impl FnOnce(&mut Session<'_>) -> T) -> T {
        let db = foo_db();

        let mut session = Session::new(&db).unwrap();
        assert!(session.is_empty());

        session.attach(None).unwrap();
        db.execute("INSERT INTO foo (t) VALUES (?);", &["bar"])
            .unwrap();

        extract(&mut session)
    }

    /// Changeset holding the single recorded insert.
    fn one_changeset() -> Changeset {
        with_recorded_insert(|session| session.changeset().unwrap())
    }

    /// Same changeset as `one_changeset`, produced through the streaming API.
    fn one_changeset_strm() -> Vec<u8> {
        with_recorded_insert(|session| {
            let mut output = Vec::new();
            session.changeset_strm(&mut output).unwrap();
            output
        })
    }

    #[test]
    fn test_changeset() {
        let changeset = one_changeset();
        let mut iter = changeset.iter().unwrap();
        let item = iter.next().unwrap();
        assert!(item.is_some());

        let item = item.unwrap();
        let op = item.op().unwrap();
        assert_eq!("foo", op.table_name());
        assert_eq!(1, op.number_of_columns());
        assert_eq!(Action::SQLITE_INSERT, op.code());
        assert!(!op.indirect());

        let pk = item.pk().unwrap();
        assert_eq!(&[1], pk);

        let new_value = item.new_value(0).unwrap();
        assert_eq!(Ok("bar"), new_value.as_str());
    }

    #[test]
    fn test_changeset_strm() {
        let output = one_changeset_strm();
        assert!(!output.is_empty());
        assert_eq!(14, output.len());

        let input: &mut dyn Read = &mut output.as_slice();
        let mut iter = ChangesetIter::start_strm(&input).unwrap();
        let item = iter.next().unwrap();
        assert!(item.is_some());
    }

    #[test]
    fn test_changeset_apply() {
        let changeset = one_changeset();
        let db = foo_db();

        // `AtomicBool::new` is a `const fn`, so a plain static is enough to
        // observe whether the conflict handler ran.
        static CALLED: AtomicBool = AtomicBool::new(false);

        db.apply(
            &changeset,
            None::<fn(&str) -> bool>,
            |_conflict_type, _item| {
                CALLED.store(true, Ordering::Relaxed);
                ConflictAction::SQLITE_CHANGESET_OMIT
            },
        )
        .unwrap();

        assert!(!CALLED.load(Ordering::Relaxed));
        let check = db
            .query_row("SELECT 1 FROM foo WHERE t = ?", &["bar"], |row| {
                row.get::<_, i32>(0)
            })
            .unwrap();
        assert_eq!(1, check);

        // conflict expected when same changeset applied again on the same db
        db.apply(
            &changeset,
            None::<fn(&str) -> bool>,
            |conflict_type, item| {
                CALLED.store(true, Ordering::Relaxed);
                assert_eq!(ConflictType::SQLITE_CHANGESET_CONFLICT, conflict_type);
                let conflict = item.conflict(0).unwrap();
                assert_eq!(Ok("bar"), conflict.as_str());
                ConflictAction::SQLITE_CHANGESET_OMIT
            },
        )
        .unwrap();
        assert!(CALLED.load(Ordering::Relaxed));
    }

    #[test]
    fn test_changeset_apply_strm() {
        let output = one_changeset_strm();
        let db = foo_db();

        let mut input = output.as_slice();
        db.apply_strm(
            &mut input,
            None::<fn(&str) -> bool>,
            |_conflict_type, _item| ConflictAction::SQLITE_CHANGESET_OMIT,
        )
        .unwrap();

        let check = db
            .query_row("SELECT 1 FROM foo WHERE t = ?", &["bar"], |row| {
                row.get::<_, i32>(0)
            })
            .unwrap();
        assert_eq!(1, check);
    }

    #[test]
    fn test_session_empty() {
        let db = foo_db();

        let mut session = Session::new(&db).unwrap();
        assert!(session.is_empty());

        session.attach(None).unwrap();
        db.execute("INSERT INTO foo (t) VALUES (?);", &["bar"])
            .unwrap();

        assert!(!session.is_empty());
    }

    #[test]
    fn test_session_set_enabled() {
        let db = Connection::open_in_memory().unwrap();

        let mut session = Session::new(&db).unwrap();
        assert!(session.is_enabled());
        session.set_enabled(false);
        assert!(!session.is_enabled());
    }

    #[test]
    fn test_session_set_indirect() {
        let db = Connection::open_in_memory().unwrap();

        let mut session = Session::new(&db).unwrap();
        assert!(!session.is_indirect());
        session.set_indirect(true);
        assert!(session.is_indirect());
    }
}
919