Skip to content

Commit 48220b7

Browse files
feat(sync-engine): replace pg-node-migrations with custom runner
- Custom migration runner with advisory locks (no checksum validation) - Use {{schema}} placeholders for configurable schema support - Add getMigrations(schema) export for inspecting migrations Fixes #191, fixes #77
1 parent 511a4a2 commit 48220b7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+265
-199
lines changed

packages/sync-engine/package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
],
2727
"dependencies": {
2828
"pg": "^8.16.3",
29-
"pg-node-migrations": "0.0.8",
3029
"yesql": "^7.0.0"
3130
},
3231
"peerDependencies": {
Lines changed: 100 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,66 +1,137 @@
11
import { Client } from 'pg'
2-
import { migrate } from 'pg-node-migrations'
32
import fs from 'node:fs'
4-
import pino from 'pino'
53
import path from 'node:path'
4+
import type { Logger } from 'pino'
65
import type { ConnectionOptions } from 'node:tls'
76

87
type MigrationConfig = {
98
schema: string
109
databaseUrl: string
1110
ssl?: ConnectionOptions
12-
logger?: pino.Logger
11+
logger?: Logger
1312
}
1413

15-
async function connectAndMigrate(
16-
client: Client,
17-
migrationsDirectory: string,
18-
config: MigrationConfig,
19-
logOnError = false
20-
) {
21-
if (!fs.existsSync(migrationsDirectory)) {
22-
config.logger?.info(`Migrations directory ${migrationsDirectory} not found, skipping`)
23-
return
24-
}
14+
const MIGRATION_LOCK_ID = 72987329
15+
const MIGRATIONS_DIR = path.join(__dirname, 'migrations')
2516

26-
const optionalConfig = {
27-
schemaName: config.schema,
28-
tableName: 'migrations',
29-
}
17+
function parseFileName(fileName: string): { id: number; name: string } | null {
18+
const match = /^(\d+)[-_](.*)\.sql$/i.exec(fileName)
19+
if (!match) return null
20+
return { id: parseInt(match[1], 10), name: match[2] }
21+
}
22+
23+
export type Migration = { id: number; name: string; sql: string }
24+
25+
/**
26+
* Returns all migrations with schema placeholders replaced.
27+
* Useful for inspecting migrations or running them manually with psql.
28+
*/
29+
export function getMigrations(schema: string = 'stripe'): Migration[] {
30+
if (!fs.existsSync(MIGRATIONS_DIR)) return []
31+
32+
return fs
33+
.readdirSync(MIGRATIONS_DIR)
34+
.filter((f) => f.endsWith('.sql'))
35+
.sort()
36+
.map((fileName) => {
37+
const parsed = parseFileName(fileName)
38+
if (!parsed) return null
39+
40+
const raw = fs.readFileSync(path.join(MIGRATIONS_DIR, fileName), 'utf8')
41+
const sql = raw.replace(/\{\{schema\}\}/g, schema)
42+
43+
return { id: parsed.id, name: parsed.name, sql }
44+
})
45+
.filter((m): m is Migration => m !== null)
46+
}
47+
48+
/**
49+
* Applies a single migration file within a transaction.
50+
* Supports disabling transactions via `-- postgres-migrations disable-transaction` comment.
51+
*/
52+
async function applyMigration(
53+
client: Client,
54+
tableName: string,
55+
migration: { id: number; name: string; sql: string }
56+
): Promise<void> {
57+
const useTransaction = !migration.sql.includes('-- postgres-migrations disable-transaction')
3058

3159
try {
32-
await migrate({ client }, migrationsDirectory, optionalConfig)
33-
} catch (error) {
34-
if (logOnError && error instanceof Error) {
35-
config.logger?.error(error, 'Migration error:')
36-
} else {
37-
throw error
60+
if (useTransaction) await client.query('START TRANSACTION')
61+
await client.query(migration.sql)
62+
await client.query(`INSERT INTO ${tableName} (id, name) VALUES ($1, $2)`, [
63+
migration.id,
64+
migration.name,
65+
])
66+
if (useTransaction) await client.query('COMMIT')
67+
} catch (err) {
68+
if (useTransaction) {
69+
try {
70+
await client.query('ROLLBACK')
71+
} catch {
72+
// Connection may already be broken
73+
}
3874
}
75+
throw new Error(
76+
`Migration ${migration.id} (${migration.name}) failed: ${err instanceof Error ? err.message : String(err)}`
77+
)
3978
}
4079
}
4180

4281
export async function runMigrations(config: MigrationConfig): Promise<void> {
43-
// Init DB
4482
const client = new Client({
4583
connectionString: config.databaseUrl,
4684
ssl: config.ssl,
4785
connectionTimeoutMillis: 10_000,
4886
})
4987

88+
const tableName = `"${config.schema}"."migrations"`
89+
const migrations = getMigrations(config.schema)
90+
5091
try {
51-
// Run migrations
5292
await client.connect()
5393

54-
// Ensure schema exists, not doing it via migration to not break current migration checksums
55-
await client.query(`CREATE SCHEMA IF NOT EXISTS ${config.schema};`)
94+
if (migrations.length === 0) {
95+
config.logger?.info(`No migrations found, skipping`)
96+
return
97+
}
5698

5799
config.logger?.info('Running migrations')
58100

59-
await connectAndMigrate(client, path.resolve(__dirname, './migrations'), config)
101+
await client.query(`SELECT pg_advisory_lock(${MIGRATION_LOCK_ID})`)
102+
103+
try {
104+
await client.query(`CREATE SCHEMA IF NOT EXISTS "${config.schema}"`)
105+
106+
await client.query(`
107+
CREATE TABLE IF NOT EXISTS ${tableName} (
108+
id integer PRIMARY KEY,
109+
name varchar(100) UNIQUE NOT NULL,
110+
executed_at timestamp DEFAULT current_timestamp
111+
)
112+
`)
113+
114+
// Remove legacy hash column from pg-node-migrations (checksums no longer validated)
115+
await client.query(`ALTER TABLE ${tableName} DROP COLUMN IF EXISTS hash`)
116+
117+
const { rows: applied } = await client.query<{ id: number }>(`SELECT id FROM ${tableName}`)
118+
const appliedIds = new Set(applied.map((r) => r.id))
119+
120+
for (const migration of migrations) {
121+
if (appliedIds.has(migration.id)) continue
122+
123+
config.logger?.info(`Applying migration ${migration.id}: ${migration.name}`)
124+
await applyMigration(client, tableName, migration)
125+
}
126+
} finally {
127+
await client.query(`SELECT pg_advisory_unlock(${MIGRATION_LOCK_ID})`)
128+
}
129+
130+
config.logger?.info('Finished migrations')
60131
} catch (err) {
61132
config.logger?.error(err, 'Error running migrations')
133+
throw err
62134
} finally {
63135
await client.end()
64-
config.logger?.info('Finished migrations')
65136
}
66137
}

packages/sync-engine/src/database/migrations/0001_products.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
create table if not exists "stripe"."products" (
1+
create table if not exists "{{schema}}"."products" (
22
"id" text primary key,
33
"object" text,
44
"active" boolean,

packages/sync-engine/src/database/migrations/0002_customers.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
create table if not exists "stripe"."customers" (
1+
create table if not exists "{{schema}}"."customers" (
22
"id" text primary key,
33
"object" text,
44
"address" jsonb,
Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,41 @@
11
DO $$
22
BEGIN
3-
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'pricing_type') THEN
4-
create type "stripe"."pricing_type" as enum ('one_time', 'recurring');
3+
IF NOT EXISTS (
4+
SELECT 1 FROM pg_type t
5+
JOIN pg_namespace n ON t.typnamespace = n.oid
6+
WHERE t.typname = 'pricing_type' AND n.nspname = '{{schema}}'
7+
) THEN
8+
create type "{{schema}}"."pricing_type" as enum ('one_time', 'recurring');
59
END IF;
6-
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'pricing_tiers') THEN
7-
create type "stripe"."pricing_tiers" as enum ('graduated', 'volume');
10+
IF NOT EXISTS (
11+
SELECT 1 FROM pg_type t
12+
JOIN pg_namespace n ON t.typnamespace = n.oid
13+
WHERE t.typname = 'pricing_tiers' AND n.nspname = '{{schema}}'
14+
) THEN
15+
create type "{{schema}}"."pricing_tiers" as enum ('graduated', 'volume');
816
END IF;
9-
--more types here...
1017
END
1118
$$;
1219

1320

14-
create table if not exists "stripe"."prices" (
21+
create table if not exists "{{schema}}"."prices" (
1522
"id" text primary key,
1623
"object" text,
1724
"active" boolean,
1825
"currency" text,
1926
"metadata" jsonb,
2027
"nickname" text,
2128
"recurring" jsonb,
22-
"type" stripe.pricing_type,
29+
"type" "{{schema}}"."pricing_type",
2330
"unit_amount" integer,
2431
"billing_scheme" text,
2532
"created" integer,
2633
"livemode" boolean,
2734
"lookup_key" text,
28-
"tiers_mode" stripe.pricing_tiers,
35+
"tiers_mode" "{{schema}}"."pricing_tiers",
2936
"transform_quantity" jsonb,
3037
"unit_amount_decimal" text,
3138

32-
"product" text references stripe.products
39+
"product" text references "{{schema}}"."products"
3340
);
3441

packages/sync-engine/src/database/migrations/0004_subscriptions.sql

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11

22
DO $$
33
BEGIN
4-
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'subscription_status') THEN
5-
create type "stripe"."subscription_status" as enum (
4+
IF NOT EXISTS (
5+
SELECT 1 FROM pg_type t
6+
JOIN pg_namespace n ON t.typnamespace = n.oid
7+
WHERE t.typname = 'subscription_status' AND n.nspname = '{{schema}}'
8+
) THEN
9+
create type "{{schema}}"."subscription_status" as enum (
610
'trialing',
711
'active',
812
'canceled',
@@ -15,7 +19,7 @@ BEGIN
1519
END
1620
$$;
1721

18-
create table if not exists "stripe"."subscriptions" (
22+
create table if not exists "{{schema}}"."subscriptions" (
1923
"id" text primary key,
2024
"object" text,
2125
"cancel_at_period_end" boolean,
@@ -26,7 +30,7 @@ create table if not exists "stripe"."subscriptions" (
2630
"metadata" jsonb,
2731
"pending_setup_intent" text,
2832
"pending_update" jsonb,
29-
"status" "stripe"."subscription_status",
33+
"status" "{{schema}}"."subscription_status",
3034
"application_fee_percent" double precision,
3135
"billing_cycle_anchor" integer,
3236
"billing_thresholds" jsonb,
@@ -49,7 +53,7 @@ create table if not exists "stripe"."subscriptions" (
4953
"trial_start" jsonb,
5054

5155
"schedule" text,
52-
"customer" text references "stripe"."customers",
56+
"customer" text references "{{schema}}"."customers",
5357
"latest_invoice" text, -- not yet joined
5458
"plan" text -- not yet joined
5559
);

packages/sync-engine/src/database/migrations/0005_invoices.sql

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11

22
DO $$
33
BEGIN
4-
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'invoice_status') THEN
5-
create type "stripe"."invoice_status" as enum ('draft', 'open', 'paid', 'uncollectible', 'void');
4+
IF NOT EXISTS (
5+
SELECT 1 FROM pg_type t
6+
JOIN pg_namespace n ON t.typnamespace = n.oid
7+
WHERE t.typname = 'invoice_status' AND n.nspname = '{{schema}}'
8+
) THEN
9+
create type "{{schema}}"."invoice_status" as enum ('draft', 'open', 'paid', 'uncollectible', 'void');
610
END IF;
711
END
812
$$;
913

1014

11-
create table if not exists "stripe"."invoices" (
15+
create table if not exists "{{schema}}"."invoices" (
1216
"id" text primary key,
1317
"object" text,
1418
"auto_advance" boolean,
@@ -20,7 +24,7 @@ create table if not exists "stripe"."invoices" (
2024
"metadata" jsonb,
2125
"period_end" integer,
2226
"period_start" integer,
23-
"status" "stripe"."invoice_status",
27+
"status" "{{schema}}"."invoice_status",
2428
"total" bigint,
2529
"account_country" text,
2630
"account_name" text,
@@ -67,8 +71,8 @@ create table if not exists "stripe"."invoices" (
6771
"transfer_data" jsonb,
6872
"webhooks_delivered_at" integer,
6973

70-
"customer" text references "stripe"."customers",
71-
"subscription" text references "stripe"."subscriptions",
74+
"customer" text references "{{schema}}"."customers",
75+
"subscription" text references "{{schema}}"."subscriptions",
7276
"payment_intent" text, -- not yet implemented
7377
"default_payment_method" text, -- not yet implemented
7478
"default_source" text, -- not yet implemented

packages/sync-engine/src/database/migrations/0006_charges.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11

2-
create table if not exists "stripe".charges (
2+
create table if not exists "{{schema}}"."charges" (
33
id text primary key,
44
object text,
55
card jsonb,

packages/sync-engine/src/database/migrations/0007_coupons.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
create table if not exists "stripe".coupons (
1+
create table if not exists "{{schema}}"."coupons" (
22
id text primary key,
33
object text,
44
name text,

packages/sync-engine/src/database/migrations/0008_disputes.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
create table if not exists "stripe".disputes (
1+
create table if not exists "{{schema}}"."disputes" (
22
id text primary key,
33
object text,
44
amount bigint,

0 commit comments

Comments
 (0)