Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .changeset/slimy-mails-move.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
'@powersync/react-native': minor
'@powersync/common': minor
'@powersync/web': minor
'@powersync/op-sqlite': patch
'@powersync/capacitor': patch
'@powersync/node': patch
---

Share common db adapter implementation logic.
234 changes: 75 additions & 159 deletions packages/adapter-sql-js/src/SQLJSAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ import {
BaseListener,
BaseObserver,
BatchedUpdateNotification,
ConnectionPool,
ControlledExecutor,
createLogger,
DBAdapter,
DBAdapterDefaultMixin,
DBAdapterListener,
DBGetUtilsDefaultMixin,
DBLockOptions,
ILogger,
LockContext,
QueryResult,
SqlExecutor,
SQLOpenFactory,
SQLOpenOptions,
Transaction
Expand Down Expand Up @@ -56,7 +60,7 @@ interface TableObserverListener extends BaseListener {
}
class TableObserver extends BaseObserver<TableObserverListener> {}

export class SQLJSDBAdapter extends BaseObserver<DBAdapterListener> implements DBAdapter {
class SqlJsConnectionPool extends BaseObserver<DBAdapterListener> implements ConnectionPool {
protected initPromise: Promise<SQLJs.Database>;
protected _db: SQLJs.Database | null;
protected tableUpdateCache: Set<string>;
Expand Down Expand Up @@ -136,129 +140,17 @@ export class SQLJSDBAdapter extends BaseObserver<DBAdapterListener> implements D
db.close();
}

protected generateLockContext(): LockContext {
const execute = async (query: string, params?: any[]): Promise<QueryResult> => {
const db = await this.getDB();
const statement = db.prepare(query);
const rawResults: any[][] = [];
let columnNames: string[] | null = null;
try {
if (params) {
statement.bind(params);
}
while (statement.step()) {
if (!columnNames) {
columnNames = statement.getColumnNames();
}
rawResults.push(statement.get());
}

const rows = rawResults.map((row) => {
return Object.fromEntries(row.map((value, index) => [columnNames![index], value]));
});
return {
// `lastInsertId` is not available in the original version of SQL.js or its types, but it's available in the fork we use.
insertId: (db as any).lastInsertId(),
rowsAffected: db.getRowsModified(),
rows: {
_array: rows,
length: rows.length,
item: (idx: number) => rows[idx]
}
};
} finally {
statement.free();
}
};

const getAll = async <T>(query: string, params?: any[]): Promise<T[]> => {
const result = await execute(query, params);
return result.rows?._array ?? ([] as T[]);
};

const getOptional = async <T>(query: string, params?: any[]): Promise<T | null> => {
const results = await getAll<T>(query, params);
return results.length > 0 ? results[0] : null;
};

const get = async <T>(query: string, params?: any[]): Promise<T> => {
const result = await getOptional<T>(query, params);
if (!result) {
throw new Error(`No results for query: ${query}`);
}
return result;
};

const executeRaw = async (query: string, params?: any[]): Promise<any[][]> => {
const db = await this.getDB();
const statement = db.prepare(query);
const rawResults: any[][] = [];
try {
if (params) {
statement.bind(params);
}
while (statement.step()) {
rawResults.push(statement.get());
}
return rawResults;
} finally {
statement.free();
}
};

return {
getAll,
getOptional,
get,
executeRaw,
execute
};
}

execute(query: string, params?: any[]): Promise<QueryResult> {
return this.writeLock((tx) => tx.execute(query, params));
}

executeRaw(query: string, params?: any[]): Promise<any[][]> {
return this.writeLock((tx) => tx.executeRaw(query, params));
}

async executeBatch(query: string, params: any[][] = []): Promise<QueryResult> {
let totalRowsAffected = 0;
const db = await this.getDB();

const stmt = db.prepare(query);
try {
for (const paramSet of params) {
stmt.run(paramSet);
totalRowsAffected += db.getRowsModified();
}

return {
rowsAffected: totalRowsAffected
};
} finally {
stmt.free();
}
}

/**
* We're not using separate read/write locks here because we can't implement connection pools on top of SQL.js.
*/
readLock<T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T> {
return this.writeLock(fn, options);
}

readTransaction<T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions): Promise<T> {
return this.readLock(async (ctx) => {
return this.internalTransaction(ctx, fn);
});
}

writeLock<T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T> {
return this.mutex.runExclusive(async () => {
const db = await this.getDB();
const result = await fn(this.generateLockContext());
const result = await fn(new SqlJsLockContext(db));

// No point to schedule a write if there's no persister.
if (this.options.persister) {
Expand All @@ -276,61 +168,85 @@ export class SQLJSDBAdapter extends BaseObserver<DBAdapterListener> implements D
});
}

writeTransaction<T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions): Promise<T> {
return this.writeLock(async (ctx) => {
return this.internalTransaction(ctx, fn);
});
}

refreshSchema(): Promise<void> {
return this.get("PRAGMA table_info('sqlite_master')");
async refreshSchema(): Promise<void> {
await this.writeLock((ctx) => ctx.get("PRAGMA table_info('sqlite_master')"));
}
}

getAll<T>(sql: string, parameters?: any[]): Promise<T[]> {
return this.readLock((tx) => tx.getAll<T>(sql, parameters));
}
class SqlJsExecutor implements SqlExecutor {
constructor(readonly db: SQLJs.Database) {}

getOptional<T>(sql: string, parameters?: any[]): Promise<T | null> {
return this.readLock((tx) => tx.getOptional<T>(sql, parameters));
}
async execute(query: string, params?: any[]): Promise<QueryResult> {
const db = this.db;
const statement = db.prepare(query);
const rawResults: any[][] = [];
let columnNames: string[] | null = null;
try {
if (params) {
statement.bind(params);
}
while (statement.step()) {
if (!columnNames) {
columnNames = statement.getColumnNames();
}
rawResults.push(statement.get());
}

get<T>(sql: string, parameters?: any[]): Promise<T> {
return this.readLock((tx) => tx.get<T>(sql, parameters));
const rows = rawResults.map((row) => {
return Object.fromEntries(row.map((value, index) => [columnNames![index], value]));
});
return {
// `lastInsertId` is not available in the original version of SQL.js or its types, but it's available in the fork we use.
insertId: (db as any).lastInsertId(),
rowsAffected: db.getRowsModified(),
rows: {
_array: rows,
length: rows.length,
item: (idx: number) => rows[idx]
}
};
} finally {
statement.free();
}
}

protected async internalTransaction<T>(ctx: LockContext, fn: (tx: Transaction) => Promise<T>): Promise<T> {
let finalized = false;
const commit = async (): Promise<QueryResult> => {
if (finalized) {
return { rowsAffected: 0 };
async executeRaw(query: string, params?: any[]): Promise<any[][]> {
const db = this.db;
const statement = db.prepare(query);
const rawResults: any[][] = [];
try {
if (params) {
statement.bind(params);
}
finalized = true;
return ctx.execute('COMMIT');
};
const rollback = async (): Promise<QueryResult> => {
if (finalized) {
return { rowsAffected: 0 };
while (statement.step()) {
rawResults.push(statement.get());
}
finalized = true;
return ctx.execute('ROLLBACK');
};
return rawResults;
} finally {
statement.free();
}
}

async executeBatch(query: string, params: any[][] = []): Promise<QueryResult> {
let totalRowsAffected = 0;
const db = this.db;

const stmt = db.prepare(query);
try {
await ctx.execute('BEGIN');
const result = await fn({
...ctx,
commit,
rollback
});
await commit();
return result;
} catch (ex) {
try {
await rollback();
} catch (ex2) {
// In rare cases, a rollback may fail.
// Safe to ignore.
for (const paramSet of params) {
stmt.run(paramSet);
totalRowsAffected += db.getRowsModified();
}
throw ex;

return {
rowsAffected: totalRowsAffected
};
} finally {
stmt.free();
}
}
}

class SqlJsLockContext extends DBGetUtilsDefaultMixin(SqlJsExecutor) implements LockContext {}

export class SQLJSDBAdapter extends DBAdapterDefaultMixin(SqlJsConnectionPool) implements DBAdapter {}
Loading
Loading