
Commit d4db4e2

[MySQL] Automatic schema change handling (#287)
* Removed zongji type mappings which are now provided by the Zongji package directly. Added a check for tablemap events.
* Moved most of the binlog event handling logic to a separate BinlogListener class. Introduced a mechanism to limit the maximum size of the binlog processing queue, thus also limiting memory usage. This maximum processing queue size is configurable.
* Updated the BinLogStream to use the new BinLogListener.
* Renamed BinlogListener to BinLogListener.
* Added changeset.
* Simplified the BinLogListener stopping mechanism. Cleaned up BinLogStream logs a bit.
* Corrected the BinLogListener name. Simplified the BinLogListener stopping mechanism.
* Supply port for binlog listener connections.
* Only set up the binlog heartbeat once the listener has fully started up. Added a few more defensive stopped checks to the binlog listener.
* Updated changeset.
* Changed the binlog backpressure mechanism to be based on processing queue memory usage rather than number of events.
* Changed the binlog backpressure mechanism to be based on processing queue memory usage rather than number of events. Introduced a maximum timeout that the binlog processing queue can be paused before auto-resuming, to prevent the replication connection timing out (see the sketch after this list).
* Added an optional columns field to SourceEntityDescriptor. Made SourceTable implement the SourceEntityDescriptor interface.
* Cleaned up unused imports.
* Ensure column values are preserved when available. Report 0 storage metrics instead of ignoring them. Moved MySQL table detail retrieval logic to a utility function.
* Added basic schema change handling for MySQL.
* Reverted the columns field addition to SourceEntityDescriptor.
* Added schema change handling for the MySQL binlog replication.
* Include the powersync core version in metrics metadata.
* Code analysis cleanup.
* Merge conflicts.
* Fixed parser import.
* Fixed mysql->sqlite row parsing that would filter out columns with null values.
* Cleaned up SchemaChange handling in BinLogListener. Improved binlog table filtering. Added extended type definitions for the node-sql-parser package.
* Added schema change tests. Cleaned up MySQL tests in general and added a few new test utils.
* Changed the binlog event receive log message to debug level.
* Reverted and fixed the mysql->sqlite row conversion for null value columns.
* Added a conditional skip of a MySQL schema test for syntax that does not exist in version 5.7.
* Fixed version checking for the MySQL 5.7 incompatible test.
* Fixed skipping of the schema change test on MySQL 5.7.
* Reverted the mysql dev docker compose. Updated to the released zongji listener version.
* Moved schema change handling to the processing queue. Catch parsing errors, and log an error if the DDL query might apply to one of the tables in the sync rules.
* Fixed a bug where multiple zongji listeners could be started if multiple schema change events were in the processing queue. Added a small timeout to a test to prevent a rare race condition.
* Extended the node-sql-parser type definitions. Added util functions to identify the different types of DDL statements.
* Simplified the schema change types. Added more detections of constraint changes. Removed detection of CREATE TABLE statements, since they can be detected and reacted to when row events are received for new tables. Added multiple extra test cases.
* Removed an unused constant.
* Skip the unsupported schema test for MySQL 5.7.
* Added error handling for zongji-emitted schema errors.
* Added changeset.
* Typo fixes from PR feedback.
* Removed filters from the mysql dev docker config.
* Added a safeguard for GTID splitting when no transactions have been run on the MySQL database yet.
* The BinLog listener now correctly takes the schema into account for replication. TableFilter creation is now handled internally in the BinLog listener. Pausing/unpausing binlog listening now uses the same stop/start functionality used for schema change handling.
* The BinLog stream now correctly honors multiple schemas in the sync rules.
* Added tests for multi-schema support.
* MySQL util fix post merge.
* Removed accidentally committed keepalive code in BinLogStream.
* Cleaned up BinLog docs and comments a bit.
* Removed a potentially spammy log entry.
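The backpressure change described above pauses binlog consumption when the processing queue holds too much data in memory, and auto-resumes after a maximum pause so the replication connection does not time out. A minimal sketch of that idea follows; all class and member names here are hypothetical and do not reflect the actual BinLogListener implementation.

// Hypothetical sketch of memory-based backpressure for a binlog processing queue.
// Names and thresholds are illustrative only; the real BinLogListener may differ.
interface QueuedBinLogEvent {
  sizeBytes: number;
  process(): Promise<void>;
}

class BackpressureQueueSketch {
  private queue: QueuedBinLogEvent[] = [];
  private queuedBytes = 0;
  private paused = false;

  constructor(
    private readonly maxQueuedBytes: number, // configurable memory limit for the queue
    private readonly maxPauseMs: number, // auto-resume before the replication connection times out
    private readonly pauseListener: () => void,
    private readonly resumeListener: () => void
  ) {}

  enqueue(event: QueuedBinLogEvent): void {
    this.queue.push(event);
    this.queuedBytes += event.sizeBytes;
    if (!this.paused && this.queuedBytes >= this.maxQueuedBytes) {
      // Stop reading new binlog events until the queue drains,
      // but never stay paused longer than maxPauseMs.
      this.paused = true;
      this.pauseListener();
      setTimeout(() => this.resume(), this.maxPauseMs);
    }
  }

  async drain(): Promise<void> {
    while (this.queue.length > 0) {
      const event = this.queue.shift()!;
      this.queuedBytes -= event.sizeBytes;
      await event.process();
      if (this.paused && this.queuedBytes < this.maxQueuedBytes / 2) {
        this.resume();
      }
    }
  }

  private resume(): void {
    if (this.paused) {
      this.paused = false;
      this.resumeListener();
    }
  }
}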
1 parent 2378e36 commit d4db4e2


45 files changed (+2205, −525 lines)

.changeset/wet-berries-enjoy.md

Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
+---
+'@powersync/service-module-postgres-storage': minor
+'@powersync/service-module-mongodb-storage': minor
+'@powersync/service-core-tests': minor
+'@powersync/service-module-postgres': minor
+'@powersync/service-module-mongodb': minor
+'@powersync/service-core': minor
+'@powersync/service-module-mysql': minor
+'@powersync/service-sync-rules': minor
+---
+
+MySQL:
+- Added schema change handling
+- Except for some edge cases, the following schema changes are now handled automatically:
+  - Creation, renaming, dropping and truncation of tables.
+  - Creation and dropping of unique indexes and primary keys.
+  - Adding, modifying, dropping and renaming of table columns.
+- If a schema change cannot be handled automatically, a warning with details will be logged.
+- Mismatches in table schema from the Zongji binlog listener are now handled more gracefully.
+- Replication of wildcard tables is now supported.
+- Improved logging for binlog event processing.

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 26 additions & 26 deletions
@@ -164,9 +164,9 @@ export class MongoSyncBucketStorage
   async resolveTable(options: storage.ResolveTableOptions): Promise<storage.ResolveTableResult> {
     const { group_id, connection_id, connection_tag, entity_descriptor } = options;
 
-    const { schema, name: table, objectId, replicationColumns } = entity_descriptor;
+    const { schema, name, objectId, replicaIdColumns } = entity_descriptor;
 
-    const columns = replicationColumns.map((column) => ({
+    const normalizedReplicaIdColumns = replicaIdColumns.map((column) => ({
       name: column.name,
       type: column.type,
       type_oid: column.typeId
@@ -178,8 +178,8 @@ export class MongoSyncBucketStorage
       group_id: group_id,
       connection_id: connection_id,
       schema_name: schema,
-      table_name: table,
-      replica_id_columns2: columns
+      table_name: name,
+      replica_id_columns2: normalizedReplicaIdColumns
     };
     if (objectId != null) {
       filter.relation_id = objectId;
@@ -192,24 +192,24 @@ export class MongoSyncBucketStorage
         connection_id: connection_id,
         relation_id: objectId,
         schema_name: schema,
-        table_name: table,
+        table_name: name,
         replica_id_columns: null,
-        replica_id_columns2: columns,
+        replica_id_columns2: normalizedReplicaIdColumns,
         snapshot_done: false,
         snapshot_status: undefined
       };
 
       await col.insertOne(doc, { session });
     }
-    const sourceTable = new storage.SourceTable(
-      doc._id,
-      connection_tag,
-      objectId,
-      schema,
-      table,
-      replicationColumns,
-      doc.snapshot_done ?? true
-    );
+    const sourceTable = new storage.SourceTable({
+      id: doc._id,
+      connectionTag: connection_tag,
+      objectId: objectId,
+      schema: schema,
+      name: name,
+      replicaIdColumns: replicaIdColumns,
+      snapshotComplete: doc.snapshot_done ?? true
+    });
     sourceTable.syncEvent = options.sync_rules.tableTriggersEvent(sourceTable);
     sourceTable.syncData = options.sync_rules.tableSyncsData(sourceTable);
     sourceTable.syncParameters = options.sync_rules.tableSyncsParameters(sourceTable);
@@ -224,7 +224,7 @@ export class MongoSyncBucketStorage
 
     let dropTables: storage.SourceTable[] = [];
     // Detect tables that are either renamed, or have different replica_id_columns
-    let truncateFilter = [{ schema_name: schema, table_name: table }] as any[];
+    let truncateFilter = [{ schema_name: schema, table_name: name }] as any[];
     if (objectId != null) {
       // Only detect renames if the source uses relation ids.
       truncateFilter.push({ relation_id: objectId });
@@ -242,15 +242,16 @@ export class MongoSyncBucketStorage
       .toArray();
     dropTables = truncate.map(
       (doc) =>
-        new storage.SourceTable(
-          doc._id,
-          connection_tag,
-          doc.relation_id,
-          doc.schema_name,
-          doc.table_name,
-          doc.replica_id_columns2?.map((c) => ({ name: c.name, typeOid: c.type_oid, type: c.type })) ?? [],
-          doc.snapshot_done ?? true
-        )
+        new storage.SourceTable({
+          id: doc._id,
+          connectionTag: connection_tag,
+          objectId: doc.relation_id,
+          schema: doc.schema_name,
+          name: doc.table_name,
+          replicaIdColumns:
+            doc.replica_id_columns2?.map((c) => ({ name: c.name, typeOid: c.type_oid, type: c.type })) ?? [],
+          snapshotComplete: doc.snapshot_done ?? true
+        })
     );
 
     result = {
@@ -577,7 +578,6 @@ export class MongoSyncBucketStorage
           `${this.slot_name} Cleared batch of data in ${lib_mongo.db.MONGO_CLEAR_OPERATION_TIMEOUT_MS}ms, continuing...`
         );
         await timers.setTimeout(lib_mongo.db.MONGO_CLEAR_OPERATION_TIMEOUT_MS / 5);
-        continue;
       } else {
         throw e;
       }
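The SourceTable construction change above (positional arguments replaced by a single options object) repeats across several files in this commit. As a design choice, named options keep call sites self-describing when several parameters share a type, for example schema and name, which are both strings. Below is a minimal sketch of the pattern, assuming the field names shown in the diff; it is not the actual storage.SourceTable class.

// Minimal sketch of the named-options constructor pattern used in this commit.
// Field names follow the diff above, but this is not the real SourceTable implementation.
interface SourceTableOptionsSketch {
  id: unknown;
  connectionTag: string;
  objectId: string | number | undefined;
  schema: string;
  name: string;
  replicaIdColumns: { name: string; type?: string; typeOid?: number }[];
  snapshotComplete: boolean;
}

class SourceTableSketch {
  constructor(public readonly options: SourceTableOptionsSketch) {}

  get schema(): string {
    return this.options.schema;
  }

  get name(): string {
    return this.options.name;
  }
}

// Old style (positional, easy to misorder):
//   new SourceTable(doc._id, connection_tag, objectId, schema, table, columns, true);
// New style (named options, self-describing at the call site):
const table = new SourceTableSketch({
  id: 0,
  connectionTag: 'default',
  objectId: 'lists',
  schema: 'mydatabase',
  name: 'lists',
  replicaIdColumns: [{ name: 'id' }],
  snapshotComplete: true
});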

modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts

Lines changed: 19 additions & 19 deletions
@@ -5,7 +5,7 @@ import * as sync_rules from '@powersync/service-sync-rules';
 import * as service_types from '@powersync/service-types';
 
 import { MongoManager } from '../replication/MongoManager.js';
-import { constructAfterRecord, createCheckpoint, STANDALONE_CHECKPOINT_ID } from '../replication/MongoRelation.js';
+import { constructAfterRecord, STANDALONE_CHECKPOINT_ID } from '../replication/MongoRelation.js';
 import { CHECKPOINTS_COLLECTION } from '../replication/replication-utils.js';
 import * as types from '../types/types.js';
 import { escapeRegExp } from '../utils.js';
@@ -137,15 +137,15 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
     if (tablePattern.isWildcard) {
       patternResult.tables = [];
       for (let collection of collections) {
-        const sourceTable = new SourceTable(
-          0,
-          this.connectionTag,
-          collection.name,
-          schema,
-          collection.name,
-          [],
-          true
-        );
+        const sourceTable = new SourceTable({
+          id: 0,
+          connectionTag: this.connectionTag,
+          objectId: collection.name,
+          schema: schema,
+          name: collection.name,
+          replicaIdColumns: [],
+          snapshotComplete: true
+        });
         let errors: service_types.ReplicationError[] = [];
         if (collection.type == 'view') {
           errors.push({ level: 'warning', message: `Collection ${schema}.${tablePattern.name} is a view` });
@@ -164,15 +164,15 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
         });
       }
     } else {
-      const sourceTable = new SourceTable(
-        0,
-        this.connectionTag,
-        tablePattern.name,
-        schema,
-        tablePattern.name,
-        [],
-        true
-      );
+      const sourceTable = new SourceTable({
+        id: 0,
+        connectionTag: this.connectionTag,
+        objectId: tablePattern.name,
+        schema: schema,
+        name: tablePattern.name,
+        replicaIdColumns: [],
+        snapshotComplete: true
+      });
 
       const syncData = sqlSyncRules.tableSyncsData(sourceTable);
       const syncParameters = sqlSyncRules.tableSyncsParameters(sourceTable);

modules/module-mongodb/src/replication/ChangeStream.ts

Lines changed: 2 additions & 2 deletions
@@ -215,7 +215,7 @@ export class ChangeStream {
 
   async estimatedCountNumber(table: storage.SourceTable): Promise<number> {
     const db = this.client.db(table.schema);
-    return await db.collection(table.table).estimatedDocumentCount();
+    return await db.collection(table.name).estimatedDocumentCount();
   }
 
   /**
@@ -449,7 +449,7 @@
     const totalEstimatedCount = await this.estimatedCountNumber(table);
     let at = table.snapshotStatus?.replicatedCount ?? 0;
     const db = this.client.db(table.schema);
-    const collection = db.collection(table.table);
+    const collection = db.collection(table.name);
     await using query = new ChunkedSnapshotQuery({
       collection,
       key: table.snapshotStatus?.lastKey,

modules/module-mongodb/src/replication/MongoRelation.ts

Lines changed: 2 additions & 2 deletions
@@ -13,7 +13,7 @@ export function getMongoRelation(source: mongo.ChangeStreamNameSpace): storage.S
     schema: source.db,
     // Not relevant for MongoDB - we use db + coll name as the identifier
     objectId: undefined,
-    replicationColumns: [{ name: '_id' }]
+    replicaIdColumns: [{ name: '_id' }]
   } satisfies storage.SourceEntityDescriptor;
 }
 
@@ -22,7 +22,7 @@ export function getMongoRelation(source: mongo.ChangeStreamNameSpace): storage.S
  */
 export function getCacheIdentifier(source: storage.SourceEntityDescriptor | storage.SourceTable): string {
   if (source instanceof storage.SourceTable) {
-    return `${source.schema}.${source.table}`;
+    return `${source.schema}.${source.name}`;
   }
   return `${source.schema}.${source.name}`;
 }

modules/module-mysql/dev/docker/mysql/init-scripts/my.cnf

Lines changed: 1 addition & 3 deletions
@@ -4,6 +4,4 @@ enforce-gtid-consistency = ON
 # Row format required for ZongJi
 binlog_format = row
 log_bin=mysql-bin
-server-id=1
-binlog-do-db=mydatabase
-replicate-do-table=mydatabase.lists
+server-id=1

modules/module-mysql/package.json

Lines changed: 2 additions & 1 deletion
@@ -33,9 +33,10 @@
     "@powersync/service-sync-rules": "workspace:*",
     "@powersync/service-types": "workspace:*",
     "@powersync/service-jsonbig": "workspace:*",
-    "@powersync/mysql-zongji": "0.2.0",
+    "@powersync/mysql-zongji": "^0.4.0",
     "async": "^3.2.4",
     "mysql2": "^3.11.0",
+    "node-sql-parser": "^5.3.9",
     "semver": "^7.5.4",
     "ts-codec": "^1.3.0",
     "uri-js": "^4.4.1",

modules/module-mysql/src/api/MySQLRouteAPIAdapter.ts

Lines changed: 11 additions & 3 deletions
@@ -208,7 +208,7 @@ export class MySQLRouteAPIAdapter implements api.RouteAPI {
         idColumnsResult = await common.getReplicationIdentityColumns({
           connection: connection,
           schema,
-          table_name: tableName
+          tableName: tableName
         });
       } catch (ex) {
         idColumnsError = { level: 'fatal', message: ex.message };
@@ -217,7 +217,15 @@ export class MySQLRouteAPIAdapter implements api.RouteAPI {
       }
 
       const idColumns = idColumnsResult?.columns ?? [];
-      const sourceTable = new storage.SourceTable(0, this.config.tag, tableName, schema, tableName, idColumns, true);
+      const sourceTable = new storage.SourceTable({
+        id: 0,
+        connectionTag: this.config.tag,
+        objectId: tableName,
+        schema: schema,
+        name: tableName,
+        replicaIdColumns: idColumns,
+        snapshotComplete: true
+      });
       const syncData = syncRules.tableSyncsData(sourceTable);
       const syncParameters = syncRules.tableSyncsParameters(sourceTable);
 
@@ -232,7 +240,7 @@ export class MySQLRouteAPIAdapter implements api.RouteAPI {
       let selectError: service_types.ReplicationError | null = null;
       try {
         await this.retriedQuery({
-          query: `SELECT * FROM ${sourceTable.table} LIMIT 1`
+          query: `SELECT * FROM ${sourceTable.name} LIMIT 1`
         });
       } catch (e) {
         selectError = { level: 'fatal', message: e.message };

modules/module-mysql/src/common/ReplicatedGTID.ts

Lines changed: 6 additions & 1 deletion
@@ -92,10 +92,15 @@
    * @returns A comparable string in the format
    * `padded_end_transaction|raw_gtid|binlog_filename|binlog_position`
    */
-  get comparable() {
+  get comparable(): string {
     const { raw, position } = this;
     const [, transactionRanges] = this.raw.split(':');
 
+    // This means no transactions have been executed on the database yet
+    if (!transactionRanges) {
+      return ReplicatedGTID.ZERO.comparable;
+    }
+
     let maxTransactionId = 0;
 
     for (const range of transactionRanges.split(',')) {
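The guard added above covers GTID sets from a server on which no transactions have been executed yet: the raw value then has no transaction-range part after the server UUID, so splitting on `:` yields undefined and the getter falls back to ReplicatedGTID.ZERO. A standalone illustration of the same split logic, not the actual ReplicatedGTID class:

// Illustrative only: mirrors the transaction-range split guarded in the diff above.
function maxTransactionId(rawGtidSet: string): number {
  const [, transactionRanges] = rawGtidSet.split(':');
  if (!transactionRanges) {
    // No transactions have been executed on the database yet.
    return 0;
  }
  let max = 0;
  for (const range of transactionRanges.split(',')) {
    const [start, end] = range.split('-');
    max = Math.max(max, Number(end ?? start));
  }
  return max;
}

// Example usage:
maxTransactionId('3E11FA47-71CA-11E1-9E33-C80AA9429562:1-20'); // 20
maxTransactionId('3E11FA47-71CA-11E1-9E33-C80AA9429562');      // 0 (no transactions yet)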
Lines changed: 1 addition & 2 deletions
@@ -1,6 +1,5 @@
 export * from './check-source-configuration.js';
-export * from './get-replication-columns.js';
-export * from './get-tables-from-pattern.js';
+export * from './schema-utils.js';
 export * from './mysql-to-sqlite.js';
 export * from './read-executed-gtid.js';
 export * from './ReplicatedGTID.js';
