Skip to content

Commit ee56281

Browse files
committed
runSynchronously
1 parent cb2d7ea commit ee56281

File tree

6 files changed

+115
-20
lines changed

6 files changed

+115
-20
lines changed

example/convex/example.test.ts

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
22
import { initConvexTest } from "./setup.test";
3-
import { internal } from "./_generated/api";
4-
import { migrations } from "./example";
3+
import { components, internal } from "./_generated/api";
4+
import { runSynchronously } from "@convex-dev/migrations";
55

66
describe("example", () => {
77
beforeEach(async () => {
@@ -12,7 +12,7 @@ describe("example", () => {
1212
vi.useRealTimers();
1313
});
1414

15-
test("setDefaultValue", async () => {
15+
test("test setDefaultValue migration", async () => {
1616
const t = initConvexTest();
1717
await t.mutation(internal.example.seed, { count: 10 });
1818
await t.run(async (ctx) => {
@@ -21,16 +21,31 @@ describe("example", () => {
2121
expect(docs.some((doc) => doc.optionalField === undefined)).toBe(true);
2222
});
2323
await t.run(async (ctx) => {
24-
await migrations.runOne(ctx, internal.example.setDefaultValue, {
25-
batchSize: 2,
26-
dryRun: false,
27-
});
24+
await runSynchronously(
25+
ctx,
26+
components.migrations,
27+
internal.example.setDefaultValue,
28+
{ batchSize: 2 },
29+
);
2830
});
29-
await t.finishAllScheduledFunctions(vi.runAllTimers);
3031
await t.run(async (ctx) => {
3132
const after = await ctx.db.query("myTable").collect();
3233
expect(after).toHaveLength(10);
3334
expect(after.every((doc) => doc.optionalField !== undefined)).toBe(true);
3435
});
3536
});
37+
38+
test("test failingMigration", async () => {
39+
const t = initConvexTest();
40+
await t.mutation(internal.example.seed, { count: 10 });
41+
await expect(
42+
t.run(async (ctx) => {
43+
await runSynchronously(
44+
ctx,
45+
components.migrations,
46+
internal.example.failingMigration,
47+
);
48+
}),
49+
).rejects.toThrow("This migration fails after the first");
50+
});
3651
});

package-lock.json

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@
7171
"@vitejs/plugin-react": "5.0.4",
7272
"chokidar-cli": "3.0.0",
7373
"convex": "1.29.0",
74-
"convex-test": "0.0.38",
74+
"convex-test": "^0.0.40-alpha.0",
7575
"cpy-cli": "6.0.0",
7676
"eslint": "9.39.1",
7777
"eslint-plugin-react": "7.37.5",

src/client/index.ts

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,80 @@ export type MigrationFunctionReference = FunctionReference<
609609
MigrationArgs
610610
>;
611611

612+
/**
613+
* Start a migration and run it synchronously until it's done.
614+
* If this function crashes, the migration will not continue.
615+
*
616+
* ```ts
617+
* // In an action
618+
* const toRun = internal.migrations.myMigration;
619+
* await runSynchronously(ctx, components.migrations, toRun);
620+
* ```
621+
*
622+
* If it's already in progress, it will no-op.
623+
* If you run a migration that had previously failed which was part of a series,
624+
* it will not resume the series.
625+
* To resume a series, call the series again: {@link Migrations.runSerially}.
626+
*
627+
* Note: It's up to you to determine if it's safe to run a migration while
628+
* others are in progress. It won't run multiple instance of the same migration
629+
* but it currently allows running multiple migrations on the same table.
630+
*
631+
* @param ctx Context from an action.
632+
* @param component The migrations component, usually `components.migrations`.
633+
* @param fnRef The migration function to run. Like `internal.migrations.foo`.
634+
* @param opts Options to start the migration.
635+
* It's helpful to see what it would do without committing the transaction.
636+
*/
637+
export async function runSynchronously(
638+
ctx: ActionCtx,
639+
component: ComponentApi,
640+
fnRef: MigrationFunctionReference,
641+
opts?: {
642+
/**
643+
* The cursor to start from.
644+
* null: start from the beginning.
645+
* undefined: start, or resume from where it failed. No-ops if already done.
646+
*/
647+
cursor?: string | null;
648+
/**
649+
* The number of documents to process in a batch.
650+
* Overrides the migrations's configured batch size.
651+
*/
652+
batchSize?: number;
653+
/**
654+
* If true, it will run a batch and then throw an error.
655+
* It's helpful to see what it would do without committing the transaction.
656+
*/
657+
dryRun?: boolean;
658+
},
659+
) {
660+
let cursor = opts?.cursor;
661+
while (true) {
662+
const status = await ctx.runMutation(component.lib.migrate, {
663+
name: getFunctionName(fnRef),
664+
fnHandle: await createFunctionHandle(fnRef),
665+
cursor,
666+
batchSize: opts?.batchSize,
667+
dryRun: opts?.dryRun ?? false,
668+
oneBatchOnly: true,
669+
});
670+
if (status.isDone) {
671+
return status;
672+
}
673+
if (status.error) {
674+
throw new Error(status.error);
675+
}
676+
if (!status.cursor || status.cursor === cursor) {
677+
throw new Error(
678+
"Invariant violation: Migration did not make progress." +
679+
`\nStatus: ${JSON.stringify(status)}`,
680+
);
681+
}
682+
cursor = status.cursor;
683+
}
684+
}
685+
612686
/* Type utils follow */
613687

614688
type QueryCtx = Pick<GenericQueryCtx<GenericDataModel>, "runQuery">;

src/component/_generated/component.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ export type ComponentApi<Name extends string | undefined = string | undefined> =
9595
fnHandle: string;
9696
name: string;
9797
next?: Array<{ fnHandle: string; name: string }>;
98+
oneBatchOnly?: boolean;
9899
},
99100
{
100101
batchSize?: number;

src/component/lib.ts

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ const runMigrationArgs = {
2727
cursor: v.optional(v.union(v.string(), v.null())),
2828

2929
batchSize: v.optional(v.number()),
30+
oneBatchOnly: v.optional(v.boolean()),
3031
next: v.optional(
3132
v.array(
3233
v.object({
@@ -43,7 +44,7 @@ export const migrate = mutation({
4344
returns: migrationStatus,
4445
handler: async (ctx, args) => {
4546
// Step 1: Get or create the state.
46-
const { fnHandle, batchSize, next: next_, dryRun, ...initialState } = args;
47+
const { fnHandle, batchSize, next: next_, dryRun, name } = args;
4748
if (batchSize !== undefined && batchSize <= 0) {
4849
throw new Error("Batch size must be greater than 0");
4950
}
@@ -58,11 +59,11 @@ export const migrate = mutation({
5859
const state =
5960
(await ctx.db
6061
.query("migrations")
61-
.withIndex("name", (q) => q.eq("name", args.name))
62+
.withIndex("name", (q) => q.eq("name", name))
6263
.unique()) ??
6364
(await ctx.db.get(
6465
await ctx.db.insert("migrations", {
65-
...initialState,
66+
name,
6667
cursor: args.cursor ?? null,
6768
isDone: false,
6869
processed: 0,
@@ -122,7 +123,9 @@ export const migrate = mutation({
122123
}
123124

124125
// Step 3: Schedule the next batch or next migration.
125-
if (!state.isDone) {
126+
if (args.oneBatchOnly) {
127+
state.workerId = undefined;
128+
} else if (!state.isDone) {
126129
// Recursively schedule the next batch.
127130
state.workerId = await ctx.scheduler.runAfter(0, api.lib.migrate, {
128131
...args,
@@ -159,7 +162,7 @@ export const migrate = mutation({
159162
}
160163
} else {
161164
console.info(
162-
`Migration ${args.name} is done.` +
165+
`Migration ${name} is done.` +
163166
(i < next.length ? ` Next: ${next[i]!.name}` : ""),
164167
);
165168
}
@@ -171,7 +174,7 @@ export const migrate = mutation({
171174
updateState(e.data.result);
172175
} else {
173176
state.error = e instanceof Error ? e.message : String(e);
174-
console.error(`Migration ${args.name} failed: ${state.error}`);
177+
console.error(`Migration ${name} failed: ${state.error}`);
175178
}
176179
if (dryRun) {
177180
const status = await getMigrationState(ctx, state);
@@ -296,7 +299,9 @@ async function cancelMigration(ctx: MutationCtx, migration: Doc<"migrations">) {
296299
return state;
297300
}
298301
if (state.state === "inProgress") {
299-
await ctx.scheduler.cancel(migration.workerId!);
302+
if (!migration.workerId) {
303+
await ctx.scheduler.cancel(migration.workerId!);
304+
}
300305
console.log(`Canceled migration ${migration.name}`);
301306
return { ...state, state: "canceled" as const };
302307
}

0 commit comments

Comments
 (0)