• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! `feature = "session"` [Session Extension](https://sqlite.org/sessionintro.html)
2 #![allow(non_camel_case_types)]
3 
4 use std::ffi::CStr;
5 use std::io::{Read, Write};
6 use std::marker::PhantomData;
7 use std::os::raw::{c_char, c_int, c_uchar, c_void};
8 use std::panic::{catch_unwind, RefUnwindSafe};
9 use std::ptr;
10 use std::slice::{from_raw_parts, from_raw_parts_mut};
11 
12 use fallible_streaming_iterator::FallibleStreamingIterator;
13 
14 use crate::error::error_from_sqlite_code;
15 use crate::ffi;
16 use crate::hooks::Action;
17 use crate::types::ValueRef;
18 use crate::{errmsg_to_string, str_to_cstring, Connection, DatabaseName, Result};
19 
20 // https://sqlite.org/session.html
21 
22 /// `feature = "session"` An instance of this object is a session that can be
23 /// used to record changes to a database.
24 pub struct Session<'conn> {
25     phantom: PhantomData<&'conn Connection>,
26     s: *mut ffi::sqlite3_session,
27     filter: Option<Box<dyn Fn(&str) -> bool>>,
28 }
29 
30 impl Session<'_> {
31     /// Create a new session object
new<'conn>(db: &'conn Connection) -> Result<Session<'conn>>32     pub fn new<'conn>(db: &'conn Connection) -> Result<Session<'conn>> {
33         Session::new_with_name(db, DatabaseName::Main)
34     }
35 
36     /// Create a new session object
new_with_name<'conn>( db: &'conn Connection, name: DatabaseName<'_>, ) -> Result<Session<'conn>>37     pub fn new_with_name<'conn>(
38         db: &'conn Connection,
39         name: DatabaseName<'_>,
40     ) -> Result<Session<'conn>> {
41         let name = name.to_cstring()?;
42 
43         let db = db.db.borrow_mut().db;
44 
45         let mut s: *mut ffi::sqlite3_session = ptr::null_mut();
46         check!(unsafe { ffi::sqlite3session_create(db, name.as_ptr(), &mut s) });
47 
48         Ok(Session {
49             phantom: PhantomData,
50             s,
51             filter: None,
52         })
53     }
54 
55     /// Set a table filter
table_filter<F>(&mut self, filter: Option<F>) where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,56     pub fn table_filter<F>(&mut self, filter: Option<F>)
57     where
58         F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
59     {
60         unsafe extern "C" fn call_boxed_closure<F>(
61             p_arg: *mut c_void,
62             tbl_str: *const c_char,
63         ) -> c_int
64         where
65             F: Fn(&str) -> bool + RefUnwindSafe,
66         {
67             use std::str;
68 
69             let boxed_filter: *mut F = p_arg as *mut F;
70             let tbl_name = {
71                 let c_slice = CStr::from_ptr(tbl_str).to_bytes();
72                 str::from_utf8(c_slice)
73             };
74             if let Ok(true) =
75                 catch_unwind(|| (*boxed_filter)(tbl_name.expect("non-utf8 table name")))
76             {
77                 1
78             } else {
79                 0
80             }
81         }
82 
83         match filter {
84             Some(filter) => {
85                 let boxed_filter = Box::new(filter);
86                 unsafe {
87                     ffi::sqlite3session_table_filter(
88                         self.s,
89                         Some(call_boxed_closure::<F>),
90                         &*boxed_filter as *const F as *mut _,
91                     );
92                 }
93                 self.filter = Some(boxed_filter);
94             }
95             _ => {
96                 unsafe { ffi::sqlite3session_table_filter(self.s, None, ptr::null_mut()) }
97                 self.filter = None;
98             }
99         };
100     }
101 
102     /// Attach a table. `None` means all tables.
attach(&mut self, table: Option<&str>) -> Result<()>103     pub fn attach(&mut self, table: Option<&str>) -> Result<()> {
104         let table = if let Some(table) = table {
105             Some(str_to_cstring(table)?)
106         } else {
107             None
108         };
109         let table = table.as_ref().map(|s| s.as_ptr()).unwrap_or(ptr::null());
110         unsafe { check!(ffi::sqlite3session_attach(self.s, table)) };
111         Ok(())
112     }
113 
114     /// Generate a Changeset
changeset(&mut self) -> Result<Changeset>115     pub fn changeset(&mut self) -> Result<Changeset> {
116         let mut n = 0;
117         let mut cs: *mut c_void = ptr::null_mut();
118         check!(unsafe { ffi::sqlite3session_changeset(self.s, &mut n, &mut cs) });
119         Ok(Changeset { cs, n })
120     }
121 
122     /// Write the set of changes represented by this session to `output`.
changeset_strm(&mut self, output: &mut dyn Write) -> Result<()>123     pub fn changeset_strm(&mut self, output: &mut dyn Write) -> Result<()> {
124         let output_ref = &output;
125         check!(unsafe {
126             ffi::sqlite3session_changeset_strm(
127                 self.s,
128                 Some(x_output),
129                 output_ref as *const &mut dyn Write as *mut c_void,
130             )
131         });
132         Ok(())
133     }
134 
135     /// Generate a Patchset
patchset(&mut self) -> Result<Changeset>136     pub fn patchset(&mut self) -> Result<Changeset> {
137         let mut n = 0;
138         let mut ps: *mut c_void = ptr::null_mut();
139         check!(unsafe { ffi::sqlite3session_patchset(self.s, &mut n, &mut ps) });
140         // TODO Validate: same struct
141         Ok(Changeset { cs: ps, n })
142     }
143 
144     /// Write the set of patches represented by this session to `output`.
patchset_strm(&mut self, output: &mut dyn Write) -> Result<()>145     pub fn patchset_strm(&mut self, output: &mut dyn Write) -> Result<()> {
146         let output_ref = &output;
147         check!(unsafe {
148             ffi::sqlite3session_patchset_strm(
149                 self.s,
150                 Some(x_output),
151                 output_ref as *const &mut dyn Write as *mut c_void,
152             )
153         });
154         Ok(())
155     }
156 
157     /// Load the difference between tables.
diff(&mut self, from: DatabaseName<'_>, table: &str) -> Result<()>158     pub fn diff(&mut self, from: DatabaseName<'_>, table: &str) -> Result<()> {
159         let from = from.to_cstring()?;
160         let table = str_to_cstring(table)?;
161         let table = table.as_ptr();
162         unsafe {
163             let mut errmsg = ptr::null_mut();
164             let r =
165                 ffi::sqlite3session_diff(self.s, from.as_ptr(), table, &mut errmsg as *mut *mut _);
166             if r != ffi::SQLITE_OK {
167                 let errmsg: *mut c_char = errmsg;
168                 let message = errmsg_to_string(&*errmsg);
169                 ffi::sqlite3_free(errmsg as *mut ::std::os::raw::c_void);
170                 return Err(error_from_sqlite_code(r, Some(message)));
171             }
172         }
173         Ok(())
174     }
175 
176     /// Test if a changeset has recorded any changes
is_empty(&self) -> bool177     pub fn is_empty(&self) -> bool {
178         unsafe { ffi::sqlite3session_isempty(self.s) != 0 }
179     }
180 
181     /// Query the current state of the session
is_enabled(&self) -> bool182     pub fn is_enabled(&self) -> bool {
183         unsafe { ffi::sqlite3session_enable(self.s, -1) != 0 }
184     }
185 
186     /// Enable or disable the recording of changes
set_enabled(&mut self, enabled: bool)187     pub fn set_enabled(&mut self, enabled: bool) {
188         unsafe {
189             ffi::sqlite3session_enable(self.s, if enabled { 1 } else { 0 });
190         }
191     }
192 
193     /// Query the current state of the indirect flag
is_indirect(&self) -> bool194     pub fn is_indirect(&self) -> bool {
195         unsafe { ffi::sqlite3session_indirect(self.s, -1) != 0 }
196     }
197 
198     /// Set or clear the indirect change flag
set_indirect(&mut self, indirect: bool)199     pub fn set_indirect(&mut self, indirect: bool) {
200         unsafe {
201             ffi::sqlite3session_indirect(self.s, if indirect { 1 } else { 0 });
202         }
203     }
204 }
205 
206 impl Drop for Session<'_> {
drop(&mut self)207     fn drop(&mut self) {
208         if self.filter.is_some() {
209             self.table_filter(None::<fn(&str) -> bool>);
210         }
211         unsafe { ffi::sqlite3session_delete(self.s) };
212     }
213 }
214 
215 /// `feature = "session"` Invert a changeset
invert_strm(input: &mut dyn Read, output: &mut dyn Write) -> Result<()>216 pub fn invert_strm(input: &mut dyn Read, output: &mut dyn Write) -> Result<()> {
217     let input_ref = &input;
218     let output_ref = &output;
219     check!(unsafe {
220         ffi::sqlite3changeset_invert_strm(
221             Some(x_input),
222             input_ref as *const &mut dyn Read as *mut c_void,
223             Some(x_output),
224             output_ref as *const &mut dyn Write as *mut c_void,
225         )
226     });
227     Ok(())
228 }
229 
230 /// `feature = "session"` Combine two changesets
concat_strm( input_a: &mut dyn Read, input_b: &mut dyn Read, output: &mut dyn Write, ) -> Result<()>231 pub fn concat_strm(
232     input_a: &mut dyn Read,
233     input_b: &mut dyn Read,
234     output: &mut dyn Write,
235 ) -> Result<()> {
236     let input_a_ref = &input_a;
237     let input_b_ref = &input_b;
238     let output_ref = &output;
239     check!(unsafe {
240         ffi::sqlite3changeset_concat_strm(
241             Some(x_input),
242             input_a_ref as *const &mut dyn Read as *mut c_void,
243             Some(x_input),
244             input_b_ref as *const &mut dyn Read as *mut c_void,
245             Some(x_output),
246             output_ref as *const &mut dyn Write as *mut c_void,
247         )
248     });
249     Ok(())
250 }
251 
/// `feature = "session"` Changeset or Patchset
pub struct Changeset {
    /// Buffer produced by SQLite; released with `sqlite3_free` on drop.
    cs: *mut c_void,
    /// Size reported by SQLite together with `cs` (presumably in bytes).
    n: c_int,
}
257 
258 impl Changeset {
259     /// Invert a changeset
invert(&self) -> Result<Changeset>260     pub fn invert(&self) -> Result<Changeset> {
261         let mut n = 0;
262         let mut cs = ptr::null_mut();
263         check!(unsafe {
264             ffi::sqlite3changeset_invert(self.n, self.cs, &mut n, &mut cs as *mut *mut _)
265         });
266         Ok(Changeset { cs, n })
267     }
268 
269     /// Create an iterator to traverse a changeset
iter(&self) -> Result<ChangesetIter<'_>>270     pub fn iter(&self) -> Result<ChangesetIter<'_>> {
271         let mut it = ptr::null_mut();
272         check!(unsafe { ffi::sqlite3changeset_start(&mut it as *mut *mut _, self.n, self.cs) });
273         Ok(ChangesetIter {
274             phantom: PhantomData,
275             it,
276             item: None,
277         })
278     }
279 
280     /// Concatenate two changeset objects
concat(a: &Changeset, b: &Changeset) -> Result<Changeset>281     pub fn concat(a: &Changeset, b: &Changeset) -> Result<Changeset> {
282         let mut n = 0;
283         let mut cs = ptr::null_mut();
284         check!(unsafe {
285             ffi::sqlite3changeset_concat(a.n, a.cs, b.n, b.cs, &mut n, &mut cs as *mut *mut _)
286         });
287         Ok(Changeset { cs, n })
288     }
289 }
290 
291 impl Drop for Changeset {
drop(&mut self)292     fn drop(&mut self) {
293         unsafe {
294             ffi::sqlite3_free(self.cs);
295         }
296     }
297 }
298 
299 /// `feature = "session"` Cursor for iterating over the elements of a changeset
300 /// or patchset.
301 pub struct ChangesetIter<'changeset> {
302     phantom: PhantomData<&'changeset Changeset>,
303     it: *mut ffi::sqlite3_changeset_iter,
304     item: Option<ChangesetItem>,
305 }
306 
307 impl ChangesetIter<'_> {
308     /// Create an iterator on `input`
start_strm<'input>(input: &&'input mut dyn Read) -> Result<ChangesetIter<'input>>309     pub fn start_strm<'input>(input: &&'input mut dyn Read) -> Result<ChangesetIter<'input>> {
310         let mut it = ptr::null_mut();
311         check!(unsafe {
312             ffi::sqlite3changeset_start_strm(
313                 &mut it as *mut *mut _,
314                 Some(x_input),
315                 input as *const &mut dyn Read as *mut c_void,
316             )
317         });
318         Ok(ChangesetIter {
319             phantom: PhantomData,
320             it,
321             item: None,
322         })
323     }
324 }
325 
326 impl FallibleStreamingIterator for ChangesetIter<'_> {
327     type Error = crate::error::Error;
328     type Item = ChangesetItem;
329 
advance(&mut self) -> Result<()>330     fn advance(&mut self) -> Result<()> {
331         let rc = unsafe { ffi::sqlite3changeset_next(self.it) };
332         match rc {
333             ffi::SQLITE_ROW => {
334                 self.item = Some(ChangesetItem { it: self.it });
335                 Ok(())
336             }
337             ffi::SQLITE_DONE => {
338                 self.item = None;
339                 Ok(())
340             }
341             code => Err(error_from_sqlite_code(code, None)),
342         }
343     }
344 
get(&self) -> Option<&ChangesetItem>345     fn get(&self) -> Option<&ChangesetItem> {
346         self.item.as_ref()
347     }
348 }
349 
350 /// `feature = "session"`
351 pub struct Operation<'item> {
352     table_name: &'item str,
353     number_of_columns: i32,
354     code: Action,
355     indirect: bool,
356 }
357 
358 impl Operation<'_> {
359     /// Returns the table name.
table_name(&self) -> &str360     pub fn table_name(&self) -> &str {
361         self.table_name
362     }
363 
364     /// Returns the number of columns in table
number_of_columns(&self) -> i32365     pub fn number_of_columns(&self) -> i32 {
366         self.number_of_columns
367     }
368 
369     /// Returns the action code.
code(&self) -> Action370     pub fn code(&self) -> Action {
371         self.code
372     }
373 
374     /// Returns `true` for an 'indirect' change.
indirect(&self) -> bool375     pub fn indirect(&self) -> bool {
376         self.indirect
377     }
378 }
379 
380 impl Drop for ChangesetIter<'_> {
drop(&mut self)381     fn drop(&mut self) {
382         unsafe {
383             ffi::sqlite3changeset_finalize(self.it);
384         }
385     }
386 }
387 
388 /// `feature = "session"` An item passed to a conflict-handler by
389 /// `Connection::apply`, or an item generated by `ChangesetIter::next`.
390 // TODO enum ? Delete, Insert, Update, ...
391 pub struct ChangesetItem {
392     it: *mut ffi::sqlite3_changeset_iter,
393 }
394 
395 impl ChangesetItem {
396     /// Obtain conflicting row values
397     ///
398     /// May only be called with an `SQLITE_CHANGESET_DATA` or
399     /// `SQLITE_CHANGESET_CONFLICT` conflict handler callback.
conflict(&self, col: usize) -> Result<ValueRef<'_>>400     pub fn conflict(&self, col: usize) -> Result<ValueRef<'_>> {
401         unsafe {
402             let mut p_value: *mut ffi::sqlite3_value = ptr::null_mut();
403             check!(ffi::sqlite3changeset_conflict(
404                 self.it,
405                 col as i32,
406                 &mut p_value,
407             ));
408             Ok(ValueRef::from_value(p_value))
409         }
410     }
411 
412     /// Determine the number of foreign key constraint violations
413     ///
414     /// May only be called with an `SQLITE_CHANGESET_FOREIGN_KEY` conflict
415     /// handler callback.
fk_conflicts(&self) -> Result<i32>416     pub fn fk_conflicts(&self) -> Result<i32> {
417         unsafe {
418             let mut p_out = 0;
419             check!(ffi::sqlite3changeset_fk_conflicts(self.it, &mut p_out));
420             Ok(p_out)
421         }
422     }
423 
424     /// Obtain new.* Values
425     ///
426     /// May only be called if the type of change is either `SQLITE_UPDATE` or
427     /// `SQLITE_INSERT`.
new_value(&self, col: usize) -> Result<ValueRef<'_>>428     pub fn new_value(&self, col: usize) -> Result<ValueRef<'_>> {
429         unsafe {
430             let mut p_value: *mut ffi::sqlite3_value = ptr::null_mut();
431             check!(ffi::sqlite3changeset_new(self.it, col as i32, &mut p_value,));
432             Ok(ValueRef::from_value(p_value))
433         }
434     }
435 
436     /// Obtain old.* Values
437     ///
438     /// May only be called if the type of change is either `SQLITE_DELETE` or
439     /// `SQLITE_UPDATE`.
old_value(&self, col: usize) -> Result<ValueRef<'_>>440     pub fn old_value(&self, col: usize) -> Result<ValueRef<'_>> {
441         unsafe {
442             let mut p_value: *mut ffi::sqlite3_value = ptr::null_mut();
443             check!(ffi::sqlite3changeset_old(self.it, col as i32, &mut p_value,));
444             Ok(ValueRef::from_value(p_value))
445         }
446     }
447 
448     /// Obtain the current operation
op(&self) -> Result<Operation<'_>>449     pub fn op(&self) -> Result<Operation<'_>> {
450         let mut number_of_columns = 0;
451         let mut code = 0;
452         let mut indirect = 0;
453         let tab = unsafe {
454             let mut pz_tab: *const c_char = ptr::null();
455             check!(ffi::sqlite3changeset_op(
456                 self.it,
457                 &mut pz_tab,
458                 &mut number_of_columns,
459                 &mut code,
460                 &mut indirect
461             ));
462             CStr::from_ptr(pz_tab)
463         };
464         let table_name = tab.to_str()?;
465         Ok(Operation {
466             table_name,
467             number_of_columns,
468             code: Action::from(code),
469             indirect: indirect != 0,
470         })
471     }
472 
473     /// Obtain the primary key definition of a table
pk(&self) -> Result<&[u8]>474     pub fn pk(&self) -> Result<&[u8]> {
475         let mut number_of_columns = 0;
476         unsafe {
477             let mut pks: *mut c_uchar = ptr::null_mut();
478             check!(ffi::sqlite3changeset_pk(
479                 self.it,
480                 &mut pks,
481                 &mut number_of_columns
482             ));
483             Ok(from_raw_parts(pks, number_of_columns as usize))
484         }
485     }
486 }
487 
488 /// `feature = "session"` Used to combine two or more changesets or
489 /// patchsets
490 pub struct Changegroup {
491     cg: *mut ffi::sqlite3_changegroup,
492 }
493 
494 impl Changegroup {
495     /// Create a new change group.
new() -> Result<Self>496     pub fn new() -> Result<Self> {
497         let mut cg = ptr::null_mut();
498         check!(unsafe { ffi::sqlite3changegroup_new(&mut cg) });
499         Ok(Changegroup { cg })
500     }
501 
502     /// Add a changeset
add(&mut self, cs: &Changeset) -> Result<()>503     pub fn add(&mut self, cs: &Changeset) -> Result<()> {
504         check!(unsafe { ffi::sqlite3changegroup_add(self.cg, cs.n, cs.cs) });
505         Ok(())
506     }
507 
508     /// Add a changeset read from `input` to this change group.
add_stream(&mut self, input: &mut dyn Read) -> Result<()>509     pub fn add_stream(&mut self, input: &mut dyn Read) -> Result<()> {
510         let input_ref = &input;
511         check!(unsafe {
512             ffi::sqlite3changegroup_add_strm(
513                 self.cg,
514                 Some(x_input),
515                 input_ref as *const &mut dyn Read as *mut c_void,
516             )
517         });
518         Ok(())
519     }
520 
521     /// Obtain a composite Changeset
output(&mut self) -> Result<Changeset>522     pub fn output(&mut self) -> Result<Changeset> {
523         let mut n = 0;
524         let mut output: *mut c_void = ptr::null_mut();
525         check!(unsafe { ffi::sqlite3changegroup_output(self.cg, &mut n, &mut output) });
526         Ok(Changeset { cs: output, n })
527     }
528 
529     /// Write the combined set of changes to `output`.
output_strm(&mut self, output: &mut dyn Write) -> Result<()>530     pub fn output_strm(&mut self, output: &mut dyn Write) -> Result<()> {
531         let output_ref = &output;
532         check!(unsafe {
533             ffi::sqlite3changegroup_output_strm(
534                 self.cg,
535                 Some(x_output),
536                 output_ref as *const &mut dyn Write as *mut c_void,
537             )
538         });
539         Ok(())
540     }
541 }
542 
543 impl Drop for Changegroup {
drop(&mut self)544     fn drop(&mut self) {
545         unsafe {
546             ffi::sqlite3changegroup_delete(self.cg);
547         }
548     }
549 }
550 
551 impl Connection {
552     /// `feature = "session"` Apply a changeset to a database
apply<F, C>(&self, cs: &Changeset, filter: Option<F>, conflict: C) -> Result<()> where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,553     pub fn apply<F, C>(&self, cs: &Changeset, filter: Option<F>, conflict: C) -> Result<()>
554     where
555         F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
556         C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
557     {
558         let db = self.db.borrow_mut().db;
559 
560         let filtered = filter.is_some();
561         let tuple = &mut (filter, conflict);
562         check!(unsafe {
563             if filtered {
564                 ffi::sqlite3changeset_apply(
565                     db,
566                     cs.n,
567                     cs.cs,
568                     Some(call_filter::<F, C>),
569                     Some(call_conflict::<F, C>),
570                     tuple as *mut (Option<F>, C) as *mut c_void,
571                 )
572             } else {
573                 ffi::sqlite3changeset_apply(
574                     db,
575                     cs.n,
576                     cs.cs,
577                     None,
578                     Some(call_conflict::<F, C>),
579                     tuple as *mut (Option<F>, C) as *mut c_void,
580                 )
581             }
582         });
583         Ok(())
584     }
585 
586     /// `feature = "session"` Apply a changeset to a database
apply_strm<F, C>( &self, input: &mut dyn Read, filter: Option<F>, conflict: C, ) -> Result<()> where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,587     pub fn apply_strm<F, C>(
588         &self,
589         input: &mut dyn Read,
590         filter: Option<F>,
591         conflict: C,
592     ) -> Result<()>
593     where
594         F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
595         C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
596     {
597         let input_ref = &input;
598         let db = self.db.borrow_mut().db;
599 
600         let filtered = filter.is_some();
601         let tuple = &mut (filter, conflict);
602         check!(unsafe {
603             if filtered {
604                 ffi::sqlite3changeset_apply_strm(
605                     db,
606                     Some(x_input),
607                     input_ref as *const &mut dyn Read as *mut c_void,
608                     Some(call_filter::<F, C>),
609                     Some(call_conflict::<F, C>),
610                     tuple as *mut (Option<F>, C) as *mut c_void,
611                 )
612             } else {
613                 ffi::sqlite3changeset_apply_strm(
614                     db,
615                     Some(x_input),
616                     input_ref as *const &mut dyn Read as *mut c_void,
617                     None,
618                     Some(call_conflict::<F, C>),
619                     tuple as *mut (Option<F>, C) as *mut c_void,
620                 )
621             }
622         });
623         Ok(())
624     }
625 }
626 
627 /// `feature = "session"` Constants passed to the conflict handler
628 /// See [here](https://sqlite.org/session.html#SQLITE_CHANGESET_CONFLICT) for details.
629 #[allow(missing_docs)]
630 #[repr(i32)]
631 #[derive(Debug, PartialEq)]
632 #[non_exhaustive]
633 pub enum ConflictType {
634     UNKNOWN = -1,
635     SQLITE_CHANGESET_DATA = ffi::SQLITE_CHANGESET_DATA,
636     SQLITE_CHANGESET_NOTFOUND = ffi::SQLITE_CHANGESET_NOTFOUND,
637     SQLITE_CHANGESET_CONFLICT = ffi::SQLITE_CHANGESET_CONFLICT,
638     SQLITE_CHANGESET_CONSTRAINT = ffi::SQLITE_CHANGESET_CONSTRAINT,
639     SQLITE_CHANGESET_FOREIGN_KEY = ffi::SQLITE_CHANGESET_FOREIGN_KEY,
640 }
641 impl From<i32> for ConflictType {
from(code: i32) -> ConflictType642     fn from(code: i32) -> ConflictType {
643         match code {
644             ffi::SQLITE_CHANGESET_DATA => ConflictType::SQLITE_CHANGESET_DATA,
645             ffi::SQLITE_CHANGESET_NOTFOUND => ConflictType::SQLITE_CHANGESET_NOTFOUND,
646             ffi::SQLITE_CHANGESET_CONFLICT => ConflictType::SQLITE_CHANGESET_CONFLICT,
647             ffi::SQLITE_CHANGESET_CONSTRAINT => ConflictType::SQLITE_CHANGESET_CONSTRAINT,
648             ffi::SQLITE_CHANGESET_FOREIGN_KEY => ConflictType::SQLITE_CHANGESET_FOREIGN_KEY,
649             _ => ConflictType::UNKNOWN,
650         }
651     }
652 }
653 
654 /// `feature = "session"` Constants returned by the conflict handler
655 /// See [here](https://sqlite.org/session.html#SQLITE_CHANGESET_ABORT) for details.
656 #[allow(missing_docs)]
657 #[repr(i32)]
658 #[derive(Debug, PartialEq)]
659 #[non_exhaustive]
660 pub enum ConflictAction {
661     SQLITE_CHANGESET_OMIT = ffi::SQLITE_CHANGESET_OMIT,
662     SQLITE_CHANGESET_REPLACE = ffi::SQLITE_CHANGESET_REPLACE,
663     SQLITE_CHANGESET_ABORT = ffi::SQLITE_CHANGESET_ABORT,
664 }
665 
call_filter<F, C>(p_ctx: *mut c_void, tbl_str: *const c_char) -> c_int where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,666 unsafe extern "C" fn call_filter<F, C>(p_ctx: *mut c_void, tbl_str: *const c_char) -> c_int
667 where
668     F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
669     C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
670 {
671     use std::str;
672 
673     let tuple: *mut (Option<F>, C) = p_ctx as *mut (Option<F>, C);
674     let tbl_name = {
675         let c_slice = CStr::from_ptr(tbl_str).to_bytes();
676         str::from_utf8(c_slice)
677     };
678     match *tuple {
679         (Some(ref filter), _) => {
680             if let Ok(true) = catch_unwind(|| filter(tbl_name.expect("illegal table name"))) {
681                 1
682             } else {
683                 0
684             }
685         }
686         _ => unimplemented!(),
687     }
688 }
689 
call_conflict<F, C>( p_ctx: *mut c_void, e_conflict: c_int, p: *mut ffi::sqlite3_changeset_iter, ) -> c_int where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,690 unsafe extern "C" fn call_conflict<F, C>(
691     p_ctx: *mut c_void,
692     e_conflict: c_int,
693     p: *mut ffi::sqlite3_changeset_iter,
694 ) -> c_int
695 where
696     F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
697     C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
698 {
699     let tuple: *mut (Option<F>, C) = p_ctx as *mut (Option<F>, C);
700     let conflict_type = ConflictType::from(e_conflict);
701     let item = ChangesetItem { it: p };
702     if let Ok(action) = catch_unwind(|| (*tuple).1(conflict_type, item)) {
703         action as c_int
704     } else {
705         ffi::SQLITE_CHANGESET_ABORT
706     }
707 }
708 
x_input(p_in: *mut c_void, data: *mut c_void, len: *mut c_int) -> c_int709 unsafe extern "C" fn x_input(p_in: *mut c_void, data: *mut c_void, len: *mut c_int) -> c_int {
710     if p_in.is_null() {
711         return ffi::SQLITE_MISUSE;
712     }
713     let bytes: &mut [u8] = from_raw_parts_mut(data as *mut u8, *len as usize);
714     let input = p_in as *mut &mut dyn Read;
715     match (*input).read(bytes) {
716         Ok(n) => {
717             *len = n as i32; // TODO Validate: n = 0 may not mean the reader will always no longer be able to
718                              // produce bytes.
719             ffi::SQLITE_OK
720         }
721         Err(_) => ffi::SQLITE_IOERR_READ, // TODO check if err is a (ru)sqlite Error => propagate
722     }
723 }
724 
x_output(p_out: *mut c_void, data: *const c_void, len: c_int) -> c_int725 unsafe extern "C" fn x_output(p_out: *mut c_void, data: *const c_void, len: c_int) -> c_int {
726     if p_out.is_null() {
727         return ffi::SQLITE_MISUSE;
728     }
729     // The sessions module never invokes an xOutput callback with the third
730     // parameter set to a value less than or equal to zero.
731     let bytes: &[u8] = from_raw_parts(data as *const u8, len as usize);
732     let output = p_out as *mut &mut dyn Write;
733     match (*output).write_all(bytes) {
734         Ok(_) => ffi::SQLITE_OK,
735         Err(_) => ffi::SQLITE_IOERR_WRITE, // TODO check if err is a (ru)sqlite Error => propagate
736     }
737 }
738 
739 #[cfg(test)]
740 mod test {
741     use fallible_streaming_iterator::FallibleStreamingIterator;
742     use std::io::Read;
743     use std::sync::atomic::{AtomicBool, Ordering};
744 
745     use super::{Changeset, ChangesetIter, ConflictAction, ConflictType, Session};
746     use crate::hooks::Action;
747     use crate::Connection;
748 
one_changeset() -> Changeset749     fn one_changeset() -> Changeset {
750         let db = Connection::open_in_memory().unwrap();
751         db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")
752             .unwrap();
753 
754         let mut session = Session::new(&db).unwrap();
755         assert!(session.is_empty());
756 
757         session.attach(None).unwrap();
758         db.execute("INSERT INTO foo (t) VALUES (?);", &["bar"])
759             .unwrap();
760 
761         session.changeset().unwrap()
762     }
763 
one_changeset_strm() -> Vec<u8>764     fn one_changeset_strm() -> Vec<u8> {
765         let db = Connection::open_in_memory().unwrap();
766         db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")
767             .unwrap();
768 
769         let mut session = Session::new(&db).unwrap();
770         assert!(session.is_empty());
771 
772         session.attach(None).unwrap();
773         db.execute("INSERT INTO foo (t) VALUES (?);", &["bar"])
774             .unwrap();
775 
776         let mut output = Vec::new();
777         session.changeset_strm(&mut output).unwrap();
778         output
779     }
780 
/// The recorded changeset contains one insert into `foo` with the expected
/// primary-key layout and new value.
#[test]
fn test_changeset() {
    let changeset = one_changeset();
    let mut iter = changeset.iter().unwrap();
    let item = iter.next().unwrap();
    assert!(item.is_some());

    let item = item.unwrap();
    let op = item.op().unwrap();
    assert_eq!("foo", op.table_name());
    assert_eq!(1, op.number_of_columns());
    assert_eq!(Action::SQLITE_INSERT, op.code());
    assert!(!op.indirect());

    let pk = item.pk().unwrap();
    assert_eq!(&[1], pk);

    let new_value = item.new_value(0).unwrap();
    assert_eq!(Ok("bar"), new_value.as_str());
}
801 
/// A streamed changeset has the expected size and can be iterated again
/// from a reader.
#[test]
fn test_changeset_strm() {
    let output = one_changeset_strm();
    assert!(!output.is_empty());
    assert_eq!(14, output.len());

    let input: &mut dyn Read = &mut output.as_slice();
    let mut iter = ChangesetIter::start_strm(&input).unwrap();
    let item = iter.next().unwrap();
    assert!(item.is_some());
}
813 
/// Applying a changeset once succeeds without invoking the conflict handler;
/// applying it a second time reports a conflict on the existing row.
#[test]
fn test_changeset_apply() {
    let changeset = one_changeset();

    let db = Connection::open_in_memory().unwrap();
    db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")
        .unwrap();

    // `AtomicBool::new` is a `const fn`, so a plain static is enough here.
    static CALLED: AtomicBool = AtomicBool::new(false);

    db.apply(
        &changeset,
        None::<fn(&str) -> bool>,
        |_conflict_type, _item| {
            CALLED.store(true, Ordering::Relaxed);
            ConflictAction::SQLITE_CHANGESET_OMIT
        },
    )
    .unwrap();

    assert!(!CALLED.load(Ordering::Relaxed));
    let check = db
        .query_row("SELECT 1 FROM foo WHERE t = ?", &["bar"], |row| {
            row.get::<_, i32>(0)
        })
        .unwrap();
    assert_eq!(1, check);

    // conflict expected when same changeset applied again on the same db
    db.apply(
        &changeset,
        None::<fn(&str) -> bool>,
        |conflict_type, item| {
            CALLED.store(true, Ordering::Relaxed);
            assert_eq!(ConflictType::SQLITE_CHANGESET_CONFLICT, conflict_type);
            let conflict = item.conflict(0).unwrap();
            assert_eq!(Ok("bar"), conflict.as_str());
            ConflictAction::SQLITE_CHANGESET_OMIT
        },
    )
    .unwrap();
    assert!(CALLED.load(Ordering::Relaxed));
}
858 
/// A streamed changeset can be applied to a fresh database through
/// `apply_strm`.
#[test]
fn test_changeset_apply_strm() {
    let output = one_changeset_strm();

    let db = Connection::open_in_memory().unwrap();
    db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")
        .unwrap();

    let mut input = output.as_slice();
    db.apply_strm(
        &mut input,
        None::<fn(&str) -> bool>,
        |_conflict_type, _item| ConflictAction::SQLITE_CHANGESET_OMIT,
    )
    .unwrap();

    let check = db
        .query_row("SELECT 1 FROM foo WHERE t = ?", &["bar"], |row| {
            row.get::<_, i32>(0)
        })
        .unwrap();
    assert_eq!(1, check);
}
882 
/// A session is empty until a change is made to an attached table.
#[test]
fn test_session_empty() {
    let db = Connection::open_in_memory().unwrap();
    db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")
        .unwrap();

    let mut session = Session::new(&db).unwrap();
    assert!(session.is_empty());

    session.attach(None).unwrap();
    db.execute("INSERT INTO foo (t) VALUES (?);", &["bar"])
        .unwrap();

    assert!(!session.is_empty());
}
898 
/// A new session starts enabled and can be disabled.
#[test]
fn test_session_set_enabled() {
    let db = Connection::open_in_memory().unwrap();

    let mut session = Session::new(&db).unwrap();
    assert!(session.is_enabled());
    session.set_enabled(false);
    assert!(!session.is_enabled());
}
908 
/// A new session starts with the indirect flag cleared and can set it.
#[test]
fn test_session_set_indirect() {
    let db = Connection::open_in_memory().unwrap();

    let mut session = Session::new(&db).unwrap();
    assert!(!session.is_indirect());
    session.set_indirect(true);
    assert!(session.is_indirect());
}
918 }
919