Skip to content

Commit c546134

Browse files
authored
Merge pull request #116 from powersync-ja/feat/execute-batch-web
Add executeBatch to React Native and Web
2 parents 6c43ec6 + 8f7caa5 commit c546134

File tree

9 files changed

+257
-46
lines changed

9 files changed

+257
-46
lines changed

.changeset/swift-pandas-laugh.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"@journeyapps/powersync-sdk-react-native": minor
3+
"@journeyapps/powersync-sdk-common": minor
4+
"@journeyapps/powersync-sdk-web": minor
5+
---
6+
7+
Added batch execution functionality to the web and react-native SDKs. This feature allows a SQL statement with multiple parameters to be executed in a single transaction, improving performance and consistency.

packages/powersync-sdk-common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,15 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
475475
return this.database.execute(sql, parameters);
476476
}
477477

478+
/**
479+
* Execute a batch write (INSERT/UPDATE/DELETE) query with multiple sets of parameters
480+
* and optionally return results.
481+
*/
482+
async executeBatch(query: string, params?: any[][]) {
483+
await this.waitForReady();
484+
return this.database.executeBatch(query, params);
485+
}
486+
478487
/**
479488
* Execute a read-only query and return results.
480489
*/

packages/powersync-sdk-common/src/db/DBAdapter.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ export interface TableUpdateOperation {
6565
opType: RowUpdateType;
6666
rowId: number;
6767
}
68-
6968
/**
7069
* Notification of an update to one or more tables, for the purpose of realtime change notifications.
7170
*/
@@ -96,6 +95,7 @@ export interface DBLockOptions {
9695
export interface DBAdapter extends BaseObserverInterface<DBAdapterListener>, DBGetUtils {
9796
close: () => void;
9897
execute: (query: string, params?: any[]) => Promise<QueryResult>;
98+
executeBatch: (query: string, params?: any[][]) => Promise<QueryResult>;
9999
name: string;
100100
readLock: <T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions) => Promise<T>;
101101
readTransaction: <T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions) => Promise<T>;

packages/powersync-sdk-react-native/src/db/adapters/react-native-quick-sqlite/RNQSDBAdapter.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,19 @@ export class RNQSDBAdapter extends BaseObserver<DBAdapterListener> implements DB
6262
return this.baseDB.execute(query, params);
6363
}
6464

65+
async executeBatch(query: string, params: any[][] = []): Promise<QueryResult> {
66+
const commands: any[] = [];
67+
68+
for (let i = 0; i < params.length; i++) {
69+
commands.push([query, params[i]]);
70+
}
71+
72+
const result = await this.baseDB.executeBatch(commands);
73+
return {
74+
rowsAffected: result.rowsAffected ? result.rowsAffected : 0
75+
};
76+
}
77+
6578
/**
6679
* This provides a top-level read only execute method which is executed inside a read-lock.
6780
* This is necessary since the high level `execute` method uses a write-lock under

packages/powersync-sdk-web/src/db/adapters/SSRDBAdapter.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ export class SSRDBAdapter extends BaseObserver<DBAdapterListener> implements DBA
5353
return this.writeMutex.runExclusive(async () => MOCK_QUERY_RESPONSE);
5454
}
5555

56+
async executeBatch(query: string, params?: any[][]): Promise<QueryResult> {
57+
return this.writeMutex.runExclusive(async () => MOCK_QUERY_RESPONSE);
58+
}
59+
5660
async getAll<T>(sql: string, parameters?: any[]): Promise<T[]> {
5761
return [];
5862
}

packages/powersync-sdk-web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import {
1111
} from '@journeyapps/powersync-sdk-common';
1212
import * as Comlink from 'comlink';
1313
import Logger, { ILogger } from 'js-logger';
14-
import type { DBWorkerInterface, OpenDB } from '../../../worker/db/open-db';
14+
import type { DBWorkerInterface, OpenDB, SQLBatchTuple } from '../../../worker/db/open-db';
1515
import { getWorkerDatabaseOpener } from '../../../worker/db/open-worker-database';
1616

1717
export type WASQLiteFlags = {
@@ -78,6 +78,10 @@ export class WASQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
7878
return this.writeLock((ctx) => ctx.execute(query, params));
7979
}
8080

81+
async executeBatch(query: string, params?: any[][]): Promise<QueryResult> {
82+
return this.writeLock((ctx) => this._executeBatch(query, params));
83+
}
84+
8185
/**
8286
* Wraps the worker execute function, awaiting for it to be available
8387
*/
@@ -93,6 +97,18 @@ export class WASQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
9397
};
9498
};
9599

100+
/**
101+
* Wraps the worker executeBatch function, awaiting for it to be available
102+
*/
103+
private _executeBatch = async (query: string, params?: any[]): Promise<QueryResult> => {
104+
await this.initialized;
105+
const result = await this.workerMethods!.executeBatch!(query, params);
106+
return {
107+
...result,
108+
rows: undefined,
109+
};
110+
};
111+
96112
/**
97113
* Attempts to close the connection.
98114
* Shared workers might not actually close the connection if other

packages/powersync-sdk-web/src/worker/db/open-db.ts

Lines changed: 126 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,17 @@ export type DBWorkerInterface = {
1515
// Close is only exposed when used in a single non shared webworker
1616
close?: () => void;
1717
execute: WASQLiteExecuteMethod;
18+
executeBatch: WASQLiteExecuteBatchMethod;
1819
registerOnTableChange: (callback: OnTableChangeCallback) => void;
1920
};
2021

2122
export type WASQLiteExecuteMethod = (sql: string, params?: any[]) => Promise<WASQLExecuteResult>;
22-
23+
export type WASQLiteExecuteBatchMethod = (sql: string, params?: any[]) => Promise<WASQLExecuteResult>;
2324
export type OnTableChangeCallback = (opType: number, tableName: string, rowId: number) => void;
2425
export type OpenDB = (dbFileName: string) => DBWorkerInterface;
2526

27+
export type SQLBatchTuple = [string] | [string, Array<any> | Array<Array<any>>];
28+
2629
export async function _openDB(dbFileName: string): Promise<DBWorkerInterface> {
2730
const { default: moduleFactory } = await import('@journeyapps/wa-sqlite/dist/wa-sqlite-async.mjs');
2831
const module = await moduleFactory();
@@ -52,64 +55,142 @@ export async function _openDB(dbFileName: string): Promise<DBWorkerInterface> {
5255
};
5356

5457
/**
55-
* This executes SQL statements.
58+
* This executes single SQL statements inside a requested lock.
5659
*/
5760
const execute = async (sql: string | TemplateStringsArray, bindings?: any[]): Promise<WASQLExecuteResult> => {
5861
// Running multiple statements on the same connection concurrently should not be allowed
59-
return navigator.locks.request(`db-execute-${dbFileName}`, async () => {
60-
const results = [];
61-
for await (const stmt of sqlite3.statements(db, sql as string)) {
62-
let columns;
63-
const wrappedBindings = bindings ? [bindings] : [[]];
64-
for (const binding of wrappedBindings) {
65-
// TODO not sure why this is needed currently, but booleans break
66-
binding.forEach((b, index, arr) => {
67-
if (typeof b == 'boolean') {
68-
arr[index] = b ? 1 : 0;
69-
}
70-
});
62+
return _acquireExecuteLock(async () => {
63+
return executeSingleStatement(sql, bindings);
64+
});
65+
};
7166

72-
sqlite3.reset(stmt);
73-
if (bindings) {
74-
sqlite3.bind_collection(stmt, binding);
75-
}
67+
/**
68+
* This requests a lock for executing statements.
69+
* Should only be used interanlly.
70+
*/
71+
const _acquireExecuteLock = (callback: () => Promise<any>): Promise<any> => {
72+
return navigator.locks.request(`db-execute-${dbFileName}`, callback);
73+
};
7674

77-
const rows = [];
78-
while ((await sqlite3.step(stmt)) === SQLite.SQLITE_ROW) {
79-
const row = sqlite3.row(stmt);
80-
rows.push(row);
75+
/**
76+
* This executes a single statement using SQLite3.
77+
*/
78+
const executeSingleStatement = async (
79+
sql: string | TemplateStringsArray,
80+
bindings?: any[]
81+
): Promise<WASQLExecuteResult> => {
82+
const results = [];
83+
for await (const stmt of sqlite3.statements(db, sql as string)) {
84+
let columns;
85+
const wrappedBindings = bindings ? [bindings] : [[]];
86+
for (const binding of wrappedBindings) {
87+
// TODO not sure why this is needed currently, but booleans break
88+
binding.forEach((b, index, arr) => {
89+
if (typeof b == 'boolean') {
90+
arr[index] = b ? 1 : 0;
8191
}
92+
});
8293

83-
columns = columns ?? sqlite3.column_names(stmt);
84-
if (columns.length) {
85-
results.push({ columns, rows });
86-
}
94+
sqlite3.reset(stmt);
95+
if (bindings) {
96+
sqlite3.bind_collection(stmt, binding);
8797
}
8898

89-
// When binding parameters, only a single statement is executed.
90-
if (bindings) {
91-
break;
99+
const rows = [];
100+
while ((await sqlite3.step(stmt)) === SQLite.SQLITE_ROW) {
101+
const row = sqlite3.row(stmt);
102+
rows.push(row);
92103
}
93-
}
94104

95-
let rows: Record<string, any>[] = [];
96-
for (let resultset of results) {
97-
for (let row of resultset.rows) {
98-
let outRow: Record<string, any> = {};
99-
resultset.columns.forEach((key, index) => {
100-
outRow[key] = row[index];
101-
});
102-
rows.push(outRow);
105+
columns = columns ?? sqlite3.column_names(stmt);
106+
if (columns.length) {
107+
results.push({ columns, rows });
103108
}
104109
}
105110

106-
const result = {
107-
insertId: sqlite3.last_insert_id(db),
108-
rowsAffected: sqlite3.changes(db),
109-
rows: {
110-
_array: rows,
111-
length: rows.length
111+
// When binding parameters, only a single statement is executed.
112+
if (bindings) {
113+
break;
114+
}
115+
}
116+
117+
let rows: Record<string, any>[] = [];
118+
for (let resultset of results) {
119+
for (let row of resultset.rows) {
120+
let outRow: Record<string, any> = {};
121+
resultset.columns.forEach((key, index) => {
122+
outRow[key] = row[index];
123+
});
124+
rows.push(outRow);
125+
}
126+
}
127+
128+
const result = {
129+
insertId: sqlite3.last_insert_id(db),
130+
rowsAffected: sqlite3.changes(db),
131+
rows: {
132+
_array: rows,
133+
length: rows.length
134+
}
135+
};
136+
137+
return result;
138+
};
139+
140+
/**
141+
* This executes SQL statements in a batch.
142+
*/
143+
const executeBatch = async (sql: string, bindings?: any[][]): Promise<WASQLExecuteResult> => {
144+
return _acquireExecuteLock(async () => {
145+
let affectedRows = 0;
146+
147+
const str = sqlite3.str_new(db, sql);
148+
const query = sqlite3.str_value(str);
149+
try {
150+
await executeSingleStatement('BEGIN TRANSACTION');
151+
152+
//Prepare statement once
153+
let prepared = await sqlite3.prepare_v2(db, query);
154+
if (prepared === null) {
155+
return {
156+
rowsAffected: 0
157+
};
158+
}
159+
const wrappedBindings = bindings ? bindings : [];
160+
for (const binding of wrappedBindings) {
161+
// TODO not sure why this is needed currently, but booleans break
162+
for (let i = 0; i < binding.length; i++) {
163+
let b = binding[i];
164+
if (typeof b == 'boolean') {
165+
binding[i] = b ? 1 : 0;
166+
}
167+
}
168+
169+
//Reset bindings
170+
sqlite3.reset(prepared.stmt);
171+
if (bindings) {
172+
sqlite3.bind_collection(prepared.stmt, binding);
173+
}
174+
175+
let result = await sqlite3.step(prepared.stmt);
176+
if (result === SQLite.SQLITE_DONE) {
177+
//The value returned by sqlite3_changes() immediately after an INSERT, UPDATE or DELETE statement run on a view is always zero.
178+
affectedRows += sqlite3.changes(db);
179+
}
112180
}
181+
//Finalize prepared statement
182+
await sqlite3.finalize(prepared.stmt);
183+
await executeSingleStatement('COMMIT');
184+
} catch (err) {
185+
await executeSingleStatement('ROLLBACK');
186+
return {
187+
rowsAffected: 0
188+
};
189+
} finally {
190+
sqlite3.str_finish(str);
191+
}
192+
const result = {
193+
rowsAffected: affectedRows
113194
};
114195

115196
return result;
@@ -118,6 +199,7 @@ export async function _openDB(dbFileName: string): Promise<DBWorkerInterface> {
118199

119200
return {
120201
execute: Comlink.proxy(execute),
202+
executeBatch: Comlink.proxy(executeBatch),
121203
registerOnTableChange: Comlink.proxy(registerOnTableChange),
122204
close: Comlink.proxy(() => {
123205
sqlite3.close(db);

packages/powersync-sdk-web/tests/crud.test.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,32 @@ describe('CRUD Tests', () => {
5454
expect(tx.crud[0].equals(expectedCrudEntry)).true;
5555
});
5656

57+
it('BATCH INSERT', async () => {
58+
expect(await powersync.getAll('SELECT * FROM ps_crud')).empty;
59+
60+
const query = `INSERT INTO assets(id, description) VALUES(?, ?)`;
61+
await powersync.executeBatch(query, [
62+
[testId, 'test'],
63+
['mockId', 'test1']
64+
]);
65+
66+
expect(await powersync.getAll('SELECT data FROM ps_crud ORDER BY id')).deep.equals([
67+
{
68+
data: `{"op":"PUT","type":"assets","id":"${testId}","data":{"description":"test"}}`
69+
},
70+
{
71+
data: `{"op":"PUT","type":"assets","id":"mockId","data":{"description":"test1"}}`
72+
}
73+
]);
74+
75+
const crudBatch = (await powersync.getCrudBatch(2))!;
76+
expect(crudBatch.crud.length).equals(2);
77+
const expectedCrudEntry = new CrudEntry(1, UpdateType.PUT, 'assets', testId, 1, { description: 'test' });
78+
const expectedCrudEntry2 = new CrudEntry(2, UpdateType.PUT, 'assets', 'mockId', 1, { description: 'test1' });
79+
expect(crudBatch.crud[0].equals(expectedCrudEntry)).true;
80+
expect(crudBatch.crud[1].equals(expectedCrudEntry2)).true;
81+
});
82+
5783
it('INSERT OR REPLACE', async () => {
5884
await powersync.execute('INSERT INTO assets(id, description) VALUES(?, ?)', [testId, 'test']);
5985
await powersync.execute('DELETE FROM ps_crud WHERE 1');
@@ -96,6 +122,38 @@ describe('CRUD Tests', () => {
96122
expect(tx.crud[0].equals(expectedCrudEntry)).true;
97123
});
98124

125+
it('BATCH UPDATE', async () => {
126+
await powersync.executeBatch('INSERT INTO assets(id, description, make) VALUES(?, ?, ?)', [
127+
[testId, 'test', 'test'],
128+
['mockId', 'test', 'test']
129+
]);
130+
await powersync.execute('DELETE FROM ps_crud WHERE 1');
131+
132+
await powersync.executeBatch('UPDATE assets SET description = ?, make = ?', [['test2', 'make2']]);
133+
134+
expect(await powersync.getAll('SELECT data FROM ps_crud ORDER BY id')).deep.equals([
135+
{
136+
data: `{"op":"PATCH","type":"assets","id":"${testId}","data":{"description":"test2","make":"make2"}}`
137+
},
138+
{
139+
data: `{"op":"PATCH","type":"assets","id":"mockId","data":{"description":"test2","make":"make2"}}`
140+
}
141+
]);
142+
143+
const crudBatch = (await powersync.getCrudBatch(2))!;
144+
expect(crudBatch.crud.length).equals(2);
145+
const expectedCrudEntry = new CrudEntry(3, UpdateType.PATCH, 'assets', testId, 2, {
146+
description: 'test2',
147+
make: 'make2'
148+
});
149+
const expectedCrudEntry2 = new CrudEntry(4, UpdateType.PATCH, 'assets', 'mockId', 2, {
150+
description: 'test2',
151+
make: 'make2'
152+
});
153+
expect(crudBatch.crud[0].equals(expectedCrudEntry)).true;
154+
expect(crudBatch.crud[1].equals(expectedCrudEntry2)).true;
155+
});
156+
99157
it('DELETE', async () => {
100158
await powersync.execute('INSERT INTO assets(id, description, make) VALUES(?, ?, ?)', [testId, 'test', 'test']);
101159
await powersync.execute('DELETE FROM ps_crud WHERE 1');

0 commit comments

Comments
 (0)