diff --git a/CHANGELOG.md b/CHANGELOG.md index c856351..693b620 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,11 @@ -## 0.0.4 (unreleased) +## 0.0.4 - Update PowerSync core extension to version 0.4.11. +- Improvements for raw tables: + - The `put` and `delete` statements are optional now. + - The `RawTableSchema` struct represents a raw table in the local database, and can be used + to create triggers forwarding writes to the CRUD upload queue and to infer statements used + to sync data into raw tables. ## 0.0.3 diff --git a/powersync/src/db/schema.rs b/powersync/src/db/schema.rs index e5b09d3..3da3bac 100644 --- a/powersync/src/db/schema.rs +++ b/powersync/src/db/schema.rs @@ -1,7 +1,7 @@ use std::borrow::Cow; use std::collections::HashSet; -use serde::{Serialize, ser::SerializeStruct}; +use serde::{Serialize, Serializer, ser::SerializeStruct}; use crate::error::PowerSyncError; @@ -29,6 +29,10 @@ impl Schema { table.validate()?; } + for table in &self.raw_tables { + table.validate()?; + } + Ok(()) } @@ -52,30 +56,19 @@ impl Schema { /// /// When this is part of a schema, the PowerSync SDK will create and auto-migrate the table. /// If you need direct control on a table, use [RawTable] instead. -#[derive(Debug)] +#[derive(Debug, Serialize)] pub struct Table { /// The synced table name, matching sync rules. pub name: SchemaString, // Override the name for the view. + #[serde(rename = "view_name")] pub view_name_override: Option, /// List of columns. pub columns: Vec, /// List of indexes. pub indexes: Vec, - /// Whether this is a local-only table. - pub local_only: bool, - /// Whether this is an insert-only table. - pub insert_only: bool, - /// Whether to add a hidden `_metadata` column that will be enabled for updates to attach custom - /// information about writes that will be reported through crud entries. - pub track_metadata: bool, - /// When set, track old values of columns for CRUD entries. - /// - /// See [TrackPreviousValues] for details. - pub track_previous_values: Option, - /// Whether an `UPDATE` statement that doesn't change any values should be ignored when creating - /// CRUD entries. - pub ignore_empty_updates: bool, + #[serde(flatten)] + pub options: TableOptions, } impl Table { @@ -92,11 +85,7 @@ impl Table { view_name_override: None, columns, indexes: vec![], - local_only: false, - insert_only: false, - track_metadata: false, - track_previous_values: None, - ignore_empty_updates: false, + options: TableOptions::default(), }; build(&mut table); table @@ -115,17 +104,7 @@ impl Table { Schema::validate_name(view_name_override, "table view")?; } - if self.local_only && self.track_metadata { - return Err(PowerSyncError::argument_error( - "Can't track metadata for local-only tables", - )); - } - - if self.local_only && self.track_previous_values.is_some() { - return Err(PowerSyncError::argument_error( - "Can't track old values for local-only tables", - )); - } + self.options.validate()?; let mut column_names = HashSet::new(); column_names.insert("id"); @@ -172,18 +151,59 @@ impl Table { const MAX_AMOUNT_OF_COLUMNS: usize = 1999; } -impl Serialize for Table { +/// Options that apply to both view-based JSON tables and raw tables. +#[derive(Debug, Default)] +pub struct TableOptions { + /// Whether this is a local-only table. + pub local_only: bool, + /// Whether this is an insert-only table. + pub insert_only: bool, + /// Whether to add a hidden `_metadata` column that will be enabled for updates to attach custom + /// information about writes that will be reported through crud entries. + pub track_metadata: bool, + /// When set, track old values of columns for CRUD entries. + /// + /// See [TrackPreviousValues] for details. + pub track_previous_values: Option, + /// Whether an `UPDATE` statement that doesn't change any values should be ignored when creating + /// CRUD entries. + pub ignore_empty_updates: bool, +} + +impl TableOptions { + fn validate(&self) -> Result<(), PowerSyncError> { + if self.local_only && self.track_metadata { + return Err(PowerSyncError::argument_error( + "Can't track metadata for local-only tables", + )); + } + + if self.local_only && self.track_previous_values.is_some() { + return Err(PowerSyncError::argument_error( + "Can't track old values for local-only tables", + )); + } + + Ok(()) + } +} + +impl Serialize for TableOptions { fn serialize(&self, serializer: S) -> Result where - S: serde::Serializer, + S: Serializer, { - let mut serializer = serializer.serialize_struct("Table", 10)?; - serializer.serialize_field("name", &self.name)?; - serializer.serialize_field("columns", &self.columns)?; - serializer.serialize_field("indexes", &self.indexes)?; + let mut serializer = serializer.serialize_struct( + "TableOptions", + 4 + if self.track_previous_values.is_some() { + 2 + } else { + 0 + }, + )?; + serializer.serialize_field("local_only", &self.local_only)?; serializer.serialize_field("insert_only", &self.insert_only)?; - serializer.serialize_field("view_name", &self.view_name_override)?; serializer.serialize_field("ignore_empty_update", &self.ignore_empty_updates)?; serializer.serialize_field("include_metadata", &self.track_metadata)?; @@ -199,8 +219,8 @@ impl Serialize for Table { &include_old.only_when_changed, )?; } else { - serializer.serialize_field("include_old_include_oldonly_when_changed", &false)?; - serializer.serialize_field("include_old_only_when_changed", &false)?; + serializer.skip_field("include_old")?; + serializer.skip_field("include_old_only_when_changed")?; } serializer.end() @@ -261,13 +281,144 @@ pub struct IndexedColumn { pub type_name: SchemaString, } +/// A raw table, defined by the user instead of being managed by PowerSync. +/// +/// Any ordinary SQLite table can be defined as a raw table, which enables: +/// +/// - More performant queries, since data is stored in typed rows instead of the schemaless JSON +/// view PowerSync uses by default. +/// - More control over the table, since custom column constraints can be used in its definition. +/// +/// By default, the PowerSync client will infer the schema of raw tables and use that to generate +/// `UPSERT` and `DELETE` statements to forward writes from the backend database to SQLite. This +/// requires [Self::schema] to be set. +/// These statements can be customized by providing [Self::put] and [Self::delete] statements. +/// +/// When using raw tables, you are responsible for creating and migrating them when they've changed. +/// Further, triggers are necessary to collect local writes to those tables. For more information, +/// see [the documentation](https://docs.powersync.com/client-sdks/advanced/raw-tables). #[derive(Serialize, Debug)] pub struct RawTable { + /// The name of the table as used by the sync service. + /// + /// This doesn't necessarily have to match the name of the SQLite table that [put] and [delete] + /// write to. Instead, it's used by the sync client to identify which statements to use when it + /// encounters sync operations for this table. pub name: SchemaString, - pub put: PendingStatement, - pub delete: PendingStatement, + + /// An optional schema containing the name of the raw table in the local schema. + /// + /// If this is set, [Self::put] and [Self::delete] can be omitted because these statements can + /// be inferred from the schema. + #[serde(flatten)] + pub schema: Option, + + /// A statement responsible for inserting or updating a row in this raw table based on data from + /// the sync service. + /// + /// By default, the client generates an `INSERT` statement with an upsert clause for all columns + /// in the table. + /// + /// See [PendingStatement] for details. + pub put: Option, + + /// A statement responsible for deleting a row based on its PowerSync id. + /// + /// By default, the client generates the statement `DELETE FROM $local_table WHERE id = ?`. + /// + /// See [PendingStatement] for details. Note that [PendingStatementValue]s used here must all be + /// [PendingStatementValue::Id]. + pub delete: Option, + + /// An optional statement to run when the `powersync_clear` SQL function is called. + pub clear: Option, } +impl RawTable { + /// Creates a [RawTable] where statements used to sync rows into the table are inferred from + /// the columns of the table. + pub fn with_schema(name: impl Into, schema: RawTableSchema) -> Self { + Self { + name: name.into(), + schema: Some(schema), + put: None, + delete: None, + clear: None, + } + } + + /// Creates a [RawTable] with explicit put and delete statements to use. + pub fn with_statements( + name: impl Into, + put: PendingStatement, + delete: PendingStatement, + ) -> Self { + Self { + name: name.into(), + schema: None, + put: Some(put), + delete: Some(delete), + clear: None, + } + } + + fn validate(&self) -> Result<(), PowerSyncError> { + if let Some(schema) = &self.schema { + schema.options.validate()?; + } else { + // If we don't have a schema, statements need to be given. + if self.put.is_none() || self.delete.is_none() { + return Err(PowerSyncError::argument_error( + "Raw tables without a schema need to provide put and delete statements.", + )); + } + } + + Ok(()) + } +} + +/// Information about the schema of a [RawTable] in the local database. +/// +/// This information is optional when declaring raw tables. However, providing it allows the sync +/// client to infer [RawTable::put] and [RawTable::delete] statements automatically. +#[derive(Serialize, Debug)] +pub struct RawTableSchema { + /// The actual name of the raw table in the local schema. + /// + /// This is used to infer statements for the sync client. It can also be used to auto-generate + /// triggers forwarding writes on raw tables into the CRUD upload queue. + pub table_name: SchemaString, + /// An optional filter of columns that should be synced. + /// + /// By default, all columns in a raw table are considered to be synced. If a filter is + /// specified, PowerSync treats unmatched columns as _local-only_ and will not attempt to sync + /// them. + pub synced_columns: Option>, + + /// Common options affecting how the `powersync_create_raw_table_crud_trigger` SQL function + /// generates triggers. + #[serde(flatten)] + pub options: TableOptions, +} + +impl RawTableSchema { + pub fn new(table_name: impl Into) -> Self { + Self { + table_name: table_name.into(), + synced_columns: None, + options: Default::default(), + } + } +} + +/// An SQL statement to be run by the sync client against raw tables. +/// +/// Since raw tables are managed by the user, PowerSync can't know how to apply serverside changes +/// to them. These statements bridge raw tables and PowerSync by providing upserts and delete +/// statements. +/// +/// For more information, see [the documentation](https://docs.powersync.com/client-sdks/advanced/raw-tables). #[derive(Serialize, Debug)] pub struct PendingStatement { pub sql: SchemaString, @@ -275,10 +426,20 @@ pub struct PendingStatement { pub params: Vec, } +/// A description of a value that will be resolved in the sync client when running a +/// [PendingStatement] for a [RawTable]. #[derive(Serialize, Debug)] pub enum PendingStatementValue { + /// A value that is bound to the textual id used in the PowerSync protocol. Id, + + /// A value that is bound to the value of a column in a replace (`PUT`) + /// operation of the PowerSync protocol. Column(SchemaString), + + /// A value that is bound to a JSON object containing all columns from the synced row that + /// haven't been matched by a [Self::Column] value. + Rest, } /// Options to include old values in CRUD entries for update statements. @@ -301,12 +462,13 @@ impl TrackPreviousValues { #[cfg(test)] mod test { - use crate::schema::{Column, Table, TrackPreviousValues}; + use crate::schema::{Column, RawTable, RawTableSchema, Table, TrackPreviousValues}; + use serde_json::json; #[test] fn handles_options_track_metadata() { let value = serde_json::to_value(Table::create("foo", vec![], |tbl| { - tbl.track_metadata = true + tbl.options.track_metadata = true })) .unwrap(); @@ -324,7 +486,7 @@ mod test { #[test] fn handles_options_ignore_empty_updates() { let value = serde_json::to_value(Table::create("foo", vec![], |tbl| { - tbl.ignore_empty_updates = true + tbl.options.ignore_empty_updates = true })) .unwrap(); @@ -342,7 +504,7 @@ mod test { #[test] fn handles_options_track_previous_all() { let value = serde_json::to_value(Table::create("foo", vec![], |tbl| { - tbl.track_previous_values = Some(TrackPreviousValues::all()) + tbl.options.track_previous_values = Some(TrackPreviousValues::all()) })) .unwrap(); let value = value.as_object().unwrap(); @@ -360,18 +522,19 @@ mod test { #[test] fn handles_options_track_previous_column_filter() { let value = serde_json::to_value(Table::create("foo", vec![], |tbl| { - tbl.track_previous_values = Some(TrackPreviousValues::all()) + tbl.options.track_previous_values = Some(TrackPreviousValues { + column_filter: Some(vec!["a".into()]), + only_when_changed: true, + }) })) .unwrap(); let value = value.as_object().unwrap(); - assert!(value.get("include_old").unwrap().as_bool().unwrap()); - assert!( - !value - .get("include_old_only_when_changed") - .unwrap() - .as_bool() - .unwrap(), + let include_old = value.get("include_old").unwrap(); + assert_eq!(*include_old, json!(["a"])); + assert_eq!( + *value.get("include_old_only_when_changed").unwrap(), + json!(true) ); } @@ -392,4 +555,12 @@ mod test { table.columns.push(Column::integer("a")); assert!(table.validate().is_err()); } + + #[test] + fn invalid_raw_table_missing_statements() { + let mut table = RawTable::with_schema("users", RawTableSchema::new("users")); + table.schema = None; + + assert!(table.validate().is_err()); + } } diff --git a/powersync/tests/crud_test.rs b/powersync/tests/crud_test.rs index 37e916b..7d2b9dd 100644 --- a/powersync/tests/crud_test.rs +++ b/powersync/tests/crud_test.rs @@ -1,6 +1,9 @@ use futures_lite::{StreamExt, future}; use powersync::PowerSyncDatabase; -use powersync::schema::{Column, Schema, Table, TrackPreviousValues}; +use powersync::schema::{ + Column, PendingStatement, PendingStatementValue, RawTable, RawTableSchema, Schema, Table, + TrackPreviousValues, +}; use powersync_test_utils::{DatabaseTest, execute, query_all}; use rusqlite::params; use serde_json::{Value, json}; @@ -14,7 +17,7 @@ fn include_metadata() { schema .tables .push(Table::create("lists", vec![Column::text("name")], |tbl| { - tbl.track_metadata = true + tbl.options.track_metadata = true })); schema }); @@ -43,7 +46,7 @@ fn include_old_values() { schema.tables.push(Table::create( "lists", vec![Column::text("name"), Column::text("content")], - |tbl| tbl.track_previous_values = Some(TrackPreviousValues::all()), + |tbl| tbl.options.track_previous_values = Some(TrackPreviousValues::all()), )); schema }); @@ -80,7 +83,7 @@ fn include_old_values_with_filter() { "lists", vec![Column::text("name"), Column::text("content")], |tbl| { - tbl.track_previous_values = Some(TrackPreviousValues { + tbl.options.track_previous_values = Some(TrackPreviousValues { column_filter: Some(vec!["name".into()]), only_when_changed: false, }) @@ -124,7 +127,7 @@ fn include_old_values_when_changed() { "lists", vec![Column::text("name"), Column::text("content")], |tbl| { - tbl.track_previous_values = Some(TrackPreviousValues { + tbl.options.track_previous_values = Some(TrackPreviousValues { column_filter: None, only_when_changed: true, }) @@ -164,7 +167,7 @@ fn ignore_empty_update() { schema.tables.push(Table::create( "lists", vec![Column::text("name"), Column::text("content")], - |tbl| tbl.ignore_empty_updates = true, + |tbl| tbl.options.ignore_empty_updates = true, )); schema }); @@ -261,3 +264,135 @@ fn crud_transactions() { assert_eq!(remaining.crud.len(), 15); }); } + +#[test] +fn raw_table_clear() { + future::block_on(async move { + let mut schema = Schema::default(); + let mut raw_table = RawTable::with_statements( + "foo", + PendingStatement { + sql: "unused".into(), + params: vec![], + }, + PendingStatement { + sql: "unused".into(), + params: vec![], + }, + ); + raw_table.clear = Some("DELETE FROM users".into()); + schema.raw_tables.push(raw_table); + + let test = DatabaseTest::new(); + let db = PowerSyncDatabase::new(test.in_memory(), schema); + + { + let writer = db.writer().await.unwrap(); + writer + .execute("CREATE TABLE users (name TEXT);", params![]) + .unwrap(); + writer + .execute("INSERT INTO users (name) VALUES (?);", params!["test"]) + .unwrap(); + } + + assert_eq!( + query_all(&db, "SELECT * FROM users", params![]).await, + json!([{"name": "test"}]), + ); + + // Running powersync_clear should delete from users + { + let writer = db.writer().await.unwrap(); + let mut stmt = writer.prepare("SELECT powersync_clear(0)").unwrap(); + stmt.query_one(params![], |_| Ok(())).unwrap(); + } + + assert_eq!( + query_all(&db, "SELECT * FROM users", params![]).await, + json!([]), + ); + }); +} + +#[test] +fn raw_table_crud_trigger() { + future::block_on(async move { + let mut schema = Schema::default(); + + schema.raw_tables.push(RawTable::with_schema("foo", { + let mut info = RawTableSchema::new("users"); + info.synced_columns = Some(vec!["name".into()]); + info + })); + let serialized_table = serde_json::to_string(&schema.raw_tables[0]).unwrap(); + + let test = DatabaseTest::new(); + let db = PowerSyncDatabase::new(test.in_memory(), Default::default()); + + { + let mut writer = db.writer().await.unwrap(); + writer + .execute( + "CREATE TABLE users (id TEXT, name TEXT, local_column INTEGER);", + params![], + ) + .unwrap(); + + let mut trigger_stmt = writer + .prepare("SELECT powersync_create_raw_table_crud_trigger(?,?,?)") + .unwrap(); + + for write in &["INSERT", "UPDATE", "DELETE"] { + trigger_stmt + .query_one( + params![serialized_table, format!("users_{write}"), write], + |_| Ok(()), + ) + .unwrap(); + } + drop(trigger_stmt); + + let tx = writer.transaction().unwrap(); + tx.execute_batch( + "\ +INSERT INTO users(id, name, local_column) VALUES ('id', 'name', 42); +UPDATE users SET local_column = local_column + 1; -- should not create a ps_crud entry +DELETE FROM users; +", + ) + .unwrap(); + tx.commit().unwrap(); + } + + let crud = db.next_crud_transaction().await.unwrap().unwrap(); + assert_eq!(crud.crud.len(), 2); + }); +} + +#[test] +fn raw_table_rest_column() { + future::block_on(async move { + let mut schema = Schema::default(); + let raw_table = RawTable::with_statements( + "foo", + PendingStatement { + sql: "INSERT INTO users (name) VALUES ?".into(), + params: vec![PendingStatementValue::Rest], + }, + PendingStatement { + sql: "unused".into(), + params: vec![], + }, + ); + schema.raw_tables.push(raw_table); + + let test = DatabaseTest::new(); + let db = PowerSyncDatabase::new(test.in_memory(), schema); + + // Just use the database to ensure the schema with the rest column has been installed. + // We don't test the behavior of that here, the core extension has tests for that. This + // verifies we generate a JSON structure understood by the core extension. + db.reader().await.unwrap(); + }); +}