Skip to content

Commit fc0335b

Browse files
authored
Merge pull request #23 from get-convex/ian/run-sync
Ian/run sync
2 parents cb2d7ea + 8ed051d commit fc0335b

File tree

7 files changed

+205
-18
lines changed

7 files changed

+205
-18
lines changed

README.md

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,4 +442,57 @@ await migrations.getStatus(ctx, { migrations: ["myNewMutation"] });
442442
await migrations.cancel(ctx, "myNewMutation");
443443
```
444444

445+
## Running migrations synchronously
446+
447+
If you want to run a migration synchronously from a test or action, you can use
448+
`runToCompletion`. Note that if the action crashes or is canceled, it will not
449+
continue migrating in the background.
450+
451+
From an action:
452+
453+
```ts
454+
import { components, internal } from "./_generated/api";
455+
import { internalAction } from "./_generated/server";
456+
import { runToCompletion } from "@convex-dev/migrations";
457+
458+
export const myAction = internalAction({
459+
args: {},
460+
handler: async (ctx) => {
461+
//...
462+
const toRun = internal.example.setDefaultValue;
463+
await runToCompletion(ctx, components.migrations, toRun);
464+
},
465+
});
466+
```
467+
468+
In a test:
469+
470+
```ts
471+
import { test } from "vitest";
472+
import { convexTest } from "convex-test";
473+
import component from "@convex-dev/migrations/test";
474+
import { runToCompletion } from "@convex-dev/migrations";
475+
import { components, internal } from "./_generated/api";
476+
import schema from "./schema";
477+
478+
test("test setDefaultValue migration", async () => {
479+
const t = convexTest(schema);
480+
// Register the component in the test instance
481+
component.register(t);
482+
483+
await t.run(async (ctx) => {
484+
// Add sample data to migrate
485+
await ctx.db.insert("myTable", { optionalField: undefined });
486+
487+
// Run the migration to completion
488+
const migrationToTest = internal.example.setDefaultValue;
489+
await runToCompletion(ctx, components.migrations, migrationToTest);
490+
491+
// Assert that the migration was successful by checking the data
492+
const docs = await ctx.db.query("myTable").collect();
493+
expect(docs.every((doc) => doc.optionalField !== undefined)).toBe(true);
494+
});
495+
});
496+
```
497+
445498
<!-- END: Include on https://convex.dev/components -->

example/convex/example.test.ts

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
22
import { initConvexTest } from "./setup.test";
3-
import { internal } from "./_generated/api";
4-
import { migrations } from "./example";
3+
import { components, internal } from "./_generated/api";
4+
import { runToCompletion } from "@convex-dev/migrations";
5+
import { createFunctionHandle, getFunctionName } from "convex/server";
56

67
describe("example", () => {
78
beforeEach(async () => {
@@ -12,7 +13,7 @@ describe("example", () => {
1213
vi.useRealTimers();
1314
});
1415

15-
test("setDefaultValue", async () => {
16+
test("test setDefaultValue migration", async () => {
1617
const t = initConvexTest();
1718
await t.mutation(internal.example.seed, { count: 10 });
1819
await t.run(async (ctx) => {
@@ -21,12 +22,51 @@ describe("example", () => {
2122
expect(docs.some((doc) => doc.optionalField === undefined)).toBe(true);
2223
});
2324
await t.run(async (ctx) => {
24-
await migrations.runOne(ctx, internal.example.setDefaultValue, {
25+
await runToCompletion(
26+
ctx,
27+
components.migrations,
28+
internal.example.setDefaultValue,
29+
{ batchSize: 2 },
30+
);
31+
});
32+
await t.run(async (ctx) => {
33+
const after = await ctx.db.query("myTable").collect();
34+
expect(after).toHaveLength(10);
35+
expect(after.every((doc) => doc.optionalField !== undefined)).toBe(true);
36+
});
37+
});
38+
39+
test("test failingMigration", async () => {
40+
const t = initConvexTest();
41+
await t.mutation(internal.example.seed, { count: 10 });
42+
await expect(
43+
t.run(async (ctx) => {
44+
await runToCompletion(
45+
ctx,
46+
components.migrations,
47+
internal.example.failingMigration,
48+
);
49+
}),
50+
).rejects.toThrow("This migration fails after the first");
51+
});
52+
53+
test("test migrating with function handle", async () => {
54+
const t = initConvexTest();
55+
await t.mutation(internal.example.seed, { count: 10 });
56+
await t.run(async (ctx) => {
57+
const docs = await ctx.db.query("myTable").collect();
58+
expect(docs).toHaveLength(10);
59+
expect(docs.some((doc) => doc.optionalField === undefined)).toBe(true);
60+
});
61+
await t.run(async (ctx) => {
62+
const fnHandle = await createFunctionHandle(
63+
internal.example.setDefaultValue,
64+
);
65+
await runToCompletion(ctx, components.migrations, fnHandle, {
66+
name: getFunctionName(internal.example.setDefaultValue),
2567
batchSize: 2,
26-
dryRun: false,
2768
});
2869
});
29-
await t.finishAllScheduledFunctions(vi.runAllTimers);
3070
await t.run(async (ctx) => {
3171
const after = await ctx.db.query("myTable").collect();
3272
expect(after).toHaveLength(10);

package-lock.json

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@
7171
"@vitejs/plugin-react": "5.0.4",
7272
"chokidar-cli": "3.0.0",
7373
"convex": "1.29.0",
74-
"convex-test": "0.0.38",
74+
"convex-test": "^0.0.40-alpha.0",
7575
"cpy-cli": "6.0.0",
7676
"eslint": "9.39.1",
7777
"eslint-plugin-react": "7.37.5",

src/client/index.ts

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
type GenericDataModel,
77
type GenericMutationCtx,
88
type GenericQueryCtx,
9+
getFunctionAddress,
910
getFunctionName,
1011
internalMutationGeneric,
1112
makeFunctionReference,
@@ -27,6 +28,7 @@ export type { MigrationArgs, MigrationResult, MigrationStatus };
2728
import { ConvexError, type GenericId } from "convex/values";
2829
import type { ComponentApi } from "../component/_generated/component.js";
2930
import { logStatusAndInstructions } from "./log.js";
31+
import type { MigrationFunctionHandle } from "../component/lib.js";
3032

3133
// Note: this value is hard-coded in the docstring below. Please keep in sync.
3234
export const DEFAULT_BATCH_SIZE = 100;
@@ -609,6 +611,92 @@ export type MigrationFunctionReference = FunctionReference<
609611
MigrationArgs
610612
>;
611613

614+
/**
615+
* Start a migration and run it synchronously until it's done.
616+
* If this function crashes, the migration will not continue.
617+
*
618+
* ```ts
619+
* // In an action
620+
* const toRun = internal.migrations.myMigration;
621+
* await runToCompletion(ctx, components.migrations, toRun);
622+
* ```
623+
*
624+
* If it's already in progress, it will no-op.
625+
* If you run a migration that had previously failed which was part of a series,
626+
* it will not resume the series.
627+
* To resume a series, call the series again: {@link Migrations.runSerially}.
628+
*
629+
* Note: It's up to you to determine if it's safe to run a migration while
630+
* others are in progress. It won't run multiple instances of the same migration
631+
* but it currently allows running multiple migrations on the same table.
632+
*
633+
* @param ctx Context from an action.
634+
* @param component The migrations component, usually `components.migrations`.
635+
* @param fnRef The migration function to run. Like `internal.migrations.foo`.
636+
* @param opts Options to start the migration.
637+
* It's helpful to see what it would do without committing the transaction.
638+
*/
639+
export async function runToCompletion(
640+
ctx: ActionCtx,
641+
component: ComponentApi,
642+
fnRef: MigrationFunctionReference | MigrationFunctionHandle,
643+
opts?: {
644+
/**
645+
* The name of the migration function, generated with getFunctionName.
646+
*/
647+
name?: string;
648+
/**
649+
* The cursor to start from.
650+
* null: start from the beginning.
651+
* undefined: start, or resume from where it failed. No-ops if already done.
652+
*/
653+
cursor?: string | null;
654+
/**
655+
* The number of documents to process in a batch.
656+
* Overrides the migrations's configured batch size.
657+
*/
658+
batchSize?: number;
659+
/**
660+
* If true, it will run a batch and then throw an error.
661+
* It's helpful to see what it would do without committing the transaction.
662+
*/
663+
dryRun?: boolean;
664+
},
665+
): Promise<MigrationStatus> {
666+
let cursor = opts?.cursor;
667+
const {
668+
name = getFunctionName(fnRef),
669+
batchSize,
670+
dryRun = false,
671+
} = opts ?? {};
672+
const address = getFunctionAddress(fnRef);
673+
const fnHandle =
674+
address.functionHandle ?? (await createFunctionHandle(fnRef));
675+
while (true) {
676+
const status = await ctx.runMutation(component.lib.migrate, {
677+
name,
678+
fnHandle,
679+
cursor,
680+
batchSize,
681+
dryRun,
682+
oneBatchOnly: true,
683+
});
684+
if (status.isDone) {
685+
return status;
686+
}
687+
if (status.error) {
688+
throw new Error(status.error);
689+
}
690+
if (!status.cursor || status.cursor === cursor) {
691+
throw new Error(
692+
"Invariant violation: Migration did not make progress." +
693+
`\nStatus: ${JSON.stringify(status)}`,
694+
);
695+
}
696+
cursor = status.cursor;
697+
}
698+
}
699+
612700
/* Type utils follow */
613701

614702
type QueryCtx = Pick<GenericQueryCtx<GenericDataModel>, "runQuery">;

src/component/_generated/component.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ export type ComponentApi<Name extends string | undefined = string | undefined> =
9595
fnHandle: string;
9696
name: string;
9797
next?: Array<{ fnHandle: string; name: string }>;
98+
oneBatchOnly?: boolean;
9899
},
99100
{
100101
batchSize?: number;

src/component/lib.ts

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ const runMigrationArgs = {
2727
cursor: v.optional(v.union(v.string(), v.null())),
2828

2929
batchSize: v.optional(v.number()),
30+
oneBatchOnly: v.optional(v.boolean()),
3031
next: v.optional(
3132
v.array(
3233
v.object({
@@ -43,7 +44,7 @@ export const migrate = mutation({
4344
returns: migrationStatus,
4445
handler: async (ctx, args) => {
4546
// Step 1: Get or create the state.
46-
const { fnHandle, batchSize, next: next_, dryRun, ...initialState } = args;
47+
const { fnHandle, batchSize, next: next_, dryRun, name } = args;
4748
if (batchSize !== undefined && batchSize <= 0) {
4849
throw new Error("Batch size must be greater than 0");
4950
}
@@ -58,11 +59,11 @@ export const migrate = mutation({
5859
const state =
5960
(await ctx.db
6061
.query("migrations")
61-
.withIndex("name", (q) => q.eq("name", args.name))
62+
.withIndex("name", (q) => q.eq("name", name))
6263
.unique()) ??
6364
(await ctx.db.get(
6465
await ctx.db.insert("migrations", {
65-
...initialState,
66+
name,
6667
cursor: args.cursor ?? null,
6768
isDone: false,
6869
processed: 0,
@@ -122,7 +123,9 @@ export const migrate = mutation({
122123
}
123124

124125
// Step 3: Schedule the next batch or next migration.
125-
if (!state.isDone) {
126+
if (args.oneBatchOnly) {
127+
state.workerId = undefined;
128+
} else if (!state.isDone) {
126129
// Recursively schedule the next batch.
127130
state.workerId = await ctx.scheduler.runAfter(0, api.lib.migrate, {
128131
...args,
@@ -159,7 +162,7 @@ export const migrate = mutation({
159162
}
160163
} else {
161164
console.info(
162-
`Migration ${args.name} is done.` +
165+
`Migration ${name} is done.` +
163166
(i < next.length ? ` Next: ${next[i]!.name}` : ""),
164167
);
165168
}
@@ -171,7 +174,7 @@ export const migrate = mutation({
171174
updateState(e.data.result);
172175
} else {
173176
state.error = e instanceof Error ? e.message : String(e);
174-
console.error(`Migration ${args.name} failed: ${state.error}`);
177+
console.error(`Migration ${name} failed: ${state.error}`);
175178
}
176179
if (dryRun) {
177180
const status = await getMigrationState(ctx, state);
@@ -296,7 +299,9 @@ async function cancelMigration(ctx: MutationCtx, migration: Doc<"migrations">) {
296299
return state;
297300
}
298301
if (state.state === "inProgress") {
299-
await ctx.scheduler.cancel(migration.workerId!);
302+
if (!migration.workerId) {
303+
await ctx.scheduler.cancel(migration.workerId!);
304+
}
300305
console.log(`Canceled migration ${migration.name}`);
301306
return { ...state, state: "canceled" as const };
302307
}

0 commit comments

Comments
 (0)