diff --git a/.changeset/slimy-mails-move.md b/.changeset/slimy-mails-move.md new file mode 100644 index 000000000..30cc4830b --- /dev/null +++ b/.changeset/slimy-mails-move.md @@ -0,0 +1,10 @@ +--- +'@powersync/react-native': minor +'@powersync/common': minor +'@powersync/web': minor +'@powersync/op-sqlite': patch +'@powersync/capacitor': patch +'@powersync/node': patch +--- + +Share common db adapter implementation logic. diff --git a/packages/adapter-sql-js/src/SQLJSAdapter.ts b/packages/adapter-sql-js/src/SQLJSAdapter.ts index 4d35ee9f7..5c4dddbab 100644 --- a/packages/adapter-sql-js/src/SQLJSAdapter.ts +++ b/packages/adapter-sql-js/src/SQLJSAdapter.ts @@ -2,14 +2,18 @@ import { BaseListener, BaseObserver, BatchedUpdateNotification, + ConnectionPool, ControlledExecutor, createLogger, DBAdapter, + DBAdapterDefaultMixin, DBAdapterListener, + DBGetUtilsDefaultMixin, DBLockOptions, ILogger, LockContext, QueryResult, + SqlExecutor, SQLOpenFactory, SQLOpenOptions, Transaction @@ -56,7 +60,7 @@ interface TableObserverListener extends BaseListener { } class TableObserver extends BaseObserver {} -export class SQLJSDBAdapter extends BaseObserver implements DBAdapter { +class SqlJsConnectionPool extends BaseObserver implements ConnectionPool { protected initPromise: Promise; protected _db: SQLJs.Database | null; protected tableUpdateCache: Set; @@ -136,112 +140,6 @@ export class SQLJSDBAdapter extends BaseObserver implements D db.close(); } - protected generateLockContext(): LockContext { - const execute = async (query: string, params?: any[]): Promise => { - const db = await this.getDB(); - const statement = db.prepare(query); - const rawResults: any[][] = []; - let columnNames: string[] | null = null; - try { - if (params) { - statement.bind(params); - } - while (statement.step()) { - if (!columnNames) { - columnNames = statement.getColumnNames(); - } - rawResults.push(statement.get()); - } - - const rows = rawResults.map((row) => { - return Object.fromEntries(row.map((value, index) => [columnNames![index], value])); - }); - return { - // `lastInsertId` is not available in the original version of SQL.js or its types, but it's available in the fork we use. - insertId: (db as any).lastInsertId(), - rowsAffected: db.getRowsModified(), - rows: { - _array: rows, - length: rows.length, - item: (idx: number) => rows[idx] - } - }; - } finally { - statement.free(); - } - }; - - const getAll = async (query: string, params?: any[]): Promise => { - const result = await execute(query, params); - return result.rows?._array ?? ([] as T[]); - }; - - const getOptional = async (query: string, params?: any[]): Promise => { - const results = await getAll(query, params); - return results.length > 0 ? results[0] : null; - }; - - const get = async (query: string, params?: any[]): Promise => { - const result = await getOptional(query, params); - if (!result) { - throw new Error(`No results for query: ${query}`); - } - return result; - }; - - const executeRaw = async (query: string, params?: any[]): Promise => { - const db = await this.getDB(); - const statement = db.prepare(query); - const rawResults: any[][] = []; - try { - if (params) { - statement.bind(params); - } - while (statement.step()) { - rawResults.push(statement.get()); - } - return rawResults; - } finally { - statement.free(); - } - }; - - return { - getAll, - getOptional, - get, - executeRaw, - execute - }; - } - - execute(query: string, params?: any[]): Promise { - return this.writeLock((tx) => tx.execute(query, params)); - } - - executeRaw(query: string, params?: any[]): Promise { - return this.writeLock((tx) => tx.executeRaw(query, params)); - } - - async executeBatch(query: string, params: any[][] = []): Promise { - let totalRowsAffected = 0; - const db = await this.getDB(); - - const stmt = db.prepare(query); - try { - for (const paramSet of params) { - stmt.run(paramSet); - totalRowsAffected += db.getRowsModified(); - } - - return { - rowsAffected: totalRowsAffected - }; - } finally { - stmt.free(); - } - } - /** * We're not using separate read/write locks here because we can't implement connection pools on top of SQL.js. */ @@ -249,16 +147,10 @@ export class SQLJSDBAdapter extends BaseObserver implements D return this.writeLock(fn, options); } - readTransaction(fn: (tx: Transaction) => Promise, options?: DBLockOptions): Promise { - return this.readLock(async (ctx) => { - return this.internalTransaction(ctx, fn); - }); - } - writeLock(fn: (tx: LockContext) => Promise, options?: DBLockOptions): Promise { return this.mutex.runExclusive(async () => { const db = await this.getDB(); - const result = await fn(this.generateLockContext()); + const result = await fn(new SqlJsLockContext(db)); // No point to schedule a write if there's no persister. if (this.options.persister) { @@ -276,61 +168,85 @@ export class SQLJSDBAdapter extends BaseObserver implements D }); } - writeTransaction(fn: (tx: Transaction) => Promise, options?: DBLockOptions): Promise { - return this.writeLock(async (ctx) => { - return this.internalTransaction(ctx, fn); - }); - } - - refreshSchema(): Promise { - return this.get("PRAGMA table_info('sqlite_master')"); + async refreshSchema(): Promise { + await this.writeLock((ctx) => ctx.get("PRAGMA table_info('sqlite_master')")); } +} - getAll(sql: string, parameters?: any[]): Promise { - return this.readLock((tx) => tx.getAll(sql, parameters)); - } +class SqlJsExecutor implements SqlExecutor { + constructor(readonly db: SQLJs.Database) {} - getOptional(sql: string, parameters?: any[]): Promise { - return this.readLock((tx) => tx.getOptional(sql, parameters)); - } + async execute(query: string, params?: any[]): Promise { + const db = this.db; + const statement = db.prepare(query); + const rawResults: any[][] = []; + let columnNames: string[] | null = null; + try { + if (params) { + statement.bind(params); + } + while (statement.step()) { + if (!columnNames) { + columnNames = statement.getColumnNames(); + } + rawResults.push(statement.get()); + } - get(sql: string, parameters?: any[]): Promise { - return this.readLock((tx) => tx.get(sql, parameters)); + const rows = rawResults.map((row) => { + return Object.fromEntries(row.map((value, index) => [columnNames![index], value])); + }); + return { + // `lastInsertId` is not available in the original version of SQL.js or its types, but it's available in the fork we use. + insertId: (db as any).lastInsertId(), + rowsAffected: db.getRowsModified(), + rows: { + _array: rows, + length: rows.length, + item: (idx: number) => rows[idx] + } + }; + } finally { + statement.free(); + } } - protected async internalTransaction(ctx: LockContext, fn: (tx: Transaction) => Promise): Promise { - let finalized = false; - const commit = async (): Promise => { - if (finalized) { - return { rowsAffected: 0 }; + async executeRaw(query: string, params?: any[]): Promise { + const db = this.db; + const statement = db.prepare(query); + const rawResults: any[][] = []; + try { + if (params) { + statement.bind(params); } - finalized = true; - return ctx.execute('COMMIT'); - }; - const rollback = async (): Promise => { - if (finalized) { - return { rowsAffected: 0 }; + while (statement.step()) { + rawResults.push(statement.get()); } - finalized = true; - return ctx.execute('ROLLBACK'); - }; + return rawResults; + } finally { + statement.free(); + } + } + + async executeBatch(query: string, params: any[][] = []): Promise { + let totalRowsAffected = 0; + const db = this.db; + + const stmt = db.prepare(query); try { - await ctx.execute('BEGIN'); - const result = await fn({ - ...ctx, - commit, - rollback - }); - await commit(); - return result; - } catch (ex) { - try { - await rollback(); - } catch (ex2) { - // In rare cases, a rollback may fail. - // Safe to ignore. + for (const paramSet of params) { + stmt.run(paramSet); + totalRowsAffected += db.getRowsModified(); } - throw ex; + + return { + rowsAffected: totalRowsAffected + }; + } finally { + stmt.free(); } } } + +class SqlJsLockContext extends DBGetUtilsDefaultMixin(SqlJsExecutor) implements LockContext {} + +export class SQLJSDBAdapter extends DBAdapterDefaultMixin(SqlJsConnectionPool) implements DBAdapter {} diff --git a/packages/capacitor/src/adapter/CapacitorSQLiteAdapter.ts b/packages/capacitor/src/adapter/CapacitorSQLiteAdapter.ts index 2244e8e81..3faf90a7e 100644 --- a/packages/capacitor/src/adapter/CapacitorSQLiteAdapter.ts +++ b/packages/capacitor/src/adapter/CapacitorSQLiteAdapter.ts @@ -4,7 +4,9 @@ import { Capacitor } from '@capacitor/core'; import { BaseObserver, BatchedUpdateNotification, + ConnectionPool, DBAdapter, + DBAdapterDefaultMixin, DBAdapterListener, DBLockOptions, LockContext, @@ -30,13 +32,8 @@ async function monitorQuery(sql: string, executor: () => Promise): throw e; } } -/** - * An implementation of {@link DBAdapter} using the Capacitor Community SQLite [plugin](https://github.com/capacitor-community/sqlite). - * - * @experimental - * @alpha This is currently experimental and may change without a major version bump. - */ -export class CapacitorSQLiteAdapter extends BaseObserver implements DBAdapter { + +class CapacitorConnectionPool extends BaseObserver implements ConnectionPool { protected _writeConnection: SQLiteDBConnection | null; protected _readConnection: SQLiteDBConnection | null; protected initializedPromise: Promise; @@ -206,26 +203,8 @@ export class CapacitorSQLiteAdapter extends BaseObserver impl return results.rows?._array.map((row) => Object.values(row)) ?? []; }; - return { - getAll, - getOptional, - get, - executeRaw, - execute - }; - } - - execute(query: string, params?: any[]): Promise { - return this.writeLock((tx) => tx.execute(query, params)); - } - - executeRaw(query: string, params?: any[]): Promise { - return this.writeLock((tx) => tx.executeRaw(query, params)); - } - - async executeBatch(query: string, params: any[][] = []): Promise { - return this.writeLock(async (tx) => { - let result = await this.writeConnection.executeSet( + const executeBatch = async (query: string, params: any[][] = []): Promise => { + let result = await db.executeSet( params.map((param) => ({ statement: query, values: param @@ -236,7 +215,16 @@ export class CapacitorSQLiteAdapter extends BaseObserver impl rowsAffected: result.changes?.changes ?? 0, insertId: result.changes?.lastId }; - }); + }; + + return { + getAll, + getOptional, + get, + executeRaw, + execute, + executeBatch + }; } readLock(fn: (tx: LockContext) => Promise, options?: DBLockOptions): Promise { @@ -250,12 +238,6 @@ export class CapacitorSQLiteAdapter extends BaseObserver impl ); } - readTransaction(fn: (tx: Transaction) => Promise, options?: DBLockOptions): Promise { - return this.readLock(async (ctx) => { - return this.internalTransaction(ctx, fn); - }); - } - writeLock(fn: (tx: LockContext) => Promise, options?: DBLockOptions): Promise { return mutexRunExclusive( this.writeMutex, @@ -281,12 +263,6 @@ export class CapacitorSQLiteAdapter extends BaseObserver impl ); } - writeTransaction(fn: (tx: Transaction) => Promise, options?: DBLockOptions): Promise { - return this.writeLock(async (ctx) => { - return this.internalTransaction(ctx, fn); - }); - } - refreshSchema(): Promise { return this.writeLock(async (writeTx) => { return this.readLock(async (readTx) => { @@ -296,52 +272,12 @@ export class CapacitorSQLiteAdapter extends BaseObserver impl }); }); } - - getAll(sql: string, parameters?: any[]): Promise { - return this.readLock((tx) => tx.getAll(sql, parameters)); - } - - getOptional(sql: string, parameters?: any[]): Promise { - return this.readLock((tx) => tx.getOptional(sql, parameters)); - } - - get(sql: string, parameters?: any[]): Promise { - return this.readLock((tx) => tx.get(sql, parameters)); - } - - protected async internalTransaction(ctx: LockContext, fn: (tx: Transaction) => Promise): Promise { - let finalized = false; - const commit = async (): Promise => { - if (finalized) { - return { rowsAffected: 0 }; - } - finalized = true; - return ctx.execute('COMMIT'); - }; - const rollback = async (): Promise => { - if (finalized) { - return { rowsAffected: 0 }; - } - finalized = true; - return ctx.execute('ROLLBACK'); - }; - try { - await ctx.execute('BEGIN'); - const result = await fn({ - ...ctx, - commit, - rollback - }); - await commit(); - return result; - } catch (ex) { - try { - await rollback(); - } catch (ex2) { - // In rare cases, a rollback may fail. - // Safe to ignore. - } - throw ex; - } - } } + +/** + * An implementation of {@link DBAdapter} using the Capacitor Community SQLite [plugin](https://github.com/capacitor-community/sqlite). + * + * @experimental + * @alpha This is currently experimental and may change without a major version bump. + */ +export class CapacitorSQLiteAdapter extends DBAdapterDefaultMixin(CapacitorConnectionPool) {} diff --git a/packages/common/src/db/DBAdapter.ts b/packages/common/src/db/DBAdapter.ts index 580603c4a..c9934bed8 100644 --- a/packages/common/src/db/DBAdapter.ts +++ b/packages/common/src/db/DBAdapter.ts @@ -41,7 +41,7 @@ export interface DBGetUtils { get(sql: string, parameters?: any[]): Promise; } -export interface LockContext extends DBGetUtils { +export interface SqlExecutor { /** Execute a single write statement. */ execute: (query: string, params?: any[] | undefined) => Promise; /** @@ -59,6 +59,61 @@ export interface LockContext extends DBGetUtils { * ```[ { id: '33', name: 'list 1', content: 'Post content', list_id: '1' } ]``` */ executeRaw: (query: string, params?: any[] | undefined) => Promise; + + executeBatch: (query: string, params?: any[][]) => Promise; +} + +export interface LockContext extends SqlExecutor, DBGetUtils {} + +/** + * Implements {@link DBGetUtils} on a {@link SqlRunner}. + */ +export function DBGetUtilsDefaultMixin Omit>( + Base: TBase +) { + return class extends Base implements DBGetUtils, SqlExecutor { + async getAll(sql: string, parameters?: any[]): Promise { + const res = await this.execute(sql, parameters); + return res.rows?._array ?? []; + } + + async getOptional(sql: string, parameters?: any[]): Promise { + const res = await this.execute(sql, parameters); + return res.rows?.item(0) ?? null; + } + + async get(sql: string, parameters?: any[]): Promise { + const res = await this.execute(sql, parameters); + const first = res.rows?.item(0); + if (!first) { + throw new Error('Result set is empty'); + } + return first; + } + + async executeBatch(query: string, params: any[][] = []): Promise { + // If this context can run batch statements natively, use that. + // @ts-ignore + if (super.executeBatch) { + // @ts-ignore + return super.executeBatch(query, params); + } + + // Emulate executeBatch by running statements individually. + let lastInsertId: number | undefined; + let rowsAffected = 0; + for (const set of params) { + const result = await this.execute(query, set); + lastInsertId = result.insertId; + rowsAffected += result.rowsAffected; + } + + return { + rowsAffected, + insertId: lastInsertId + }; + } + }; } export interface Transaction extends LockContext { @@ -107,22 +162,119 @@ export interface DBLockOptions { timeoutMs?: number; } -export interface DBAdapter extends BaseObserverInterface, DBGetUtils { - close: () => void | Promise; - execute: (query: string, params?: any[]) => Promise; - executeRaw: (query: string, params?: any[]) => Promise; - executeBatch: (query: string, params?: any[][]) => Promise; +export interface ConnectionPool extends BaseObserverInterface { name: string; + close: () => void | Promise; readLock: (fn: (tx: LockContext) => Promise, options?: DBLockOptions) => Promise; - readTransaction: (fn: (tx: Transaction) => Promise, options?: DBLockOptions) => Promise; writeLock: (fn: (tx: LockContext) => Promise, options?: DBLockOptions) => Promise; - writeTransaction: (fn: (tx: Transaction) => Promise, options?: DBLockOptions) => Promise; + /** * This method refreshes the schema information across all connections. This is for advanced use cases, and should generally not be needed. */ refreshSchema: () => Promise; } +export interface DBAdapter extends ConnectionPool, SqlExecutor, DBGetUtils { + readTransaction: (fn: (tx: Transaction) => Promise, options?: DBLockOptions) => Promise; + writeTransaction: (fn: (tx: Transaction) => Promise, options?: DBLockOptions) => Promise; +} + +/** + * A mixin to implement {@link DBAdapter} by delegating to {@link ConnectionPool.readLock} and + * {@link ConnectionPool.writeLock}. + */ +export function DBAdapterDefaultMixin ConnectionPool>(Base: TBase) { + return class extends Base implements DBAdapter { + readTransaction(fn: (tx: Transaction) => Promise, options?: DBLockOptions): Promise { + return this.readLock((ctx) => TransactionImplementation.runWith(ctx, fn), options); + } + + writeTransaction(fn: (tx: Transaction) => Promise, options?: DBLockOptions): Promise { + return this.writeLock((ctx) => TransactionImplementation.runWith(ctx, fn), options); + } + + getAll(sql: string, parameters?: any[]): Promise { + return this.readLock((ctx) => ctx.getAll(sql, parameters)); + } + + getOptional(sql: string, parameters?: any[]): Promise { + return this.readLock((ctx) => ctx.getOptional(sql, parameters)); + } + + get(sql: string, parameters?: any[]): Promise { + return this.readLock((ctx) => ctx.get(sql, parameters)); + } + + execute(query: string, params?: any[]): Promise { + return this.writeLock((ctx) => ctx.execute(query, params)); + } + + executeRaw(query: string, params?: any[]): Promise { + return this.writeLock((ctx) => ctx.executeRaw(query, params)); + } + + executeBatch(query: string, params?: any[][]): Promise { + return this.writeTransaction((tx) => tx.executeBatch(query, params)); + } + }; +} + +class BaseTransaction implements SqlExecutor { + private finalized = false; + + constructor(private inner: SqlExecutor) {} + + async commit(): Promise { + if (this.finalized) { + return { rowsAffected: 0 }; + } + this.finalized = true; + return this.inner.execute('COMMIT'); + } + + async rollback(): Promise { + if (this.finalized) { + return { rowsAffected: 0 }; + } + this.finalized = true; + return this.inner.execute('ROLLBACK'); + } + + execute(query: string, params?: any[] | undefined): Promise { + return this.inner.execute(query, params); + } + + executeRaw(query: string, params?: any[] | undefined): Promise { + return this.inner.executeRaw(query, params); + } + + executeBatch(query: string, params?: any[][]): Promise { + return this.inner.executeBatch(query, params); + } +} + +class TransactionImplementation extends DBGetUtilsDefaultMixin(BaseTransaction) { + static async runWith(ctx: LockContext, fn: (tx: Transaction) => Promise): Promise { + let tx = new TransactionImplementation(ctx); + + try { + await ctx.execute('BEGIN IMMEDIATE'); + + const result = await fn(tx); + await tx.commit(); + return result; + } catch (ex) { + try { + await tx.rollback(); + } catch (ex2) { + // In rare cases, a rollback may fail. + // Safe to ignore. + } + throw ex; + } + } +} + export function isBatchedUpdateNotification( update: BatchedUpdateNotification | UpdateNotification ): update is BatchedUpdateNotification { diff --git a/packages/node/src/db/PowerSyncDatabase.ts b/packages/node/src/db/PowerSyncDatabase.ts index 25f347c15..d184741ee 100644 --- a/packages/node/src/db/PowerSyncDatabase.ts +++ b/packages/node/src/db/PowerSyncDatabase.ts @@ -17,7 +17,7 @@ import { import { NodeCustomConnectionOptions, NodeRemote } from '../sync/stream/NodeRemote.js'; import { NodeStreamingSyncImplementation } from '../sync/stream/NodeStreamingSyncImplementation.js'; -import { WorkerConnectionPool } from './WorkerConnectionPool.js'; +import { WorkerPoolDatabaseAdapter } from './WorkerConnectionPool.js'; import { NodeSQLOpenOptions } from './options.js'; export type NodePowerSyncDatabaseOptions = PowerSyncDatabaseOptions & { @@ -54,14 +54,14 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { } async _initialize(): Promise { - await (this.database as WorkerConnectionPool).initialize(); + await (this.database as WorkerPoolDatabaseAdapter).initialize(); } /** * Opens a DBAdapter using better-sqlite3 as the default SQLite open factory. */ protected openDBAdapter(options: PowerSyncDatabaseOptionsWithSettings): DBAdapter { - return new WorkerConnectionPool(options.database); + return new WorkerPoolDatabaseAdapter(options.database); } protected generateBucketStorageAdapter(): BucketStorageAdapter { diff --git a/packages/node/src/db/RemoteConnection.ts b/packages/node/src/db/RemoteConnection.ts index 05c9503fe..f440e5862 100644 --- a/packages/node/src/db/RemoteConnection.ts +++ b/packages/node/src/db/RemoteConnection.ts @@ -1,4 +1,10 @@ -import { ConnectionClosedError, LockContext, QueryResult } from '@powersync/common'; +import { + ConnectionClosedError, + DBGetUtilsDefaultMixin, + QueryResult, + SqlExecutor, + LockContext +} from '@powersync/common'; import { releaseProxy, Remote } from 'comlink'; import { Worker } from 'node:worker_threads'; import { AsyncDatabase, AsyncDatabaseOpener, ProxiedQueryResult } from './AsyncDatabase.js'; @@ -6,7 +12,7 @@ import { AsyncDatabase, AsyncDatabaseOpener, ProxiedQueryResult } from './AsyncD /** * A PowerSync database connection implemented with RPC calls to a background worker. */ -export class RemoteConnection implements LockContext { +class BaseRemoteConnection implements SqlExecutor { isBusy = false; private readonly worker: Worker; @@ -65,14 +71,14 @@ export class RemoteConnection implements LockContext { executeBatch(query: string, params: any[][] = []): Promise { return this.withRemote(async () => { const result = await this.database.executeBatch(query, params ?? []); - return RemoteConnection.wrapQueryResult(result); + return BaseRemoteConnection.wrapQueryResult(result); }); } execute(query: string, params?: any[] | undefined): Promise { return this.withRemote(async () => { const result = await this.database.execute(query, params ?? []); - return RemoteConnection.wrapQueryResult(result); + return BaseRemoteConnection.wrapQueryResult(result); }); } @@ -82,25 +88,6 @@ export class RemoteConnection implements LockContext { }); } - async getAll(sql: string, parameters?: any[]): Promise { - const res = await this.execute(sql, parameters); - return res.rows?._array ?? []; - } - - async getOptional(sql: string, parameters?: any[]): Promise { - const res = await this.execute(sql, parameters); - return res.rows?.item(0) ?? null; - } - - async get(sql: string, parameters?: any[]): Promise { - const res = await this.execute(sql, parameters); - const first = res.rows?.item(0); - if (!first) { - throw new Error('Result set is empty'); - } - return first; - } - async refreshSchema() { await this.execute("pragma table_info('sqlite_master')"); } @@ -127,3 +114,5 @@ export class RemoteConnection implements LockContext { }; } } + +export class RemoteConnection extends DBGetUtilsDefaultMixin(BaseRemoteConnection) implements LockContext {} diff --git a/packages/node/src/db/WorkerConnectionPool.ts b/packages/node/src/db/WorkerConnectionPool.ts index 696c9a70a..b982cd90d 100644 --- a/packages/node/src/db/WorkerConnectionPool.ts +++ b/packages/node/src/db/WorkerConnectionPool.ts @@ -6,7 +6,8 @@ import { Worker } from 'node:worker_threads'; import { BaseObserver, BatchedUpdateNotification, - DBAdapter, + ConnectionPool, + DBAdapterDefaultMixin, DBAdapterListener, DBLockOptions, LockContext, @@ -35,7 +36,7 @@ const defaultDatabaseImplementation: NodeDatabaseImplementation = { /** * Adapter for better-sqlite3 */ -export class WorkerConnectionPool extends BaseObserver implements DBAdapter { +export class WorkerConnectionPool extends BaseObserver implements ConnectionPool { private readonly options: NodeSQLOpenOptions; public readonly name: string; @@ -236,93 +237,11 @@ export class WorkerConnectionPool extends BaseObserver implem })(); } - readTransaction( - fn: (tx: BetterSQLite3Transaction) => Promise, - _options?: DBLockOptions | undefined - ): Promise { - return this.readLock((ctx) => this.internalTransaction(ctx as RemoteConnection, fn)); - } - - writeTransaction( - fn: (tx: BetterSQLite3Transaction) => Promise, - _options?: DBLockOptions | undefined - ): Promise { - return this.writeLock((ctx) => this.internalTransaction(ctx as RemoteConnection, fn)); - } - - private async internalTransaction( - connection: RemoteConnection, - fn: (tx: BetterSQLite3Transaction) => Promise - ): Promise { - let finalized = false; - const commit = async (): Promise => { - if (!finalized) { - finalized = true; - await connection.execute('COMMIT'); - } - return { rowsAffected: 0 }; - }; - const rollback = async (): Promise => { - if (!finalized) { - finalized = true; - await connection.execute('ROLLBACK'); - } - return { rowsAffected: 0 }; - }; - try { - await connection.execute('BEGIN'); - const result = await fn({ - execute: (query, params) => connection.execute(query, params), - executeRaw: (query, params) => connection.executeRaw(query, params), - executeBatch: (query, params) => connection.executeBatch(query, params), - get: (query, params) => connection.get(query, params), - getAll: (query, params) => connection.getAll(query, params), - getOptional: (query, params) => connection.getOptional(query, params), - commit, - rollback - }); - await commit(); - return result; - } catch (ex) { - try { - await rollback(); - } catch (ex2) { - // In rare cases, a rollback may fail. - // Safe to ignore. - } - throw ex; - } - } - - getAll(sql: string, parameters?: any[]): Promise { - return this.readLock((ctx) => ctx.getAll(sql, parameters)); - } - - getOptional(sql: string, parameters?: any[]): Promise { - return this.readLock((ctx) => ctx.getOptional(sql, parameters)); - } - - get(sql: string, parameters?: any[]): Promise { - return this.readLock((ctx) => ctx.get(sql, parameters)); - } - - execute(query: string, params?: any[] | undefined): Promise { - return this.writeLock((ctx) => ctx.execute(query, params)); - } - - executeRaw(query: string, params?: any[] | undefined): Promise { - return this.writeLock((ctx) => ctx.executeRaw(query, params)); - } - - executeBatch(query: string, params?: any[][]): Promise { - return this.writeTransaction((ctx) => ctx.executeBatch(query, params)); - } - async refreshSchema() { await this.writeConnection.refreshSchema(); - for (const readConnection of this.readConnections) { - await readConnection.refreshSchema(); - } + await Promise.all(this.readConnections.map((c) => c.refreshSchema())); } } + +export class WorkerPoolDatabaseAdapter extends DBAdapterDefaultMixin(WorkerConnectionPool) {} diff --git a/packages/node/tests/PowerSyncDatabase.test.ts b/packages/node/tests/PowerSyncDatabase.test.ts index d5cbd21eb..86cfa65c1 100644 --- a/packages/node/tests/PowerSyncDatabase.test.ts +++ b/packages/node/tests/PowerSyncDatabase.test.ts @@ -245,3 +245,10 @@ databaseTest('clear raw tables', async ({ database }) => { await database.disconnectAndClear(); expect(await database.getAll('SELECT * FROM lists')).toHaveLength(0); }); + +databaseTest('execute batch', async ({ database }) => { + await database.execute('CREATE TABLE users (id TEXT NOT NULL PRIMARY KEY, name TEXT)'); + + await database.executeBatch('INSERT INTO users (id, name) VALUES (uuid(), ?)', [['a'], ['b'], ['c']]); + expect(await database.getAll('SELECT * FROM users')).toHaveLength(3); +}); diff --git a/packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts b/packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts index 5704a48b1..521bf2d33 100644 --- a/packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts +++ b/packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts @@ -3,8 +3,11 @@ import { BaseObserver, BatchedUpdateNotification, DBAdapterListener, + DBGetUtilsDefaultMixin, + LockContext, QueryResult, RowUpdateType, + SqlExecutor, UpdateNotification } from '@powersync/common'; @@ -19,7 +22,7 @@ export type OPSQLiteUpdateNotification = { rowId: number; }; -export class OPSQLiteConnection extends BaseObserver { +class OPSQLiteExecutor extends BaseObserver implements Omit { protected DB: DB; private updateBuffer: UpdateNotification[]; @@ -101,7 +104,7 @@ export class OPSQLiteConnection extends BaseObserver { return await this.DB.executeRaw(query, params); } - async executeBatch(query: string, params: any[][] = []): Promise { + async executeNativeBatch(query: string, params: any[][] = []): Promise { const tuple: SQLBatchTuple[] = [[query, params[0]]]; params.slice(1).forEach((p) => tuple.push([query, p])); @@ -110,25 +113,9 @@ export class OPSQLiteConnection extends BaseObserver { rowsAffected: result.rowsAffected ?? 0 }; } +} - async getAll(sql: string, parameters?: any[]): Promise { - const result = await this.DB.execute(sql, parameters); - return (result.rows ?? []) as T[]; - } - - async getOptional(sql: string, parameters?: any[]): Promise { - const result = await this.DB.execute(sql, parameters); - return (result.rows?.[0] as T) ?? null; - } - - async get(sql: string, parameters?: any[]): Promise { - const result = await this.getOptional(sql, parameters); - if (!result) { - throw new Error('Result set is empty'); - } - return result as T; - } - +export class OPSQLiteConnection extends DBGetUtilsDefaultMixin(OPSQLiteExecutor) implements LockContext { async refreshSchema() { await this.get("PRAGMA table_info('sqlite_master')"); } diff --git a/packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts b/packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts index ebfcf077d..993e6f9b6 100644 --- a/packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts +++ b/packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts @@ -1,7 +1,9 @@ -import { getDylibPath, open, type DB } from '@op-engineering/op-sqlite'; +import { getDylibPath, open, SQLBatchTuple, type DB } from '@op-engineering/op-sqlite'; import { BaseObserver, + ConnectionPool, DBAdapter, + DBAdapterDefaultMixin, DBAdapterListener, DBLockOptions, QueryResult, @@ -24,7 +26,7 @@ export type OPSQLiteAdapterOptions = { const READ_CONNECTIONS = 5; -export class OPSQLiteDBAdapter extends BaseObserver implements DBAdapter { +class OPSQLiteConnectionPool extends BaseObserver implements ConnectionPool { name: string; protected writeMutex: Mutex; @@ -229,81 +231,6 @@ export class OPSQLiteDBAdapter extends BaseObserver implement }); } - readTransaction(fn: (tx: Transaction) => Promise, options?: DBLockOptions): Promise { - return this.readLock((ctx) => this.internalTransaction(ctx, fn)); - } - - writeTransaction(fn: (tx: Transaction) => Promise, options?: DBLockOptions): Promise { - return this.writeLock((ctx) => this.internalTransaction(ctx, fn)); - } - - getAll(sql: string, parameters?: any[]): Promise { - return this.readLock((ctx) => ctx.getAll(sql, parameters)); - } - - getOptional(sql: string, parameters?: any[]): Promise { - return this.readLock((ctx) => ctx.getOptional(sql, parameters)); - } - - get(sql: string, parameters?: any[]): Promise { - return this.readLock((ctx) => ctx.get(sql, parameters)); - } - - execute(query: string, params?: any[]) { - return this.writeLock((ctx) => ctx.execute(query, params)); - } - - executeRaw(query: string, params?: any[]) { - return this.writeLock((ctx) => ctx.executeRaw(query, params)); - } - - async executeBatch(query: string, params: any[][] = []): Promise { - return this.writeLock((ctx) => ctx.executeBatch(query, params)); - } - - protected async internalTransaction( - connection: OPSQLiteConnection, - fn: (tx: Transaction) => Promise - ): Promise { - let finalized = false; - const commit = async (): Promise => { - if (finalized) { - return { rowsAffected: 0 }; - } - finalized = true; - return connection.execute('COMMIT'); - }; - const rollback = async (): Promise => { - if (finalized) { - return { rowsAffected: 0 }; - } - finalized = true; - return connection.execute('ROLLBACK'); - }; - try { - await connection.execute('BEGIN'); - const result = await fn({ - execute: (query, params) => connection.execute(query, params), - executeRaw: (query, params) => connection.executeRaw(query, params), - get: (query, params) => connection.get(query, params), - getAll: (query, params) => connection.getAll(query, params), - getOptional: (query, params) => connection.getOptional(query, params), - commit, - rollback - }); - await commit(); - return result; - } catch (ex) { - try { - await rollback(); - } catch (ex2) { - // In rare cases, a rollback may fail. - // Safe to ignore. - } - throw ex; - } - } - async refreshSchema(): Promise { await this.initialized; await this.writeConnection!.refreshSchema(); @@ -315,3 +242,11 @@ export class OPSQLiteDBAdapter extends BaseObserver implement } } } + +export class OPSQLiteDBAdapter extends DBAdapterDefaultMixin(OPSQLiteConnectionPool) implements DBAdapter { + async executeBatch(query: string, params: any[][] = []): Promise { + return await this.writeLock(async (tx) => { + return await (tx as OPSQLiteConnection).executeNativeBatch(query, params); + }); + } +} diff --git a/packages/react-native/src/db/adapters/react-native-quick-sqlite/RNQSDBAdapter.ts b/packages/react-native/src/db/adapters/react-native-quick-sqlite/RNQSDBAdapter.ts index 36c05c23b..146cbe6cf 100644 --- a/packages/react-native/src/db/adapters/react-native-quick-sqlite/RNQSDBAdapter.ts +++ b/packages/react-native/src/db/adapters/react-native-quick-sqlite/RNQSDBAdapter.ts @@ -3,25 +3,17 @@ import { DBAdapter, DBAdapterListener, LockContext as PowerSyncLockContext, - Transaction as PowerSyncTransaction, DBLockOptions, - DBGetUtils, - QueryResult + ConnectionPool, + DBGetUtilsDefaultMixin, + LockContext, + DBAdapterDefaultMixin, + Transaction } from '@powersync/common'; -import type { - QuickSQLiteConnection, - LockContext as RNQSLockContext, - TransactionContext as RNQSTransactionContext -} from '@journeyapps/react-native-quick-sqlite'; - -/** - * Adapter for React Native Quick SQLite - */ -export class RNQSDBAdapter extends BaseObserver implements DBAdapter { - getAll: (sql: string, parameters?: any[]) => Promise; - getOptional: (sql: string, parameters?: any[]) => Promise; - get: (sql: string, parameters?: any[]) => Promise; +import type { QuickSQLiteConnection, LockContext as RNQSLockContext } from '@journeyapps/react-native-quick-sqlite'; +import { QueryResult, SqlExecutor } from '@powersync/common/dist/index.cjs'; +class RNQSConnectionPool extends BaseObserver implements ConnectionPool { constructor( protected baseDB: QuickSQLiteConnection, public name: string @@ -31,15 +23,6 @@ export class RNQSDBAdapter extends BaseObserver implements DB baseDB.registerTablesChangedHook((update) => { this.iterateListeners((cb) => cb.tablesUpdated?.(update)); }); - - const topLevelUtils = this.generateDBHelpers({ - // Arrow function binds `this` for use in readOnlyExecute - execute: (sql: string, params?: any[]) => this.readOnlyExecute(sql, params) - }); - // Only assigning get helpers - this.getAll = topLevelUtils.getAll; - this.getOptional = topLevelUtils.getOptional; - this.get = topLevelUtils.get; } close() { @@ -47,34 +30,46 @@ export class RNQSDBAdapter extends BaseObserver implements DB } readLock(fn: (tx: PowerSyncLockContext) => Promise, options?: DBLockOptions): Promise { - return this.baseDB.readLock((dbTx) => fn(this.generateDBHelpers(this.generateContext(dbTx))), options); + return this.baseDB.readLock((dbTx) => fn(this.generateContext(dbTx)), options); } - readTransaction(fn: (tx: PowerSyncTransaction) => Promise, options?: DBLockOptions): Promise { - return this.baseDB.readTransaction((dbTx) => fn(this.generateDBHelpers(this.generateContext(dbTx))), options); + writeLock(fn: (tx: PowerSyncLockContext) => Promise, options?: DBLockOptions): Promise { + return this.baseDB.writeLock((dbTx) => fn(this.generateContext(dbTx)), options); } - writeLock(fn: (tx: PowerSyncLockContext) => Promise, options?: DBLockOptions): Promise { - return this.baseDB.writeLock((dbTx) => fn(this.generateDBHelpers(this.generateContext(dbTx))), options); + generateContext(ctx: T) { + return new QuickSqliteContext(ctx); } - writeTransaction(fn: (tx: PowerSyncTransaction) => Promise, options?: DBLockOptions): Promise { - return this.baseDB.writeTransaction((dbTx) => fn(this.generateDBHelpers(this.generateContext(dbTx))), options); + async refreshSchema() { + await this.baseDB.refreshSchema(); } +} + +class QuickSqliteExecutor implements Omit { + constructor(readonly context: RNQSLockContext) {} execute(query: string, params?: any[]) { - return this.baseDB.execute(query, params); + return this.context.execute(query, params); } - /** * 'executeRaw' is not implemented in RNQS, this falls back to 'execute'. */ async executeRaw(query: string, params?: any[]): Promise { - const result = await this.baseDB.execute(query, params); + const result = await this.context.execute(query, params); const rows = result.rows?._array ?? []; return rows.map((row) => Object.values(row)); } +} +class QuickSqliteContext extends DBGetUtilsDefaultMixin(QuickSqliteExecutor) implements LockContext {} + +/** + * Adapter for React Native Quick SQLite + */ +export class RNQSDBAdapter extends DBAdapterDefaultMixin(RNQSConnectionPool) implements DBAdapter { + // We don't want the default implementation here, RNQS does not support executeBatch for lock contexts so that would + // be less efficient. async executeBatch(query: string, params: any[][] = []): Promise { const commands: any[] = []; @@ -87,70 +82,4 @@ export class RNQSDBAdapter extends BaseObserver implements DB rowsAffected: result.rowsAffected ? result.rowsAffected : 0 }; } - - generateContext(ctx: T) { - return { - ...ctx, - // 'executeRaw' is not implemented in RNQS, this falls back to 'execute'. - executeRaw: async (sql: string, params?: any[]) => { - const result = await ctx.execute(sql, params); - const rows = result.rows?._array ?? []; - return rows.map((row) => Object.values(row)); - } - }; - } - - /** - * This provides a top-level read only execute method which is executed inside a read-lock. - * This is necessary since the high level `execute` method uses a write-lock under - * the hood. Helper methods such as `get`, `getAll` and `getOptional` are read only, - * and should use this method. - */ - private readOnlyExecute(sql: string, params?: any[]) { - return this.baseDB.readLock((ctx) => ctx.execute(sql, params)); - } - - /** - * Adds DB get utils to lock contexts and transaction contexts - * @param tx - * @returns - */ - private generateDBHelpers Promise }>( - tx: T - ): T & DBGetUtils { - return { - ...tx, - /** - * Execute a read-only query and return results - */ - getAll: async (sql: string, parameters?: any[]): Promise => { - const res = await tx.execute(sql, parameters); - return res.rows?._array ?? []; - }, - - /** - * Execute a read-only query and return the first result, or null if the ResultSet is empty. - */ - getOptional: async (sql: string, parameters?: any[]): Promise => { - const res = await tx.execute(sql, parameters); - return res.rows?.item(0) ?? null; - }, - - /** - * Execute a read-only query and return the first result, error if the ResultSet is empty. - */ - get: async (sql: string, parameters?: any[]): Promise => { - const res = await tx.execute(sql, parameters); - const first = res.rows?.item(0); - if (!first) { - throw new Error('Result set is empty'); - } - return first; - } - }; - } - - async refreshSchema() { - await this.baseDB.refreshSchema(); - } } diff --git a/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts b/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts index 69600b38b..be3bc1e26 100644 --- a/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts +++ b/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts @@ -7,6 +7,7 @@ import { DBLockOptions, LockContext, QueryResult, + SqlExecutor, Transaction, createLogger, type ILogger @@ -90,7 +91,8 @@ export class LockedAsyncDatabaseAdapter this.dbGetHelpers = this.generateDBHelpers({ execute: (query, params) => this.acquireLock(() => this._execute(query, params)), - executeRaw: (query, params) => this.acquireLock(() => this._executeRaw(query, params)) + executeRaw: (query, params) => this.acquireLock(() => this._executeRaw(query, params)), + executeBatch: (query, params) => this.acquireLock(() => this._executeBatch(query, params)) }); this.initPromise = this._init(); } @@ -269,19 +271,21 @@ export class LockedAsyncDatabaseAdapter } async readLock(fn: (tx: LockContext) => Promise, options?: DBLockOptions | undefined): Promise { - await this.waitForInitialized(); - return this.acquireLock( - async () => fn(this.generateDBHelpers({ execute: this._execute, executeRaw: this._executeRaw })), - { - timeoutMs: options?.timeoutMs ?? this.options.defaultLockTimeoutMs - } - ); + // Read and write locks are the same because we only have one underlying connection. + return this.writeLock(fn, options); } async writeLock(fn: (tx: LockContext) => Promise, options?: DBLockOptions | undefined): Promise { await this.waitForInitialized(); return this.acquireLock( - async () => fn(this.generateDBHelpers({ execute: this._execute, executeRaw: this._executeRaw })), + async () => + fn( + this.generateDBHelpers({ + execute: this._execute, + executeRaw: this._executeRaw, + executeBatch: this._executeBatch + }) + ), { timeoutMs: options?.timeoutMs ?? this.options.defaultLockTimeoutMs } @@ -370,12 +374,7 @@ export class LockedAsyncDatabaseAdapter return this.writeLock(this.wrapTransaction(fn, true)); } - private generateDBHelpers< - T extends { - execute: (sql: string, params?: any[]) => Promise; - executeRaw: (sql: string, params?: any[]) => Promise; - } - >(tx: T): T & DBGetUtils { + private generateDBHelpers(tx: T): T & LockContext { return { ...tx, /**