1 //! [Session Extension](https://sqlite.org/sessionintro.html)
2 #![allow(non_camel_case_types)]
3
4 use std::ffi::CStr;
5 use std::io::{Read, Write};
6 use std::marker::PhantomData;
7 use std::os::raw::{c_char, c_int, c_uchar, c_void};
8 use std::panic::{catch_unwind, RefUnwindSafe};
9 use std::ptr;
10 use std::slice::{from_raw_parts, from_raw_parts_mut};
11
12 use fallible_streaming_iterator::FallibleStreamingIterator;
13
14 use crate::error::{check, error_from_sqlite_code};
15 use crate::ffi;
16 use crate::hooks::Action;
17 use crate::types::ValueRef;
18 use crate::{errmsg_to_string, str_to_cstring, Connection, DatabaseName, Result};
19
20 // https://sqlite.org/session.html
21
22 /// An instance of this object is a session that can be
23 /// used to record changes to a database.
24 pub struct Session<'conn> {
25 phantom: PhantomData<&'conn Connection>,
26 s: *mut ffi::sqlite3_session,
27 filter: Option<Box<dyn Fn(&str) -> bool>>,
28 }
29
30 impl Session<'_> {
31 /// Create a new session object
32 #[inline]
new(db: &Connection) -> Result<Session<'_>>33 pub fn new(db: &Connection) -> Result<Session<'_>> {
34 Session::new_with_name(db, DatabaseName::Main)
35 }
36
37 /// Create a new session object
38 #[inline]
new_with_name<'conn>( db: &'conn Connection, name: DatabaseName<'_>, ) -> Result<Session<'conn>>39 pub fn new_with_name<'conn>(
40 db: &'conn Connection,
41 name: DatabaseName<'_>,
42 ) -> Result<Session<'conn>> {
43 let name = name.as_cstring()?;
44
45 let db = db.db.borrow_mut().db;
46
47 let mut s: *mut ffi::sqlite3_session = ptr::null_mut();
48 check(unsafe { ffi::sqlite3session_create(db, name.as_ptr(), &mut s) })?;
49
50 Ok(Session {
51 phantom: PhantomData,
52 s,
53 filter: None,
54 })
55 }
56
57 /// Set a table filter
table_filter<F>(&mut self, filter: Option<F>) where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,58 pub fn table_filter<F>(&mut self, filter: Option<F>)
59 where
60 F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
61 {
62 unsafe extern "C" fn call_boxed_closure<F>(
63 p_arg: *mut c_void,
64 tbl_str: *const c_char,
65 ) -> c_int
66 where
67 F: Fn(&str) -> bool + RefUnwindSafe,
68 {
69 use std::str;
70
71 let boxed_filter: *mut F = p_arg as *mut F;
72 let tbl_name = {
73 let c_slice = CStr::from_ptr(tbl_str).to_bytes();
74 str::from_utf8(c_slice)
75 };
76 if let Ok(true) =
77 catch_unwind(|| (*boxed_filter)(tbl_name.expect("non-utf8 table name")))
78 {
79 1
80 } else {
81 0
82 }
83 }
84
85 match filter {
86 Some(filter) => {
87 let boxed_filter = Box::new(filter);
88 unsafe {
89 ffi::sqlite3session_table_filter(
90 self.s,
91 Some(call_boxed_closure::<F>),
92 &*boxed_filter as *const F as *mut _,
93 );
94 }
95 self.filter = Some(boxed_filter);
96 }
97 _ => {
98 unsafe { ffi::sqlite3session_table_filter(self.s, None, ptr::null_mut()) }
99 self.filter = None;
100 }
101 };
102 }
103
104 /// Attach a table. `None` means all tables.
attach(&mut self, table: Option<&str>) -> Result<()>105 pub fn attach(&mut self, table: Option<&str>) -> Result<()> {
106 let table = if let Some(table) = table {
107 Some(str_to_cstring(table)?)
108 } else {
109 None
110 };
111 let table = table.as_ref().map(|s| s.as_ptr()).unwrap_or(ptr::null());
112 check(unsafe { ffi::sqlite3session_attach(self.s, table) })
113 }
114
115 /// Generate a Changeset
changeset(&mut self) -> Result<Changeset>116 pub fn changeset(&mut self) -> Result<Changeset> {
117 let mut n = 0;
118 let mut cs: *mut c_void = ptr::null_mut();
119 check(unsafe { ffi::sqlite3session_changeset(self.s, &mut n, &mut cs) })?;
120 Ok(Changeset { cs, n })
121 }
122
123 /// Write the set of changes represented by this session to `output`.
124 #[inline]
changeset_strm(&mut self, output: &mut dyn Write) -> Result<()>125 pub fn changeset_strm(&mut self, output: &mut dyn Write) -> Result<()> {
126 let output_ref = &output;
127 check(unsafe {
128 ffi::sqlite3session_changeset_strm(
129 self.s,
130 Some(x_output),
131 output_ref as *const &mut dyn Write as *mut c_void,
132 )
133 })
134 }
135
136 /// Generate a Patchset
137 #[inline]
patchset(&mut self) -> Result<Changeset>138 pub fn patchset(&mut self) -> Result<Changeset> {
139 let mut n = 0;
140 let mut ps: *mut c_void = ptr::null_mut();
141 check(unsafe { ffi::sqlite3session_patchset(self.s, &mut n, &mut ps) })?;
142 // TODO Validate: same struct
143 Ok(Changeset { cs: ps, n })
144 }
145
146 /// Write the set of patches represented by this session to `output`.
147 #[inline]
patchset_strm(&mut self, output: &mut dyn Write) -> Result<()>148 pub fn patchset_strm(&mut self, output: &mut dyn Write) -> Result<()> {
149 let output_ref = &output;
150 check(unsafe {
151 ffi::sqlite3session_patchset_strm(
152 self.s,
153 Some(x_output),
154 output_ref as *const &mut dyn Write as *mut c_void,
155 )
156 })
157 }
158
159 /// Load the difference between tables.
diff(&mut self, from: DatabaseName<'_>, table: &str) -> Result<()>160 pub fn diff(&mut self, from: DatabaseName<'_>, table: &str) -> Result<()> {
161 let from = from.as_cstring()?;
162 let table = str_to_cstring(table)?;
163 let table = table.as_ptr();
164 unsafe {
165 let mut errmsg = ptr::null_mut();
166 let r =
167 ffi::sqlite3session_diff(self.s, from.as_ptr(), table, &mut errmsg as *mut *mut _);
168 if r != ffi::SQLITE_OK {
169 let errmsg: *mut c_char = errmsg;
170 let message = errmsg_to_string(&*errmsg);
171 ffi::sqlite3_free(errmsg as *mut c_void);
172 return Err(error_from_sqlite_code(r, Some(message)));
173 }
174 }
175 Ok(())
176 }
177
178 /// Test if a changeset has recorded any changes
179 #[inline]
is_empty(&self) -> bool180 pub fn is_empty(&self) -> bool {
181 unsafe { ffi::sqlite3session_isempty(self.s) != 0 }
182 }
183
184 /// Query the current state of the session
185 #[inline]
is_enabled(&self) -> bool186 pub fn is_enabled(&self) -> bool {
187 unsafe { ffi::sqlite3session_enable(self.s, -1) != 0 }
188 }
189
190 /// Enable or disable the recording of changes
191 #[inline]
set_enabled(&mut self, enabled: bool)192 pub fn set_enabled(&mut self, enabled: bool) {
193 unsafe {
194 ffi::sqlite3session_enable(self.s, if enabled { 1 } else { 0 });
195 }
196 }
197
198 /// Query the current state of the indirect flag
199 #[inline]
is_indirect(&self) -> bool200 pub fn is_indirect(&self) -> bool {
201 unsafe { ffi::sqlite3session_indirect(self.s, -1) != 0 }
202 }
203
204 /// Set or clear the indirect change flag
205 #[inline]
set_indirect(&mut self, indirect: bool)206 pub fn set_indirect(&mut self, indirect: bool) {
207 unsafe {
208 ffi::sqlite3session_indirect(self.s, if indirect { 1 } else { 0 });
209 }
210 }
211 }
212
213 impl Drop for Session<'_> {
214 #[inline]
drop(&mut self)215 fn drop(&mut self) {
216 if self.filter.is_some() {
217 self.table_filter(None::<fn(&str) -> bool>);
218 }
219 unsafe { ffi::sqlite3session_delete(self.s) };
220 }
221 }
222
223 /// Invert a changeset
224 #[inline]
invert_strm(input: &mut dyn Read, output: &mut dyn Write) -> Result<()>225 pub fn invert_strm(input: &mut dyn Read, output: &mut dyn Write) -> Result<()> {
226 let input_ref = &input;
227 let output_ref = &output;
228 check(unsafe {
229 ffi::sqlite3changeset_invert_strm(
230 Some(x_input),
231 input_ref as *const &mut dyn Read as *mut c_void,
232 Some(x_output),
233 output_ref as *const &mut dyn Write as *mut c_void,
234 )
235 })
236 }
237
238 /// Combine two changesets
239 #[inline]
concat_strm( input_a: &mut dyn Read, input_b: &mut dyn Read, output: &mut dyn Write, ) -> Result<()>240 pub fn concat_strm(
241 input_a: &mut dyn Read,
242 input_b: &mut dyn Read,
243 output: &mut dyn Write,
244 ) -> Result<()> {
245 let input_a_ref = &input_a;
246 let input_b_ref = &input_b;
247 let output_ref = &output;
248 check(unsafe {
249 ffi::sqlite3changeset_concat_strm(
250 Some(x_input),
251 input_a_ref as *const &mut dyn Read as *mut c_void,
252 Some(x_input),
253 input_b_ref as *const &mut dyn Read as *mut c_void,
254 Some(x_output),
255 output_ref as *const &mut dyn Write as *mut c_void,
256 )
257 })
258 }
259
260 /// Changeset or Patchset
261 pub struct Changeset {
262 cs: *mut c_void,
263 n: c_int,
264 }
265
266 impl Changeset {
267 /// Invert a changeset
268 #[inline]
invert(&self) -> Result<Changeset>269 pub fn invert(&self) -> Result<Changeset> {
270 let mut n = 0;
271 let mut cs = ptr::null_mut();
272 check(unsafe {
273 ffi::sqlite3changeset_invert(self.n, self.cs, &mut n, &mut cs as *mut *mut _)
274 })?;
275 Ok(Changeset { cs, n })
276 }
277
278 /// Create an iterator to traverse a changeset
279 #[inline]
iter(&self) -> Result<ChangesetIter<'_>>280 pub fn iter(&self) -> Result<ChangesetIter<'_>> {
281 let mut it = ptr::null_mut();
282 check(unsafe { ffi::sqlite3changeset_start(&mut it as *mut *mut _, self.n, self.cs) })?;
283 Ok(ChangesetIter {
284 phantom: PhantomData,
285 it,
286 item: None,
287 })
288 }
289
290 /// Concatenate two changeset objects
291 #[inline]
concat(a: &Changeset, b: &Changeset) -> Result<Changeset>292 pub fn concat(a: &Changeset, b: &Changeset) -> Result<Changeset> {
293 let mut n = 0;
294 let mut cs = ptr::null_mut();
295 check(unsafe {
296 ffi::sqlite3changeset_concat(a.n, a.cs, b.n, b.cs, &mut n, &mut cs as *mut *mut _)
297 })?;
298 Ok(Changeset { cs, n })
299 }
300 }
301
302 impl Drop for Changeset {
303 #[inline]
drop(&mut self)304 fn drop(&mut self) {
305 unsafe {
306 ffi::sqlite3_free(self.cs);
307 }
308 }
309 }
310
311 /// Cursor for iterating over the elements of a changeset
312 /// or patchset.
313 pub struct ChangesetIter<'changeset> {
314 phantom: PhantomData<&'changeset Changeset>,
315 it: *mut ffi::sqlite3_changeset_iter,
316 item: Option<ChangesetItem>,
317 }
318
319 impl ChangesetIter<'_> {
320 /// Create an iterator on `input`
321 #[inline]
start_strm<'input>(input: &&'input mut dyn Read) -> Result<ChangesetIter<'input>>322 pub fn start_strm<'input>(input: &&'input mut dyn Read) -> Result<ChangesetIter<'input>> {
323 let mut it = ptr::null_mut();
324 check(unsafe {
325 ffi::sqlite3changeset_start_strm(
326 &mut it as *mut *mut _,
327 Some(x_input),
328 input as *const &mut dyn Read as *mut c_void,
329 )
330 })?;
331 Ok(ChangesetIter {
332 phantom: PhantomData,
333 it,
334 item: None,
335 })
336 }
337 }
338
339 impl FallibleStreamingIterator for ChangesetIter<'_> {
340 type Error = crate::error::Error;
341 type Item = ChangesetItem;
342
343 #[inline]
advance(&mut self) -> Result<()>344 fn advance(&mut self) -> Result<()> {
345 let rc = unsafe { ffi::sqlite3changeset_next(self.it) };
346 match rc {
347 ffi::SQLITE_ROW => {
348 self.item = Some(ChangesetItem { it: self.it });
349 Ok(())
350 }
351 ffi::SQLITE_DONE => {
352 self.item = None;
353 Ok(())
354 }
355 code => Err(error_from_sqlite_code(code, None)),
356 }
357 }
358
359 #[inline]
get(&self) -> Option<&ChangesetItem>360 fn get(&self) -> Option<&ChangesetItem> {
361 self.item.as_ref()
362 }
363 }
364
365 /// Operation
366 pub struct Operation<'item> {
367 table_name: &'item str,
368 number_of_columns: i32,
369 code: Action,
370 indirect: bool,
371 }
372
373 impl Operation<'_> {
374 /// Returns the table name.
375 #[inline]
table_name(&self) -> &str376 pub fn table_name(&self) -> &str {
377 self.table_name
378 }
379
380 /// Returns the number of columns in table
381 #[inline]
number_of_columns(&self) -> i32382 pub fn number_of_columns(&self) -> i32 {
383 self.number_of_columns
384 }
385
386 /// Returns the action code.
387 #[inline]
code(&self) -> Action388 pub fn code(&self) -> Action {
389 self.code
390 }
391
392 /// Returns `true` for an 'indirect' change.
393 #[inline]
indirect(&self) -> bool394 pub fn indirect(&self) -> bool {
395 self.indirect
396 }
397 }
398
399 impl Drop for ChangesetIter<'_> {
400 #[inline]
drop(&mut self)401 fn drop(&mut self) {
402 unsafe {
403 ffi::sqlite3changeset_finalize(self.it);
404 }
405 }
406 }
407
408 /// An item passed to a conflict-handler by
409 /// [`Connection::apply`](crate::Connection::apply), or an item generated by
410 /// [`ChangesetIter::next`](ChangesetIter::next).
411 // TODO enum ? Delete, Insert, Update, ...
412 pub struct ChangesetItem {
413 it: *mut ffi::sqlite3_changeset_iter,
414 }
415
416 impl ChangesetItem {
417 /// Obtain conflicting row values
418 ///
419 /// May only be called with an `SQLITE_CHANGESET_DATA` or
420 /// `SQLITE_CHANGESET_CONFLICT` conflict handler callback.
421 #[inline]
conflict(&self, col: usize) -> Result<ValueRef<'_>>422 pub fn conflict(&self, col: usize) -> Result<ValueRef<'_>> {
423 unsafe {
424 let mut p_value: *mut ffi::sqlite3_value = ptr::null_mut();
425 check(ffi::sqlite3changeset_conflict(
426 self.it,
427 col as i32,
428 &mut p_value,
429 ))?;
430 Ok(ValueRef::from_value(p_value))
431 }
432 }
433
434 /// Determine the number of foreign key constraint violations
435 ///
436 /// May only be called with an `SQLITE_CHANGESET_FOREIGN_KEY` conflict
437 /// handler callback.
438 #[inline]
fk_conflicts(&self) -> Result<i32>439 pub fn fk_conflicts(&self) -> Result<i32> {
440 unsafe {
441 let mut p_out = 0;
442 check(ffi::sqlite3changeset_fk_conflicts(self.it, &mut p_out))?;
443 Ok(p_out)
444 }
445 }
446
447 /// Obtain new.* Values
448 ///
449 /// May only be called if the type of change is either `SQLITE_UPDATE` or
450 /// `SQLITE_INSERT`.
451 #[inline]
new_value(&self, col: usize) -> Result<ValueRef<'_>>452 pub fn new_value(&self, col: usize) -> Result<ValueRef<'_>> {
453 unsafe {
454 let mut p_value: *mut ffi::sqlite3_value = ptr::null_mut();
455 check(ffi::sqlite3changeset_new(self.it, col as i32, &mut p_value))?;
456 Ok(ValueRef::from_value(p_value))
457 }
458 }
459
460 /// Obtain old.* Values
461 ///
462 /// May only be called if the type of change is either `SQLITE_DELETE` or
463 /// `SQLITE_UPDATE`.
464 #[inline]
old_value(&self, col: usize) -> Result<ValueRef<'_>>465 pub fn old_value(&self, col: usize) -> Result<ValueRef<'_>> {
466 unsafe {
467 let mut p_value: *mut ffi::sqlite3_value = ptr::null_mut();
468 check(ffi::sqlite3changeset_old(self.it, col as i32, &mut p_value))?;
469 Ok(ValueRef::from_value(p_value))
470 }
471 }
472
473 /// Obtain the current operation
474 #[inline]
op(&self) -> Result<Operation<'_>>475 pub fn op(&self) -> Result<Operation<'_>> {
476 let mut number_of_columns = 0;
477 let mut code = 0;
478 let mut indirect = 0;
479 let tab = unsafe {
480 let mut pz_tab: *const c_char = ptr::null();
481 check(ffi::sqlite3changeset_op(
482 self.it,
483 &mut pz_tab,
484 &mut number_of_columns,
485 &mut code,
486 &mut indirect,
487 ))?;
488 CStr::from_ptr(pz_tab)
489 };
490 let table_name = tab.to_str()?;
491 Ok(Operation {
492 table_name,
493 number_of_columns,
494 code: Action::from(code),
495 indirect: indirect != 0,
496 })
497 }
498
499 /// Obtain the primary key definition of a table
500 #[inline]
pk(&self) -> Result<&[u8]>501 pub fn pk(&self) -> Result<&[u8]> {
502 let mut number_of_columns = 0;
503 unsafe {
504 let mut pks: *mut c_uchar = ptr::null_mut();
505 check(ffi::sqlite3changeset_pk(
506 self.it,
507 &mut pks,
508 &mut number_of_columns,
509 ))?;
510 Ok(from_raw_parts(pks, number_of_columns as usize))
511 }
512 }
513 }
514
515 /// Used to combine two or more changesets or
516 /// patchsets
517 pub struct Changegroup {
518 cg: *mut ffi::sqlite3_changegroup,
519 }
520
521 impl Changegroup {
522 /// Create a new change group.
523 #[inline]
new() -> Result<Self>524 pub fn new() -> Result<Self> {
525 let mut cg = ptr::null_mut();
526 check(unsafe { ffi::sqlite3changegroup_new(&mut cg) })?;
527 Ok(Changegroup { cg })
528 }
529
530 /// Add a changeset
531 #[inline]
add(&mut self, cs: &Changeset) -> Result<()>532 pub fn add(&mut self, cs: &Changeset) -> Result<()> {
533 check(unsafe { ffi::sqlite3changegroup_add(self.cg, cs.n, cs.cs) })
534 }
535
536 /// Add a changeset read from `input` to this change group.
537 #[inline]
add_stream(&mut self, input: &mut dyn Read) -> Result<()>538 pub fn add_stream(&mut self, input: &mut dyn Read) -> Result<()> {
539 let input_ref = &input;
540 check(unsafe {
541 ffi::sqlite3changegroup_add_strm(
542 self.cg,
543 Some(x_input),
544 input_ref as *const &mut dyn Read as *mut c_void,
545 )
546 })
547 }
548
549 /// Obtain a composite Changeset
550 #[inline]
output(&mut self) -> Result<Changeset>551 pub fn output(&mut self) -> Result<Changeset> {
552 let mut n = 0;
553 let mut output: *mut c_void = ptr::null_mut();
554 check(unsafe { ffi::sqlite3changegroup_output(self.cg, &mut n, &mut output) })?;
555 Ok(Changeset { cs: output, n })
556 }
557
558 /// Write the combined set of changes to `output`.
559 #[inline]
output_strm(&mut self, output: &mut dyn Write) -> Result<()>560 pub fn output_strm(&mut self, output: &mut dyn Write) -> Result<()> {
561 let output_ref = &output;
562 check(unsafe {
563 ffi::sqlite3changegroup_output_strm(
564 self.cg,
565 Some(x_output),
566 output_ref as *const &mut dyn Write as *mut c_void,
567 )
568 })
569 }
570 }
571
572 impl Drop for Changegroup {
573 #[inline]
drop(&mut self)574 fn drop(&mut self) {
575 unsafe {
576 ffi::sqlite3changegroup_delete(self.cg);
577 }
578 }
579 }
580
581 impl Connection {
582 /// Apply a changeset to a database
apply<F, C>(&self, cs: &Changeset, filter: Option<F>, conflict: C) -> Result<()> where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,583 pub fn apply<F, C>(&self, cs: &Changeset, filter: Option<F>, conflict: C) -> Result<()>
584 where
585 F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
586 C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
587 {
588 let db = self.db.borrow_mut().db;
589
590 let filtered = filter.is_some();
591 let tuple = &mut (filter, conflict);
592 check(unsafe {
593 if filtered {
594 ffi::sqlite3changeset_apply(
595 db,
596 cs.n,
597 cs.cs,
598 Some(call_filter::<F, C>),
599 Some(call_conflict::<F, C>),
600 tuple as *mut (Option<F>, C) as *mut c_void,
601 )
602 } else {
603 ffi::sqlite3changeset_apply(
604 db,
605 cs.n,
606 cs.cs,
607 None,
608 Some(call_conflict::<F, C>),
609 tuple as *mut (Option<F>, C) as *mut c_void,
610 )
611 }
612 })
613 }
614
615 /// Apply a changeset to a database
apply_strm<F, C>( &self, input: &mut dyn Read, filter: Option<F>, conflict: C, ) -> Result<()> where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,616 pub fn apply_strm<F, C>(
617 &self,
618 input: &mut dyn Read,
619 filter: Option<F>,
620 conflict: C,
621 ) -> Result<()>
622 where
623 F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
624 C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
625 {
626 let input_ref = &input;
627 let db = self.db.borrow_mut().db;
628
629 let filtered = filter.is_some();
630 let tuple = &mut (filter, conflict);
631 check(unsafe {
632 if filtered {
633 ffi::sqlite3changeset_apply_strm(
634 db,
635 Some(x_input),
636 input_ref as *const &mut dyn Read as *mut c_void,
637 Some(call_filter::<F, C>),
638 Some(call_conflict::<F, C>),
639 tuple as *mut (Option<F>, C) as *mut c_void,
640 )
641 } else {
642 ffi::sqlite3changeset_apply_strm(
643 db,
644 Some(x_input),
645 input_ref as *const &mut dyn Read as *mut c_void,
646 None,
647 Some(call_conflict::<F, C>),
648 tuple as *mut (Option<F>, C) as *mut c_void,
649 )
650 }
651 })
652 }
653 }
654
655 /// Constants passed to the conflict handler
656 /// See [here](https://sqlite.org/session.html#SQLITE_CHANGESET_CONFLICT) for details.
657 #[allow(missing_docs)]
658 #[repr(i32)]
659 #[derive(Debug, PartialEq)]
660 #[non_exhaustive]
661 #[allow(clippy::upper_case_acronyms)]
662 pub enum ConflictType {
663 UNKNOWN = -1,
664 SQLITE_CHANGESET_DATA = ffi::SQLITE_CHANGESET_DATA,
665 SQLITE_CHANGESET_NOTFOUND = ffi::SQLITE_CHANGESET_NOTFOUND,
666 SQLITE_CHANGESET_CONFLICT = ffi::SQLITE_CHANGESET_CONFLICT,
667 SQLITE_CHANGESET_CONSTRAINT = ffi::SQLITE_CHANGESET_CONSTRAINT,
668 SQLITE_CHANGESET_FOREIGN_KEY = ffi::SQLITE_CHANGESET_FOREIGN_KEY,
669 }
670 impl From<i32> for ConflictType {
from(code: i32) -> ConflictType671 fn from(code: i32) -> ConflictType {
672 match code {
673 ffi::SQLITE_CHANGESET_DATA => ConflictType::SQLITE_CHANGESET_DATA,
674 ffi::SQLITE_CHANGESET_NOTFOUND => ConflictType::SQLITE_CHANGESET_NOTFOUND,
675 ffi::SQLITE_CHANGESET_CONFLICT => ConflictType::SQLITE_CHANGESET_CONFLICT,
676 ffi::SQLITE_CHANGESET_CONSTRAINT => ConflictType::SQLITE_CHANGESET_CONSTRAINT,
677 ffi::SQLITE_CHANGESET_FOREIGN_KEY => ConflictType::SQLITE_CHANGESET_FOREIGN_KEY,
678 _ => ConflictType::UNKNOWN,
679 }
680 }
681 }
682
683 /// Constants returned by the conflict handler
684 /// See [here](https://sqlite.org/session.html#SQLITE_CHANGESET_ABORT) for details.
685 #[allow(missing_docs)]
686 #[repr(i32)]
687 #[derive(Debug, PartialEq)]
688 #[non_exhaustive]
689 #[allow(clippy::upper_case_acronyms)]
690 pub enum ConflictAction {
691 SQLITE_CHANGESET_OMIT = ffi::SQLITE_CHANGESET_OMIT,
692 SQLITE_CHANGESET_REPLACE = ffi::SQLITE_CHANGESET_REPLACE,
693 SQLITE_CHANGESET_ABORT = ffi::SQLITE_CHANGESET_ABORT,
694 }
695
call_filter<F, C>(p_ctx: *mut c_void, tbl_str: *const c_char) -> c_int where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,696 unsafe extern "C" fn call_filter<F, C>(p_ctx: *mut c_void, tbl_str: *const c_char) -> c_int
697 where
698 F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
699 C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
700 {
701 use std::str;
702
703 let tuple: *mut (Option<F>, C) = p_ctx as *mut (Option<F>, C);
704 let tbl_name = {
705 let c_slice = CStr::from_ptr(tbl_str).to_bytes();
706 str::from_utf8(c_slice)
707 };
708 match *tuple {
709 (Some(ref filter), _) => {
710 if let Ok(true) = catch_unwind(|| filter(tbl_name.expect("illegal table name"))) {
711 1
712 } else {
713 0
714 }
715 }
716 _ => unimplemented!(),
717 }
718 }
719
call_conflict<F, C>( p_ctx: *mut c_void, e_conflict: c_int, p: *mut ffi::sqlite3_changeset_iter, ) -> c_int where F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,720 unsafe extern "C" fn call_conflict<F, C>(
721 p_ctx: *mut c_void,
722 e_conflict: c_int,
723 p: *mut ffi::sqlite3_changeset_iter,
724 ) -> c_int
725 where
726 F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
727 C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
728 {
729 let tuple: *mut (Option<F>, C) = p_ctx as *mut (Option<F>, C);
730 let conflict_type = ConflictType::from(e_conflict);
731 let item = ChangesetItem { it: p };
732 if let Ok(action) = catch_unwind(|| (*tuple).1(conflict_type, item)) {
733 action as c_int
734 } else {
735 ffi::SQLITE_CHANGESET_ABORT
736 }
737 }
738
x_input(p_in: *mut c_void, data: *mut c_void, len: *mut c_int) -> c_int739 unsafe extern "C" fn x_input(p_in: *mut c_void, data: *mut c_void, len: *mut c_int) -> c_int {
740 if p_in.is_null() {
741 return ffi::SQLITE_MISUSE;
742 }
743 let bytes: &mut [u8] = from_raw_parts_mut(data as *mut u8, *len as usize);
744 let input = p_in as *mut &mut dyn Read;
745 match (*input).read(bytes) {
746 Ok(n) => {
747 *len = n as i32; // TODO Validate: n = 0 may not mean the reader will always no longer be able to
748 // produce bytes.
749 ffi::SQLITE_OK
750 }
751 Err(_) => ffi::SQLITE_IOERR_READ, // TODO check if err is a (ru)sqlite Error => propagate
752 }
753 }
754
x_output(p_out: *mut c_void, data: *const c_void, len: c_int) -> c_int755 unsafe extern "C" fn x_output(p_out: *mut c_void, data: *const c_void, len: c_int) -> c_int {
756 if p_out.is_null() {
757 return ffi::SQLITE_MISUSE;
758 }
759 // The sessions module never invokes an xOutput callback with the third
760 // parameter set to a value less than or equal to zero.
761 let bytes: &[u8] = from_raw_parts(data as *const u8, len as usize);
762 let output = p_out as *mut &mut dyn Write;
763 match (*output).write_all(bytes) {
764 Ok(_) => ffi::SQLITE_OK,
765 Err(_) => ffi::SQLITE_IOERR_WRITE, // TODO check if err is a (ru)sqlite Error => propagate
766 }
767 }
768
769 #[cfg(test)]
770 mod test {
771 use fallible_streaming_iterator::FallibleStreamingIterator;
772 use std::io::Read;
773 use std::sync::atomic::{AtomicBool, Ordering};
774
775 use super::{Changeset, ChangesetIter, ConflictAction, ConflictType, Session};
776 use crate::hooks::Action;
777 use crate::{Connection, Result};
778
one_changeset() -> Result<Changeset>779 fn one_changeset() -> Result<Changeset> {
780 let db = Connection::open_in_memory()?;
781 db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")?;
782
783 let mut session = Session::new(&db)?;
784 assert!(session.is_empty());
785
786 session.attach(None)?;
787 db.execute("INSERT INTO foo (t) VALUES (?);", ["bar"])?;
788
789 session.changeset()
790 }
791
one_changeset_strm() -> Result<Vec<u8>>792 fn one_changeset_strm() -> Result<Vec<u8>> {
793 let db = Connection::open_in_memory()?;
794 db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")?;
795
796 let mut session = Session::new(&db)?;
797 assert!(session.is_empty());
798
799 session.attach(None)?;
800 db.execute("INSERT INTO foo (t) VALUES (?);", ["bar"])?;
801
802 let mut output = Vec::new();
803 session.changeset_strm(&mut output)?;
804 Ok(output)
805 }
806
807 #[test]
test_changeset() -> Result<()>808 fn test_changeset() -> Result<()> {
809 let changeset = one_changeset()?;
810 let mut iter = changeset.iter()?;
811 let item = iter.next()?;
812 assert!(item.is_some());
813
814 let item = item.unwrap();
815 let op = item.op()?;
816 assert_eq!("foo", op.table_name());
817 assert_eq!(1, op.number_of_columns());
818 assert_eq!(Action::SQLITE_INSERT, op.code());
819 assert!(!op.indirect());
820
821 let pk = item.pk()?;
822 assert_eq!(&[1], pk);
823
824 let new_value = item.new_value(0)?;
825 assert_eq!(Ok("bar"), new_value.as_str());
826 Ok(())
827 }
828
829 #[test]
test_changeset_strm() -> Result<()>830 fn test_changeset_strm() -> Result<()> {
831 let output = one_changeset_strm()?;
832 assert!(!output.is_empty());
833 assert_eq!(14, output.len());
834
835 let input: &mut dyn Read = &mut output.as_slice();
836 let mut iter = ChangesetIter::start_strm(&input)?;
837 let item = iter.next()?;
838 assert!(item.is_some());
839 Ok(())
840 }
841
842 #[test]
test_changeset_apply() -> Result<()>843 fn test_changeset_apply() -> Result<()> {
844 let changeset = one_changeset()?;
845
846 let db = Connection::open_in_memory()?;
847 db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")?;
848
849 static CALLED: AtomicBool = AtomicBool::new(false);
850 db.apply(
851 &changeset,
852 None::<fn(&str) -> bool>,
853 |_conflict_type, _item| {
854 CALLED.store(true, Ordering::Relaxed);
855 ConflictAction::SQLITE_CHANGESET_OMIT
856 },
857 )?;
858
859 assert!(!CALLED.load(Ordering::Relaxed));
860 let check = db.query_row("SELECT 1 FROM foo WHERE t = ?", ["bar"], |row| {
861 row.get::<_, i32>(0)
862 })?;
863 assert_eq!(1, check);
864
865 // conflict expected when same changeset applied again on the same db
866 db.apply(
867 &changeset,
868 None::<fn(&str) -> bool>,
869 |conflict_type, item| {
870 CALLED.store(true, Ordering::Relaxed);
871 assert_eq!(ConflictType::SQLITE_CHANGESET_CONFLICT, conflict_type);
872 let conflict = item.conflict(0).unwrap();
873 assert_eq!(Ok("bar"), conflict.as_str());
874 ConflictAction::SQLITE_CHANGESET_OMIT
875 },
876 )?;
877 assert!(CALLED.load(Ordering::Relaxed));
878 Ok(())
879 }
880
881 #[test]
test_changeset_apply_strm() -> Result<()>882 fn test_changeset_apply_strm() -> Result<()> {
883 let output = one_changeset_strm()?;
884
885 let db = Connection::open_in_memory()?;
886 db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")?;
887
888 let mut input = output.as_slice();
889 db.apply_strm(
890 &mut input,
891 None::<fn(&str) -> bool>,
892 |_conflict_type, _item| ConflictAction::SQLITE_CHANGESET_OMIT,
893 )?;
894
895 let check = db.query_row("SELECT 1 FROM foo WHERE t = ?", ["bar"], |row| {
896 row.get::<_, i32>(0)
897 })?;
898 assert_eq!(1, check);
899 Ok(())
900 }
901
902 #[test]
test_session_empty() -> Result<()>903 fn test_session_empty() -> Result<()> {
904 let db = Connection::open_in_memory()?;
905 db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")?;
906
907 let mut session = Session::new(&db)?;
908 assert!(session.is_empty());
909
910 session.attach(None)?;
911 db.execute("INSERT INTO foo (t) VALUES (?);", ["bar"])?;
912
913 assert!(!session.is_empty());
914 Ok(())
915 }
916
917 #[test]
test_session_set_enabled() -> Result<()>918 fn test_session_set_enabled() -> Result<()> {
919 let db = Connection::open_in_memory()?;
920
921 let mut session = Session::new(&db)?;
922 assert!(session.is_enabled());
923 session.set_enabled(false);
924 assert!(!session.is_enabled());
925 Ok(())
926 }
927
928 #[test]
test_session_set_indirect() -> Result<()>929 fn test_session_set_indirect() -> Result<()> {
930 let db = Connection::open_in_memory()?;
931
932 let mut session = Session::new(&db)?;
933 assert!(!session.is_indirect());
934 session.set_indirect(true);
935 assert!(session.is_indirect());
936 Ok(())
937 }
938 }
939