• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! [Session Extension](https://sqlite.org/sessionintro.html)
2 #![allow(non_camel_case_types)]
3 
4 use std::ffi::CStr;
5 use std::io::{Read, Write};
6 use std::marker::PhantomData;
7 use std::os::raw::{c_char, c_int, c_uchar, c_void};
8 use std::panic::{catch_unwind, RefUnwindSafe};
9 use std::ptr;
10 use std::slice::{from_raw_parts, from_raw_parts_mut};
11 
12 use fallible_streaming_iterator::FallibleStreamingIterator;
13 
14 use crate::error::{check, error_from_sqlite_code};
15 use crate::ffi;
16 use crate::hooks::Action;
17 use crate::types::ValueRef;
18 use crate::{errmsg_to_string, str_to_cstring, Connection, DatabaseName, Result};
19 
20 // https://sqlite.org/session.html
21 
22 /// An instance of this object is a session that can be
23 /// used to record changes to a database.
24 pub struct Session<'conn> {
25     phantom: PhantomData<&'conn Connection>,
26     s: *mut ffi::sqlite3_session,
27     filter: Option<Box<dyn Fn(&str) -> bool>>,
28 }
29 
30 impl Session<'_> {
31     /// Create a new session object
32     #[inline]
new(db: &Connection) -> Result<Session<'_>>33     pub fn new(db: &Connection) -> Result<Session<'_>> {
34         Session::new_with_name(db, DatabaseName::Main)
35     }
36 
37     /// Create a new session object
38     #[inline]
new_with_name<'conn>( db: &'conn Connection, name: DatabaseName<'_>, ) -> Result<Session<'conn>>39     pub fn new_with_name<'conn>(
40         db: &'conn Connection,
41         name: DatabaseName<'_>,
42     ) -> Result<Session<'conn>> {
43         let name = name.as_cstring()?;
44 
45         let db = db.db.borrow_mut().db;
46 
47         let mut s: *mut ffi::sqlite3_session = ptr::null_mut();
48         check(unsafe { ffi::sqlite3session_create(db, name.as_ptr(), &mut s) })?;
49 
50         Ok(Session {
51             phantom: PhantomData,
52             s,
53             filter: None,
54         })
55     }
56 
57     /// Set a table filter
table_filter<F>(&mut self, filter: Option<F>) where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,58     pub fn table_filter<F>(&mut self, filter: Option<F>)
59     where
60         F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
61     {
62         unsafe extern "C" fn call_boxed_closure<F>(
63             p_arg: *mut c_void,
64             tbl_str: *const c_char,
65         ) -> c_int
66         where
67             F: Fn(&str) -> bool + RefUnwindSafe,
68         {
69             use std::str;
70 
71             let boxed_filter: *mut F = p_arg as *mut F;
72             let tbl_name = {
73                 let c_slice = CStr::from_ptr(tbl_str).to_bytes();
74                 str::from_utf8(c_slice)
75             };
76             if let Ok(true) =
77                 catch_unwind(|| (*boxed_filter)(tbl_name.expect("non-utf8 table name")))
78             {
79                 1
80             } else {
81                 0
82             }
83         }
84 
85         match filter {
86             Some(filter) => {
87                 let boxed_filter = Box::new(filter);
88                 unsafe {
89                     ffi::sqlite3session_table_filter(
90                         self.s,
91                         Some(call_boxed_closure::<F>),
92                         &*boxed_filter as *const F as *mut _,
93                     );
94                 }
95                 self.filter = Some(boxed_filter);
96             }
97             _ => {
98                 unsafe { ffi::sqlite3session_table_filter(self.s, None, ptr::null_mut()) }
99                 self.filter = None;
100             }
101         };
102     }
103 
104     /// Attach a table. `None` means all tables.
attach(&mut self, table: Option<&str>) -> Result<()>105     pub fn attach(&mut self, table: Option<&str>) -> Result<()> {
106         let table = if let Some(table) = table {
107             Some(str_to_cstring(table)?)
108         } else {
109             None
110         };
111         let table = table.as_ref().map(|s| s.as_ptr()).unwrap_or(ptr::null());
112         check(unsafe { ffi::sqlite3session_attach(self.s, table) })
113     }
114 
115     /// Generate a Changeset
changeset(&mut self) -> Result<Changeset>116     pub fn changeset(&mut self) -> Result<Changeset> {
117         let mut n = 0;
118         let mut cs: *mut c_void = ptr::null_mut();
119         check(unsafe { ffi::sqlite3session_changeset(self.s, &mut n, &mut cs) })?;
120         Ok(Changeset { cs, n })
121     }
122 
123     /// Write the set of changes represented by this session to `output`.
124     #[inline]
changeset_strm(&mut self, output: &mut dyn Write) -> Result<()>125     pub fn changeset_strm(&mut self, output: &mut dyn Write) -> Result<()> {
126         let output_ref = &output;
127         check(unsafe {
128             ffi::sqlite3session_changeset_strm(
129                 self.s,
130                 Some(x_output),
131                 output_ref as *const &mut dyn Write as *mut c_void,
132             )
133         })
134     }
135 
136     /// Generate a Patchset
137     #[inline]
patchset(&mut self) -> Result<Changeset>138     pub fn patchset(&mut self) -> Result<Changeset> {
139         let mut n = 0;
140         let mut ps: *mut c_void = ptr::null_mut();
141         check(unsafe { ffi::sqlite3session_patchset(self.s, &mut n, &mut ps) })?;
142         // TODO Validate: same struct
143         Ok(Changeset { cs: ps, n })
144     }
145 
146     /// Write the set of patches represented by this session to `output`.
147     #[inline]
patchset_strm(&mut self, output: &mut dyn Write) -> Result<()>148     pub fn patchset_strm(&mut self, output: &mut dyn Write) -> Result<()> {
149         let output_ref = &output;
150         check(unsafe {
151             ffi::sqlite3session_patchset_strm(
152                 self.s,
153                 Some(x_output),
154                 output_ref as *const &mut dyn Write as *mut c_void,
155             )
156         })
157     }
158 
159     /// Load the difference between tables.
diff(&mut self, from: DatabaseName<'_>, table: &str) -> Result<()>160     pub fn diff(&mut self, from: DatabaseName<'_>, table: &str) -> Result<()> {
161         let from = from.as_cstring()?;
162         let table = str_to_cstring(table)?;
163         let table = table.as_ptr();
164         unsafe {
165             let mut errmsg = ptr::null_mut();
166             let r =
167                 ffi::sqlite3session_diff(self.s, from.as_ptr(), table, &mut errmsg as *mut *mut _);
168             if r != ffi::SQLITE_OK {
169                 let errmsg: *mut c_char = errmsg;
170                 let message = errmsg_to_string(&*errmsg);
171                 ffi::sqlite3_free(errmsg as *mut c_void);
172                 return Err(error_from_sqlite_code(r, Some(message)));
173             }
174         }
175         Ok(())
176     }
177 
178     /// Test if a changeset has recorded any changes
179     #[inline]
is_empty(&self) -> bool180     pub fn is_empty(&self) -> bool {
181         unsafe { ffi::sqlite3session_isempty(self.s) != 0 }
182     }
183 
184     /// Query the current state of the session
185     #[inline]
is_enabled(&self) -> bool186     pub fn is_enabled(&self) -> bool {
187         unsafe { ffi::sqlite3session_enable(self.s, -1) != 0 }
188     }
189 
190     /// Enable or disable the recording of changes
191     #[inline]
set_enabled(&mut self, enabled: bool)192     pub fn set_enabled(&mut self, enabled: bool) {
193         unsafe {
194             ffi::sqlite3session_enable(self.s, if enabled { 1 } else { 0 });
195         }
196     }
197 
198     /// Query the current state of the indirect flag
199     #[inline]
is_indirect(&self) -> bool200     pub fn is_indirect(&self) -> bool {
201         unsafe { ffi::sqlite3session_indirect(self.s, -1) != 0 }
202     }
203 
204     /// Set or clear the indirect change flag
205     #[inline]
set_indirect(&mut self, indirect: bool)206     pub fn set_indirect(&mut self, indirect: bool) {
207         unsafe {
208             ffi::sqlite3session_indirect(self.s, if indirect { 1 } else { 0 });
209         }
210     }
211 }
212 
213 impl Drop for Session<'_> {
214     #[inline]
drop(&mut self)215     fn drop(&mut self) {
216         if self.filter.is_some() {
217             self.table_filter(None::<fn(&str) -> bool>);
218         }
219         unsafe { ffi::sqlite3session_delete(self.s) };
220     }
221 }
222 
223 /// Invert a changeset
224 #[inline]
invert_strm(input: &mut dyn Read, output: &mut dyn Write) -> Result<()>225 pub fn invert_strm(input: &mut dyn Read, output: &mut dyn Write) -> Result<()> {
226     let input_ref = &input;
227     let output_ref = &output;
228     check(unsafe {
229         ffi::sqlite3changeset_invert_strm(
230             Some(x_input),
231             input_ref as *const &mut dyn Read as *mut c_void,
232             Some(x_output),
233             output_ref as *const &mut dyn Write as *mut c_void,
234         )
235     })
236 }
237 
238 /// Combine two changesets
239 #[inline]
concat_strm( input_a: &mut dyn Read, input_b: &mut dyn Read, output: &mut dyn Write, ) -> Result<()>240 pub fn concat_strm(
241     input_a: &mut dyn Read,
242     input_b: &mut dyn Read,
243     output: &mut dyn Write,
244 ) -> Result<()> {
245     let input_a_ref = &input_a;
246     let input_b_ref = &input_b;
247     let output_ref = &output;
248     check(unsafe {
249         ffi::sqlite3changeset_concat_strm(
250             Some(x_input),
251             input_a_ref as *const &mut dyn Read as *mut c_void,
252             Some(x_input),
253             input_b_ref as *const &mut dyn Read as *mut c_void,
254             Some(x_output),
255             output_ref as *const &mut dyn Write as *mut c_void,
256         )
257     })
258 }
259 
/// Changeset or Patchset.
///
/// Owns a buffer handed out by SQLite; the buffer is released with
/// `sqlite3_free` when the value is dropped.
pub struct Changeset {
    // Start of the serialized changeset/patchset buffer.
    cs: *mut c_void,
    // Size of the buffer as reported by SQLite.
    n: c_int,
}
265 
266 impl Changeset {
267     /// Invert a changeset
268     #[inline]
invert(&self) -> Result<Changeset>269     pub fn invert(&self) -> Result<Changeset> {
270         let mut n = 0;
271         let mut cs = ptr::null_mut();
272         check(unsafe {
273             ffi::sqlite3changeset_invert(self.n, self.cs, &mut n, &mut cs as *mut *mut _)
274         })?;
275         Ok(Changeset { cs, n })
276     }
277 
278     /// Create an iterator to traverse a changeset
279     #[inline]
iter(&self) -> Result<ChangesetIter<'_>>280     pub fn iter(&self) -> Result<ChangesetIter<'_>> {
281         let mut it = ptr::null_mut();
282         check(unsafe { ffi::sqlite3changeset_start(&mut it as *mut *mut _, self.n, self.cs) })?;
283         Ok(ChangesetIter {
284             phantom: PhantomData,
285             it,
286             item: None,
287         })
288     }
289 
290     /// Concatenate two changeset objects
291     #[inline]
concat(a: &Changeset, b: &Changeset) -> Result<Changeset>292     pub fn concat(a: &Changeset, b: &Changeset) -> Result<Changeset> {
293         let mut n = 0;
294         let mut cs = ptr::null_mut();
295         check(unsafe {
296             ffi::sqlite3changeset_concat(a.n, a.cs, b.n, b.cs, &mut n, &mut cs as *mut *mut _)
297         })?;
298         Ok(Changeset { cs, n })
299     }
300 }
301 
302 impl Drop for Changeset {
303     #[inline]
drop(&mut self)304     fn drop(&mut self) {
305         unsafe {
306             ffi::sqlite3_free(self.cs);
307         }
308     }
309 }
310 
311 /// Cursor for iterating over the elements of a changeset
312 /// or patchset.
313 pub struct ChangesetIter<'changeset> {
314     phantom: PhantomData<&'changeset Changeset>,
315     it: *mut ffi::sqlite3_changeset_iter,
316     item: Option<ChangesetItem>,
317 }
318 
319 impl ChangesetIter<'_> {
320     /// Create an iterator on `input`
321     #[inline]
start_strm<'input>(input: &&'input mut dyn Read) -> Result<ChangesetIter<'input>>322     pub fn start_strm<'input>(input: &&'input mut dyn Read) -> Result<ChangesetIter<'input>> {
323         let mut it = ptr::null_mut();
324         check(unsafe {
325             ffi::sqlite3changeset_start_strm(
326                 &mut it as *mut *mut _,
327                 Some(x_input),
328                 input as *const &mut dyn Read as *mut c_void,
329             )
330         })?;
331         Ok(ChangesetIter {
332             phantom: PhantomData,
333             it,
334             item: None,
335         })
336     }
337 }
338 
339 impl FallibleStreamingIterator for ChangesetIter<'_> {
340     type Error = crate::error::Error;
341     type Item = ChangesetItem;
342 
343     #[inline]
advance(&mut self) -> Result<()>344     fn advance(&mut self) -> Result<()> {
345         let rc = unsafe { ffi::sqlite3changeset_next(self.it) };
346         match rc {
347             ffi::SQLITE_ROW => {
348                 self.item = Some(ChangesetItem { it: self.it });
349                 Ok(())
350             }
351             ffi::SQLITE_DONE => {
352                 self.item = None;
353                 Ok(())
354             }
355             code => Err(error_from_sqlite_code(code, None)),
356         }
357     }
358 
359     #[inline]
get(&self) -> Option<&ChangesetItem>360     fn get(&self) -> Option<&ChangesetItem> {
361         self.item.as_ref()
362     }
363 }
364 
365 /// Operation
366 pub struct Operation<'item> {
367     table_name: &'item str,
368     number_of_columns: i32,
369     code: Action,
370     indirect: bool,
371 }
372 
373 impl Operation<'_> {
374     /// Returns the table name.
375     #[inline]
table_name(&self) -> &str376     pub fn table_name(&self) -> &str {
377         self.table_name
378     }
379 
380     /// Returns the number of columns in table
381     #[inline]
number_of_columns(&self) -> i32382     pub fn number_of_columns(&self) -> i32 {
383         self.number_of_columns
384     }
385 
386     /// Returns the action code.
387     #[inline]
code(&self) -> Action388     pub fn code(&self) -> Action {
389         self.code
390     }
391 
392     /// Returns `true` for an 'indirect' change.
393     #[inline]
indirect(&self) -> bool394     pub fn indirect(&self) -> bool {
395         self.indirect
396     }
397 }
398 
399 impl Drop for ChangesetIter<'_> {
400     #[inline]
drop(&mut self)401     fn drop(&mut self) {
402         unsafe {
403             ffi::sqlite3changeset_finalize(self.it);
404         }
405     }
406 }
407 
408 /// An item passed to a conflict-handler by
409 /// [`Connection::apply`](crate::Connection::apply), or an item generated by
410 /// [`ChangesetIter::next`](ChangesetIter::next).
411 // TODO enum ? Delete, Insert, Update, ...
412 pub struct ChangesetItem {
413     it: *mut ffi::sqlite3_changeset_iter,
414 }
415 
416 impl ChangesetItem {
417     /// Obtain conflicting row values
418     ///
419     /// May only be called with an `SQLITE_CHANGESET_DATA` or
420     /// `SQLITE_CHANGESET_CONFLICT` conflict handler callback.
421     #[inline]
conflict(&self, col: usize) -> Result<ValueRef<'_>>422     pub fn conflict(&self, col: usize) -> Result<ValueRef<'_>> {
423         unsafe {
424             let mut p_value: *mut ffi::sqlite3_value = ptr::null_mut();
425             check(ffi::sqlite3changeset_conflict(
426                 self.it,
427                 col as i32,
428                 &mut p_value,
429             ))?;
430             Ok(ValueRef::from_value(p_value))
431         }
432     }
433 
434     /// Determine the number of foreign key constraint violations
435     ///
436     /// May only be called with an `SQLITE_CHANGESET_FOREIGN_KEY` conflict
437     /// handler callback.
438     #[inline]
fk_conflicts(&self) -> Result<i32>439     pub fn fk_conflicts(&self) -> Result<i32> {
440         unsafe {
441             let mut p_out = 0;
442             check(ffi::sqlite3changeset_fk_conflicts(self.it, &mut p_out))?;
443             Ok(p_out)
444         }
445     }
446 
447     /// Obtain new.* Values
448     ///
449     /// May only be called if the type of change is either `SQLITE_UPDATE` or
450     /// `SQLITE_INSERT`.
451     #[inline]
new_value(&self, col: usize) -> Result<ValueRef<'_>>452     pub fn new_value(&self, col: usize) -> Result<ValueRef<'_>> {
453         unsafe {
454             let mut p_value: *mut ffi::sqlite3_value = ptr::null_mut();
455             check(ffi::sqlite3changeset_new(self.it, col as i32, &mut p_value))?;
456             Ok(ValueRef::from_value(p_value))
457         }
458     }
459 
460     /// Obtain old.* Values
461     ///
462     /// May only be called if the type of change is either `SQLITE_DELETE` or
463     /// `SQLITE_UPDATE`.
464     #[inline]
old_value(&self, col: usize) -> Result<ValueRef<'_>>465     pub fn old_value(&self, col: usize) -> Result<ValueRef<'_>> {
466         unsafe {
467             let mut p_value: *mut ffi::sqlite3_value = ptr::null_mut();
468             check(ffi::sqlite3changeset_old(self.it, col as i32, &mut p_value))?;
469             Ok(ValueRef::from_value(p_value))
470         }
471     }
472 
473     /// Obtain the current operation
474     #[inline]
op(&self) -> Result<Operation<'_>>475     pub fn op(&self) -> Result<Operation<'_>> {
476         let mut number_of_columns = 0;
477         let mut code = 0;
478         let mut indirect = 0;
479         let tab = unsafe {
480             let mut pz_tab: *const c_char = ptr::null();
481             check(ffi::sqlite3changeset_op(
482                 self.it,
483                 &mut pz_tab,
484                 &mut number_of_columns,
485                 &mut code,
486                 &mut indirect,
487             ))?;
488             CStr::from_ptr(pz_tab)
489         };
490         let table_name = tab.to_str()?;
491         Ok(Operation {
492             table_name,
493             number_of_columns,
494             code: Action::from(code),
495             indirect: indirect != 0,
496         })
497     }
498 
499     /// Obtain the primary key definition of a table
500     #[inline]
pk(&self) -> Result<&[u8]>501     pub fn pk(&self) -> Result<&[u8]> {
502         let mut number_of_columns = 0;
503         unsafe {
504             let mut pks: *mut c_uchar = ptr::null_mut();
505             check(ffi::sqlite3changeset_pk(
506                 self.it,
507                 &mut pks,
508                 &mut number_of_columns,
509             ))?;
510             Ok(from_raw_parts(pks, number_of_columns as usize))
511         }
512     }
513 }
514 
515 /// Used to combine two or more changesets or
516 /// patchsets
517 pub struct Changegroup {
518     cg: *mut ffi::sqlite3_changegroup,
519 }
520 
521 impl Changegroup {
522     /// Create a new change group.
523     #[inline]
new() -> Result<Self>524     pub fn new() -> Result<Self> {
525         let mut cg = ptr::null_mut();
526         check(unsafe { ffi::sqlite3changegroup_new(&mut cg) })?;
527         Ok(Changegroup { cg })
528     }
529 
530     /// Add a changeset
531     #[inline]
add(&mut self, cs: &Changeset) -> Result<()>532     pub fn add(&mut self, cs: &Changeset) -> Result<()> {
533         check(unsafe { ffi::sqlite3changegroup_add(self.cg, cs.n, cs.cs) })
534     }
535 
536     /// Add a changeset read from `input` to this change group.
537     #[inline]
add_stream(&mut self, input: &mut dyn Read) -> Result<()>538     pub fn add_stream(&mut self, input: &mut dyn Read) -> Result<()> {
539         let input_ref = &input;
540         check(unsafe {
541             ffi::sqlite3changegroup_add_strm(
542                 self.cg,
543                 Some(x_input),
544                 input_ref as *const &mut dyn Read as *mut c_void,
545             )
546         })
547     }
548 
549     /// Obtain a composite Changeset
550     #[inline]
output(&mut self) -> Result<Changeset>551     pub fn output(&mut self) -> Result<Changeset> {
552         let mut n = 0;
553         let mut output: *mut c_void = ptr::null_mut();
554         check(unsafe { ffi::sqlite3changegroup_output(self.cg, &mut n, &mut output) })?;
555         Ok(Changeset { cs: output, n })
556     }
557 
558     /// Write the combined set of changes to `output`.
559     #[inline]
output_strm(&mut self, output: &mut dyn Write) -> Result<()>560     pub fn output_strm(&mut self, output: &mut dyn Write) -> Result<()> {
561         let output_ref = &output;
562         check(unsafe {
563             ffi::sqlite3changegroup_output_strm(
564                 self.cg,
565                 Some(x_output),
566                 output_ref as *const &mut dyn Write as *mut c_void,
567             )
568         })
569     }
570 }
571 
572 impl Drop for Changegroup {
573     #[inline]
drop(&mut self)574     fn drop(&mut self) {
575         unsafe {
576             ffi::sqlite3changegroup_delete(self.cg);
577         }
578     }
579 }
580 
581 impl Connection {
582     /// Apply a changeset to a database
apply<F, C>(&self, cs: &Changeset, filter: Option<F>, conflict: C) -> Result<()> where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,583     pub fn apply<F, C>(&self, cs: &Changeset, filter: Option<F>, conflict: C) -> Result<()>
584     where
585         F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
586         C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
587     {
588         let db = self.db.borrow_mut().db;
589 
590         let filtered = filter.is_some();
591         let tuple = &mut (filter, conflict);
592         check(unsafe {
593             if filtered {
594                 ffi::sqlite3changeset_apply(
595                     db,
596                     cs.n,
597                     cs.cs,
598                     Some(call_filter::<F, C>),
599                     Some(call_conflict::<F, C>),
600                     tuple as *mut (Option<F>, C) as *mut c_void,
601                 )
602             } else {
603                 ffi::sqlite3changeset_apply(
604                     db,
605                     cs.n,
606                     cs.cs,
607                     None,
608                     Some(call_conflict::<F, C>),
609                     tuple as *mut (Option<F>, C) as *mut c_void,
610                 )
611             }
612         })
613     }
614 
615     /// Apply a changeset to a database
apply_strm<F, C>( &self, input: &mut dyn Read, filter: Option<F>, conflict: C, ) -> Result<()> where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,616     pub fn apply_strm<F, C>(
617         &self,
618         input: &mut dyn Read,
619         filter: Option<F>,
620         conflict: C,
621     ) -> Result<()>
622     where
623         F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
624         C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
625     {
626         let input_ref = &input;
627         let db = self.db.borrow_mut().db;
628 
629         let filtered = filter.is_some();
630         let tuple = &mut (filter, conflict);
631         check(unsafe {
632             if filtered {
633                 ffi::sqlite3changeset_apply_strm(
634                     db,
635                     Some(x_input),
636                     input_ref as *const &mut dyn Read as *mut c_void,
637                     Some(call_filter::<F, C>),
638                     Some(call_conflict::<F, C>),
639                     tuple as *mut (Option<F>, C) as *mut c_void,
640                 )
641             } else {
642                 ffi::sqlite3changeset_apply_strm(
643                     db,
644                     Some(x_input),
645                     input_ref as *const &mut dyn Read as *mut c_void,
646                     None,
647                     Some(call_conflict::<F, C>),
648                     tuple as *mut (Option<F>, C) as *mut c_void,
649                 )
650             }
651         })
652     }
653 }
654 
655 /// Constants passed to the conflict handler
656 /// See [here](https://sqlite.org/session.html#SQLITE_CHANGESET_CONFLICT) for details.
657 #[allow(missing_docs)]
658 #[repr(i32)]
659 #[derive(Debug, PartialEq)]
660 #[non_exhaustive]
661 #[allow(clippy::upper_case_acronyms)]
662 pub enum ConflictType {
663     UNKNOWN = -1,
664     SQLITE_CHANGESET_DATA = ffi::SQLITE_CHANGESET_DATA,
665     SQLITE_CHANGESET_NOTFOUND = ffi::SQLITE_CHANGESET_NOTFOUND,
666     SQLITE_CHANGESET_CONFLICT = ffi::SQLITE_CHANGESET_CONFLICT,
667     SQLITE_CHANGESET_CONSTRAINT = ffi::SQLITE_CHANGESET_CONSTRAINT,
668     SQLITE_CHANGESET_FOREIGN_KEY = ffi::SQLITE_CHANGESET_FOREIGN_KEY,
669 }
670 impl From<i32> for ConflictType {
from(code: i32) -> ConflictType671     fn from(code: i32) -> ConflictType {
672         match code {
673             ffi::SQLITE_CHANGESET_DATA => ConflictType::SQLITE_CHANGESET_DATA,
674             ffi::SQLITE_CHANGESET_NOTFOUND => ConflictType::SQLITE_CHANGESET_NOTFOUND,
675             ffi::SQLITE_CHANGESET_CONFLICT => ConflictType::SQLITE_CHANGESET_CONFLICT,
676             ffi::SQLITE_CHANGESET_CONSTRAINT => ConflictType::SQLITE_CHANGESET_CONSTRAINT,
677             ffi::SQLITE_CHANGESET_FOREIGN_KEY => ConflictType::SQLITE_CHANGESET_FOREIGN_KEY,
678             _ => ConflictType::UNKNOWN,
679         }
680     }
681 }
682 
683 /// Constants returned by the conflict handler
684 /// See [here](https://sqlite.org/session.html#SQLITE_CHANGESET_ABORT) for details.
685 #[allow(missing_docs)]
686 #[repr(i32)]
687 #[derive(Debug, PartialEq)]
688 #[non_exhaustive]
689 #[allow(clippy::upper_case_acronyms)]
690 pub enum ConflictAction {
691     SQLITE_CHANGESET_OMIT = ffi::SQLITE_CHANGESET_OMIT,
692     SQLITE_CHANGESET_REPLACE = ffi::SQLITE_CHANGESET_REPLACE,
693     SQLITE_CHANGESET_ABORT = ffi::SQLITE_CHANGESET_ABORT,
694 }
695 
call_filter<F, C>(p_ctx: *mut c_void, tbl_str: *const c_char) -> c_int where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,696 unsafe extern "C" fn call_filter<F, C>(p_ctx: *mut c_void, tbl_str: *const c_char) -> c_int
697 where
698     F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
699     C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
700 {
701     use std::str;
702 
703     let tuple: *mut (Option<F>, C) = p_ctx as *mut (Option<F>, C);
704     let tbl_name = {
705         let c_slice = CStr::from_ptr(tbl_str).to_bytes();
706         str::from_utf8(c_slice)
707     };
708     match *tuple {
709         (Some(ref filter), _) => {
710             if let Ok(true) = catch_unwind(|| filter(tbl_name.expect("illegal table name"))) {
711                 1
712             } else {
713                 0
714             }
715         }
716         _ => unimplemented!(),
717     }
718 }
719 
call_conflict<F, C>( p_ctx: *mut c_void, e_conflict: c_int, p: *mut ffi::sqlite3_changeset_iter, ) -> c_int where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,720 unsafe extern "C" fn call_conflict<F, C>(
721     p_ctx: *mut c_void,
722     e_conflict: c_int,
723     p: *mut ffi::sqlite3_changeset_iter,
724 ) -> c_int
725 where
726     F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
727     C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
728 {
729     let tuple: *mut (Option<F>, C) = p_ctx as *mut (Option<F>, C);
730     let conflict_type = ConflictType::from(e_conflict);
731     let item = ChangesetItem { it: p };
732     if let Ok(action) = catch_unwind(|| (*tuple).1(conflict_type, item)) {
733         action as c_int
734     } else {
735         ffi::SQLITE_CHANGESET_ABORT
736     }
737 }
738 
x_input(p_in: *mut c_void, data: *mut c_void, len: *mut c_int) -> c_int739 unsafe extern "C" fn x_input(p_in: *mut c_void, data: *mut c_void, len: *mut c_int) -> c_int {
740     if p_in.is_null() {
741         return ffi::SQLITE_MISUSE;
742     }
743     let bytes: &mut [u8] = from_raw_parts_mut(data as *mut u8, *len as usize);
744     let input = p_in as *mut &mut dyn Read;
745     match (*input).read(bytes) {
746         Ok(n) => {
747             *len = n as i32; // TODO Validate: n = 0 may not mean the reader will always no longer be able to
748                              // produce bytes.
749             ffi::SQLITE_OK
750         }
751         Err(_) => ffi::SQLITE_IOERR_READ, // TODO check if err is a (ru)sqlite Error => propagate
752     }
753 }
754 
x_output(p_out: *mut c_void, data: *const c_void, len: c_int) -> c_int755 unsafe extern "C" fn x_output(p_out: *mut c_void, data: *const c_void, len: c_int) -> c_int {
756     if p_out.is_null() {
757         return ffi::SQLITE_MISUSE;
758     }
759     // The sessions module never invokes an xOutput callback with the third
760     // parameter set to a value less than or equal to zero.
761     let bytes: &[u8] = from_raw_parts(data as *const u8, len as usize);
762     let output = p_out as *mut &mut dyn Write;
763     match (*output).write_all(bytes) {
764         Ok(_) => ffi::SQLITE_OK,
765         Err(_) => ffi::SQLITE_IOERR_WRITE, // TODO check if err is a (ru)sqlite Error => propagate
766     }
767 }
768 
#[cfg(test)]
mod test {
    use fallible_streaming_iterator::FallibleStreamingIterator;
    use std::io::Read;
    use std::sync::atomic::{AtomicBool, Ordering};

    use super::{Changeset, ChangesetIter, ConflictAction, ConflictType, Session};
    use crate::hooks::Action;
    use crate::{Connection, Result};

    // SQL shared by the fixtures and tests below.
    const CREATE_FOO: &str = "CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);";
    const INSERT_FOO: &str = "INSERT INTO foo (t) VALUES (?);";
    const SELECT_FOO: &str = "SELECT 1 FROM foo WHERE t = ?";

    /// Record a single insert into `foo` and return it as a changeset.
    fn one_changeset() -> Result<Changeset> {
        let conn = Connection::open_in_memory()?;
        conn.execute_batch(CREATE_FOO)?;

        let mut recorder = Session::new(&conn)?;
        assert!(recorder.is_empty());

        recorder.attach(None)?;
        conn.execute(INSERT_FOO, ["bar"])?;

        recorder.changeset()
    }

    /// Record a single insert into `foo` and return the streamed changeset bytes.
    fn one_changeset_strm() -> Result<Vec<u8>> {
        let conn = Connection::open_in_memory()?;
        conn.execute_batch(CREATE_FOO)?;

        let mut recorder = Session::new(&conn)?;
        assert!(recorder.is_empty());

        recorder.attach(None)?;
        conn.execute(INSERT_FOO, ["bar"])?;

        let mut bytes = Vec::new();
        recorder.changeset_strm(&mut bytes)?;
        Ok(bytes)
    }

    #[test]
    fn test_changeset() -> Result<()> {
        let changeset = one_changeset()?;
        let mut cursor = changeset.iter()?;
        let first = cursor.next()?;
        assert!(first.is_some());

        let first = first.unwrap();
        let op = first.op()?;
        assert_eq!("foo", op.table_name());
        assert_eq!(1, op.number_of_columns());
        assert_eq!(Action::SQLITE_INSERT, op.code());
        assert!(!op.indirect());

        let pk = first.pk()?;
        assert_eq!(&[1], pk);

        let inserted = first.new_value(0)?;
        assert_eq!(Ok("bar"), inserted.as_str());
        Ok(())
    }

    #[test]
    fn test_changeset_strm() -> Result<()> {
        let bytes = one_changeset_strm()?;
        assert!(!bytes.is_empty());
        assert_eq!(14, bytes.len());

        let reader: &mut dyn Read = &mut bytes.as_slice();
        let mut cursor = ChangesetIter::start_strm(&reader)?;
        let first = cursor.next()?;
        assert!(first.is_some());
        Ok(())
    }

    #[test]
    fn test_changeset_apply() -> Result<()> {
        let changeset = one_changeset()?;

        let target = Connection::open_in_memory()?;
        target.execute_batch(CREATE_FOO)?;

        static CALLED: AtomicBool = AtomicBool::new(false);
        target.apply(
            &changeset,
            None::<fn(&str) -> bool>,
            |_conflict_type, _item| {
                CALLED.store(true, Ordering::Relaxed);
                ConflictAction::SQLITE_CHANGESET_OMIT
            },
        )?;

        // First application is conflict free.
        assert!(!CALLED.load(Ordering::Relaxed));
        let found = target.query_row(SELECT_FOO, ["bar"], |row| row.get::<_, i32>(0))?;
        assert_eq!(1, found);

        // conflict expected when same changeset applied again on the same db
        target.apply(
            &changeset,
            None::<fn(&str) -> bool>,
            |conflict_type, item| {
                CALLED.store(true, Ordering::Relaxed);
                assert_eq!(ConflictType::SQLITE_CHANGESET_CONFLICT, conflict_type);
                let existing = item.conflict(0).unwrap();
                assert_eq!(Ok("bar"), existing.as_str());
                ConflictAction::SQLITE_CHANGESET_OMIT
            },
        )?;
        assert!(CALLED.load(Ordering::Relaxed));
        Ok(())
    }

    #[test]
    fn test_changeset_apply_strm() -> Result<()> {
        let bytes = one_changeset_strm()?;

        let target = Connection::open_in_memory()?;
        target.execute_batch(CREATE_FOO)?;

        let mut reader = bytes.as_slice();
        target.apply_strm(
            &mut reader,
            None::<fn(&str) -> bool>,
            |_conflict_type, _item| ConflictAction::SQLITE_CHANGESET_OMIT,
        )?;

        let found = target.query_row(SELECT_FOO, ["bar"], |row| row.get::<_, i32>(0))?;
        assert_eq!(1, found);
        Ok(())
    }

    #[test]
    fn test_session_empty() -> Result<()> {
        let conn = Connection::open_in_memory()?;
        conn.execute_batch(CREATE_FOO)?;

        let mut recorder = Session::new(&conn)?;
        assert!(recorder.is_empty());

        recorder.attach(None)?;
        conn.execute(INSERT_FOO, ["bar"])?;

        assert!(!recorder.is_empty());
        Ok(())
    }

    #[test]
    fn test_session_set_enabled() -> Result<()> {
        let conn = Connection::open_in_memory()?;

        let mut recorder = Session::new(&conn)?;
        assert!(recorder.is_enabled());
        recorder.set_enabled(false);
        assert!(!recorder.is_enabled());
        Ok(())
    }

    #[test]
    fn test_session_set_indirect() -> Result<()> {
        let conn = Connection::open_in_memory()?;

        let mut recorder = Session::new(&conn)?;
        assert!(!recorder.is_indirect());
        recorder.set_indirect(true);
        assert!(recorder.is_indirect());
        Ok(())
    }
}
939