
Commit 7149b77

Authored by iainlane, npalm, dependabot[bot], and github-actions[bot]
feat(control-plane)!: add support for handling multiple events in a single invocation (#4603)
Currently we restrict the `scale-up` Lambda to handling a single event at a time. In very busy environments this can become a bottleneck: each invocation makes calls to the GitHub and AWS APIs, and those calls can take long enough that we cannot process job queued events as fast as they arrive. In our environment we also use a pool, and we have typically responded to the resulting alerts (SQS queue length growing) by expanding the size of the pool. That helps because we more often find that no scale-up is needed, which lets the Lambdas exit a bit earlier and gets us through the queue faster, but it makes the environment much less responsive to changes in usage patterns.

At its core, this Lambda's task is to work out how many instances are needed and construct an EC2 `CreateFleet` call to create them. That is a job that can be batched: we can take any number of events, calculate the difference between our current state and the number of jobs we have, cap it at the maximum, and then issue a single call. The thing to be careful about is partial failure, when EC2 creates some of the instances we wanted but not all of them. Lambda has a configurable function response type which can be set to `ReportBatchItemFailures`. In this mode we return a list of failed messages from our handler and only those are retried, so we can hand back as many events as we failed to process.

Now that we potentially process multiple events in a single Lambda invocation, one thing to optimise for is not recreating GitHub API clients. We need one client for the app itself, which we use to find out installation IDs, and then one client for each installation that is relevant to the batch of events we are processing. This is done by creating a new client the first time we see an event for a given installation.

We also remove the same `batch_size = 1` constraint from the `job-retry` Lambda, which is used to retry events that previously failed. However, instead of reporting failures to be retried, it keeps the pre-existing fault-tolerant behaviour: errors are logged but explicitly do not cause message retries, avoiding infinite loops from persistent GitHub API issues or malformed events.

Tests are added for all of this. Testing in a private repo (sorry) looks good. That setup was ephemeral runners with no pool, SSM high throughput enabled, the job queued check disabled, a batch size of 200, and a wait time of 10 seconds. The workflow runs are each a matrix with 250 jobs.

![image](https://github.com/user-attachments/assets/0a656e99-8f1e-45e2-924b-0d5c1b6d6afb)

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: Niek Palm <npalm@users.noreply.github.com>
Co-authored-by: Niek Palm <niek.palm@philips.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
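To make the partial-batch handling described above concrete, here is a minimal sketch of an SQS-triggered handler that caches one GitHub client per installation and hands failed messages back via `ReportBatchItemFailures`. It is not the repository's actual handler: the event shape and the helpers `createAppClient`, `createInstallationClient`, and `scaleUpBatch` are illustrative stand-ins, and which specific messages are returned on partial failure is an arbitrary choice here.

```typescript
import type { SQSBatchItemFailure, SQSBatchResponse, SQSEvent, SQSRecord } from 'aws-lambda';

// Illustrative event shape; the real payload carries more fields.
interface JobQueuedEvent {
  installationId: number;
}

// Placeholder helpers so the sketch type-checks; in the real lambda these would
// create authenticated GitHub clients and issue a single CreateFleet call.
async function createAppClient(): Promise<string> {
  return 'app-client';
}
async function createInstallationClient(appClient: string, installationId: number): Promise<string> {
  return `${appClient}:installation-${installationId}`;
}
// Returns how many of the requested runners could not be created.
async function scaleUpBatch(_events: JobQueuedEvent[], _client: string): Promise<number> {
  return 0;
}

export async function handler(event: SQSEvent): Promise<SQSBatchResponse> {
  const batchItemFailures: SQSBatchItemFailure[] = [];
  const appClient = await createAppClient();

  // One GitHub client per installation, created lazily the first time an event
  // for that installation is seen and reused for the rest of the batch.
  const installationClients = new Map<number, string>();

  // Group the batch by installation so each group becomes one scale-up attempt.
  const groups = new Map<number, SQSRecord[]>();
  for (const record of event.Records) {
    const body = JSON.parse(record.body) as JobQueuedEvent;
    const group = groups.get(body.installationId) ?? [];
    group.push(record);
    groups.set(body.installationId, group);
  }

  for (const [installationId, records] of groups) {
    let client = installationClients.get(installationId);
    if (!client) {
      client = await createInstallationClient(appClient, installationId);
      installationClients.set(installationId, client);
    }
    try {
      const events = records.map((r) => JSON.parse(r.body) as JobQueuedEvent);
      const failed = await scaleUpBatch(events, client);
      // Hand back as many messages as runners we failed to create; with
      // ReportBatchItemFailures enabled, SQS redelivers only these.
      for (const record of records.slice(records.length - failed)) {
        batchItemFailures.push({ itemIdentifier: record.messageId });
      }
    } catch {
      // The whole group failed: return every message for retry.
      for (const record of records) {
        batchItemFailures.push({ itemIdentifier: record.messageId });
      }
    }
  }

  return { batchItemFailures };
}
```

The `catch` branch mirrors the scale-up Lambda's retry semantics; the `job-retry` Lambda, by contrast, would log the error and return no failures, so that persistently bad messages are not redelivered forever.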
1 parent 4f827de commit 7149b77

File tree

27 files changed: +1727 −512 lines changed


README.md

Lines changed: 2 additions & 0 deletions
```diff
@@ -155,6 +155,8 @@ Join our discord community via [this invite link](https://discord.gg/bxgXW8jJGh)
 | <a name="input_key_name"></a> [key\_name](#input\_key\_name) | Key pair name | `string` | `null` | no |
 | <a name="input_kms_key_arn"></a> [kms\_key\_arn](#input\_kms\_key\_arn) | Optional CMK Key ARN to be used for Parameter Store. This key must be in the current account. | `string` | `null` | no |
 | <a name="input_lambda_architecture"></a> [lambda\_architecture](#input\_lambda\_architecture) | AWS Lambda architecture. Lambda functions using Graviton processors ('arm64') tend to have better price/performance than 'x86\_64' functions. | `string` | `"arm64"` | no |
+| <a name="input_lambda_event_source_mapping_batch_size"></a> [lambda\_event\_source\_mapping\_batch\_size](#input\_lambda\_event\_source\_mapping\_batch\_size) | Maximum number of records to pass to the lambda function in a single batch for the event source mapping. When not set, the AWS default of 10 events will be used. | `number` | `10` | no |
+| <a name="input_lambda_event_source_mapping_maximum_batching_window_in_seconds"></a> [lambda\_event\_source\_mapping\_maximum\_batching\_window\_in\_seconds](#input\_lambda\_event\_source\_mapping\_maximum\_batching\_window\_in\_seconds) | Maximum amount of time to gather records before invoking the lambda function, in seconds. AWS requires this to be greater than 0 if batch\_size is greater than 10. Defaults to 0. | `number` | `0` | no |
 | <a name="input_lambda_principals"></a> [lambda\_principals](#input\_lambda\_principals) | (Optional) add extra principals to the role created for execution of the lambda, e.g. for local testing. | <pre>list(object({<br/> type = string<br/> identifiers = list(string)<br/> }))</pre> | `[]` | no |
 | <a name="input_lambda_runtime"></a> [lambda\_runtime](#input\_lambda\_runtime) | AWS Lambda runtime. | `string` | `"nodejs22.x"` | no |
 | <a name="input_lambda_s3_bucket"></a> [lambda\_s3\_bucket](#input\_lambda\_s3\_bucket) | S3 bucket from which to specify lambda functions. This is an alternative to providing local files directly. | `string` | `null` | no |
```

lambdas/functions/control-plane/src/aws/runners.test.ts

Lines changed: 97 additions & 33 deletions
```diff
@@ -1,26 +1,26 @@
+import { tracer } from '@aws-github-runner/aws-powertools-util';
 import {
   CreateFleetCommand,
-  CreateFleetCommandInput,
-  CreateFleetInstance,
-  CreateFleetResult,
+  type CreateFleetCommandInput,
+  type CreateFleetInstance,
+  type CreateFleetResult,
   CreateTagsCommand,
+  type DefaultTargetCapacityType,
   DeleteTagsCommand,
-  DefaultTargetCapacityType,
   DescribeInstancesCommand,
-  DescribeInstancesResult,
+  type DescribeInstancesResult,
   EC2Client,
   SpotAllocationStrategy,
   TerminateInstancesCommand,
 } from '@aws-sdk/client-ec2';
-import { GetParameterCommand, GetParameterResult, PutParameterCommand, SSMClient } from '@aws-sdk/client-ssm';
-import { tracer } from '@aws-github-runner/aws-powertools-util';
+import { GetParameterCommand, type GetParameterResult, PutParameterCommand, SSMClient } from '@aws-sdk/client-ssm';
 import { mockClient } from 'aws-sdk-client-mock';
 import 'aws-sdk-client-mock-jest/vitest';

+import { beforeEach, describe, expect, it, vi } from 'vitest';
 import ScaleError from './../scale-runners/ScaleError';
-import { createRunner, listEC2Runners, tag, untag, terminateRunner } from './runners';
-import { RunnerInfo, RunnerInputParameters, RunnerType } from './runners.d';
-import { describe, it, expect, beforeEach, vi } from 'vitest';
+import { createRunner, listEC2Runners, tag, terminateRunner, untag } from './runners';
+import type { RunnerInfo, RunnerInputParameters, RunnerType } from './runners.d';

 process.env.AWS_REGION = 'eu-east-1';
 const mockEC2Client = mockClient(EC2Client);
@@ -110,7 +110,10 @@ describe('list instances', () => {

   it('check orphan tag.', async () => {
     const instances: DescribeInstancesResult = mockRunningInstances;
-    instances.Reservations![0].Instances![0].Tags!.push({ Key: 'ghr:orphan', Value: 'true' });
+    instances.Reservations![0].Instances![0].Tags!.push({
+      Key: 'ghr:orphan',
+      Value: 'true',
+    });
     mockEC2Client.on(DescribeInstancesCommand).resolves(instances);

     const resp = await listEC2Runners();
@@ -132,7 +135,11 @@ describe('list instances', () => {

   it('filters instances on repo name', async () => {
     mockEC2Client.on(DescribeInstancesCommand).resolves(mockRunningInstances);
-    await listEC2Runners({ runnerType: 'Repo', runnerOwner: REPO_NAME, environment: undefined });
+    await listEC2Runners({
+      runnerType: 'Repo',
+      runnerOwner: REPO_NAME,
+      environment: undefined,
+    });
     expect(mockEC2Client).toHaveReceivedCommandWith(DescribeInstancesCommand, {
       Filters: [
         { Name: 'instance-state-name', Values: ['running', 'pending'] },
@@ -145,7 +152,11 @@ describe('list instances', () => {

   it('filters instances on org name', async () => {
     mockEC2Client.on(DescribeInstancesCommand).resolves(mockRunningInstances);
-    await listEC2Runners({ runnerType: 'Org', runnerOwner: ORG_NAME, environment: undefined });
+    await listEC2Runners({
+      runnerType: 'Org',
+      runnerOwner: ORG_NAME,
+      environment: undefined,
+    });
     expect(mockEC2Client).toHaveReceivedCommandWith(DescribeInstancesCommand, {
       Filters: [
         { Name: 'instance-state-name', Values: ['running', 'pending'] },
@@ -249,7 +260,9 @@ describe('terminate runner', () => {
     };
     await terminateRunner(runner.instanceId);

-    expect(mockEC2Client).toHaveReceivedCommandWith(TerminateInstancesCommand, { InstanceIds: [runner.instanceId] });
+    expect(mockEC2Client).toHaveReceivedCommandWith(TerminateInstancesCommand, {
+      InstanceIds: [runner.instanceId],
+    });
   });
 });

@@ -324,7 +337,10 @@ describe('create runner', () => {
     await createRunner(createRunnerConfig({ ...defaultRunnerConfig, type: type }));

     expect(mockEC2Client).toHaveReceivedCommandWith(CreateFleetCommand, {
-      ...expectedCreateFleetRequest({ ...defaultExpectedFleetRequestValues, type: type }),
+      ...expectedCreateFleetRequest({
+        ...defaultExpectedFleetRequestValues,
+        type: type,
+      }),
     });
   });

@@ -333,24 +349,36 @@ describe('create runner', () => {

     mockEC2Client.on(CreateFleetCommand).resolves({ Instances: instances });

-    await createRunner({ ...createRunnerConfig(defaultRunnerConfig), numberOfRunners: 2 });
+    await createRunner({
+      ...createRunnerConfig(defaultRunnerConfig),
+      numberOfRunners: 2,
+    });

     expect(mockEC2Client).toHaveReceivedCommandWith(CreateFleetCommand, {
-      ...expectedCreateFleetRequest({ ...defaultExpectedFleetRequestValues, totalTargetCapacity: 2 }),
+      ...expectedCreateFleetRequest({
+        ...defaultExpectedFleetRequestValues,
+        totalTargetCapacity: 2,
+      }),
     });
   });

   it('calls create fleet of 1 instance with the on-demand capacity', async () => {
     await createRunner(createRunnerConfig({ ...defaultRunnerConfig, capacityType: 'on-demand' }));
     expect(mockEC2Client).toHaveReceivedCommandWith(CreateFleetCommand, {
-      ...expectedCreateFleetRequest({ ...defaultExpectedFleetRequestValues, capacityType: 'on-demand' }),
+      ...expectedCreateFleetRequest({
+        ...defaultExpectedFleetRequestValues,
+        capacityType: 'on-demand',
+      }),
     });
   });

   it('calls run instances with the on-demand capacity', async () => {
     await createRunner(createRunnerConfig({ ...defaultRunnerConfig, maxSpotPrice: '0.1' }));
     expect(mockEC2Client).toHaveReceivedCommandWith(CreateFleetCommand, {
-      ...expectedCreateFleetRequest({ ...defaultExpectedFleetRequestValues, maxSpotPrice: '0.1' }),
+      ...expectedCreateFleetRequest({
+        ...defaultExpectedFleetRequestValues,
+        maxSpotPrice: '0.1',
+      }),
     });
   });

@@ -367,8 +395,16 @@ describe('create runner', () => {
       },
     };
     mockSSMClient.on(GetParameterCommand).resolves(paramValue);
-    await createRunner(createRunnerConfig({ ...defaultRunnerConfig, amiIdSsmParameterName: 'my-ami-id-param' }));
-    const expectedRequest = expectedCreateFleetRequest({ ...defaultExpectedFleetRequestValues, imageId: 'ami-123' });
+    await createRunner(
+      createRunnerConfig({
+        ...defaultRunnerConfig,
+        amiIdSsmParameterName: 'my-ami-id-param',
+      }),
+    );
+    const expectedRequest = expectedCreateFleetRequest({
+      ...defaultExpectedFleetRequestValues,
+      imageId: 'ami-123',
+    });
     expect(mockEC2Client).toHaveReceivedCommandWith(CreateFleetCommand, expectedRequest);
     expect(mockSSMClient).toHaveReceivedCommandWith(GetParameterCommand, {
       Name: 'my-ami-id-param',
@@ -380,7 +416,10 @@ describe('create runner', () => {
     await createRunner(createRunnerConfig({ ...defaultRunnerConfig, tracingEnabled: true }));

     expect(mockEC2Client).toHaveReceivedCommandWith(CreateFleetCommand, {
-      ...expectedCreateFleetRequest({ ...defaultExpectedFleetRequestValues, tracingEnabled: true }),
+      ...expectedCreateFleetRequest({
+        ...defaultExpectedFleetRequestValues,
+        tracingEnabled: true,
+      }),
     });
   });
 });
@@ -419,9 +458,12 @@ describe('create runner with errors', () => {
   });

   it('test ScaleError with multiple error.', async () => {
-    createFleetMockWithErrors(['UnfulfillableCapacity', 'SomeError']);
+    createFleetMockWithErrors(['UnfulfillableCapacity', 'MaxSpotInstanceCountExceeded', 'NotMappedError']);

-    await expect(createRunner(createRunnerConfig(defaultRunnerConfig))).rejects.toBeInstanceOf(ScaleError);
+    await expect(createRunner(createRunnerConfig(defaultRunnerConfig))).rejects.toMatchObject({
+      name: 'ScaleError',
+      failedInstanceCount: 2,
+    });
     expect(mockEC2Client).toHaveReceivedCommandWith(
       CreateFleetCommand,
       expectedCreateFleetRequest(defaultExpectedFleetRequestValues),
@@ -465,7 +507,12 @@ describe('create runner with errors', () => {
     mockSSMClient.on(GetParameterCommand).rejects(new Error('Some error'));

     await expect(
-      createRunner(createRunnerConfig({ ...defaultRunnerConfig, amiIdSsmParameterName: 'my-ami-id-param' })),
+      createRunner(
+        createRunnerConfig({
+          ...defaultRunnerConfig,
+          amiIdSsmParameterName: 'my-ami-id-param',
+        }),
+      ),
     ).rejects.toBeInstanceOf(Error);
     expect(mockEC2Client).not.toHaveReceivedCommand(CreateFleetCommand);
     expect(mockSSMClient).not.toHaveReceivedCommand(PutParameterCommand);
@@ -530,7 +577,7 @@ describe('create runner with errors fail over to OnDemand', () => {
       }),
     });

-    // second call with with OnDemand failback
+    // second call with with OnDemand fallback
    expect(mockEC2Client).toHaveReceivedNthCommandWith(2, CreateFleetCommand, {
       ...expectedCreateFleetRequest({
         ...defaultExpectedFleetRequestValues,
@@ -540,17 +587,25 @@ describe('create runner with errors fail over to OnDemand', () => {
     });
   });

-  it('test InsufficientInstanceCapacity no failback.', async () => {
+  it('test InsufficientInstanceCapacity no fallback.', async () => {
     await expect(
-      createRunner(createRunnerConfig({ ...defaultRunnerConfig, onDemandFailoverOnError: [] })),
+      createRunner(
+        createRunnerConfig({
+          ...defaultRunnerConfig,
+          onDemandFailoverOnError: [],
+        }),
+      ),
     ).rejects.toBeInstanceOf(Error);
   });

-  it('test InsufficientInstanceCapacity with mutlipte instances and fallback to on demand .', async () => {
+  it('test InsufficientInstanceCapacity with multiple instances and fallback to on demand .', async () => {
     const instancesIds = ['i-123', 'i-456'];
     createFleetMockWithWithOnDemandFallback(['InsufficientInstanceCapacity'], instancesIds);

-    const instancesResult = await createRunner({ ...createRunnerConfig(defaultRunnerConfig), numberOfRunners: 2 });
+    const instancesResult = await createRunner({
+      ...createRunnerConfig(defaultRunnerConfig),
+      numberOfRunners: 2,
+    });
     expect(instancesResult).toEqual(instancesIds);

     expect(mockEC2Client).toHaveReceivedCommandTimes(CreateFleetCommand, 2);
@@ -580,7 +635,10 @@ describe('create runner with errors fail over to OnDemand', () => {
     createFleetMockWithWithOnDemandFallback(['UnfulfillableCapacity'], instancesIds);

     await expect(
-      createRunner({ ...createRunnerConfig(defaultRunnerConfig), numberOfRunners: 2 }),
+      createRunner({
+        ...createRunnerConfig(defaultRunnerConfig),
+        numberOfRunners: 2,
+      }),
     ).rejects.toBeInstanceOf(Error);

     expect(mockEC2Client).toHaveReceivedCommandTimes(CreateFleetCommand, 1);
@@ -626,7 +684,10 @@ function createFleetMockWithWithOnDemandFallback(errors: string[], instances?: s

   mockEC2Client
     .on(CreateFleetCommand)
-    .resolvesOnce({ Instances: [instanceesFirstCall], Errors: errors.map((e) => ({ ErrorCode: e })) })
+    .resolvesOnce({
+      Instances: [instanceesFirstCall],
+      Errors: errors.map((e) => ({ ErrorCode: e })),
+    })
     .resolvesOnce({ Instances: [instancesSecondCall] });
 }

@@ -673,7 +734,10 @@ interface ExpectedFleetRequestValues {
 function expectedCreateFleetRequest(expectedValues: ExpectedFleetRequestValues): CreateFleetCommandInput {
   const tags = [
     { Key: 'ghr:Application', Value: 'github-action-runner' },
-    { Key: 'ghr:created_by', Value: expectedValues.totalTargetCapacity > 1 ? 'pool-lambda' : 'scale-up-lambda' },
+    {
+      Key: 'ghr:created_by',
+      Value: expectedValues.totalTargetCapacity > 1 ? 'pool-lambda' : 'scale-up-lambda',
+    },
     { Key: 'ghr:Type', Value: expectedValues.type },
     { Key: 'ghr:Owner', Value: REPO_NAME },
   ];
```

lambdas/functions/control-plane/src/aws/runners.ts

Lines changed: 55 additions & 46 deletions
```diff
@@ -166,53 +166,62 @@ async function processFleetResult(
 ): Promise<string[]> {
   const instances: string[] = fleet.Instances?.flatMap((i) => i.InstanceIds?.flatMap((j) => j) || []) || [];

-  if (instances.length !== runnerParameters.numberOfRunners) {
-    logger.warn(
-      `${
-        instances.length === 0 ? 'No' : instances.length + ' off ' + runnerParameters.numberOfRunners
-      } instances created.`,
-      { data: fleet },
-    );
-    const errors = fleet.Errors?.flatMap((e) => e.ErrorCode || '') || [];
-
-    // Educated guess of errors that would make sense to retry based on the list
-    // https://docs.aws.amazon.com/AWSEC2/latest/APIReference/errors-overview.html
-    const scaleErrors = [
-      'UnfulfillableCapacity',
-      'MaxSpotInstanceCountExceeded',
-      'TargetCapacityLimitExceededException',
-      'RequestLimitExceeded',
-      'ResourceLimitExceeded',
-      'MaxSpotInstanceCountExceeded',
-      'MaxSpotFleetRequestCountExceeded',
-      'InsufficientInstanceCapacity',
-    ];
-
-    if (
-      errors.some((e) => runnerParameters.onDemandFailoverOnError?.includes(e)) &&
-      runnerParameters.ec2instanceCriteria.targetCapacityType === 'spot'
-    ) {
-      logger.warn(`Create fleet failed, initatiing fall back to on demand instances.`);
-      logger.debug('Create fleet failed.', { data: fleet.Errors });
-      const numberOfInstances = runnerParameters.numberOfRunners - instances.length;
-      const instancesOnDemand = await createRunner({
-        ...runnerParameters,
-        numberOfRunners: numberOfInstances,
-        onDemandFailoverOnError: ['InsufficientInstanceCapacity'],
-        ec2instanceCriteria: { ...runnerParameters.ec2instanceCriteria, targetCapacityType: 'on-demand' },
-      });
-      instances.push(...instancesOnDemand);
-      return instances;
-    } else if (errors.some((e) => scaleErrors.includes(e))) {
-      logger.warn('Create fleet failed, ScaleError will be thrown to trigger retry for ephemeral runners.');
-      logger.debug('Create fleet failed.', { data: fleet.Errors });
-      throw new ScaleError('Failed to create instance, create fleet failed.');
-    } else {
-      logger.warn('Create fleet failed, error not recognized as scaling error.', { data: fleet.Errors });
-      throw Error('Create fleet failed, no instance created.');
-    }
+  if (instances.length === runnerParameters.numberOfRunners) {
+    return instances;
   }
-  return instances;
+
+  logger.warn(
+    `${
+      instances.length === 0 ? 'No' : instances.length + ' off ' + runnerParameters.numberOfRunners
+    } instances created.`,
+    { data: fleet },
+  );
+
+  const errors = fleet.Errors?.flatMap((e) => e.ErrorCode || '') || [];
+
+  if (
+    errors.some((e) => runnerParameters.onDemandFailoverOnError?.includes(e)) &&
+    runnerParameters.ec2instanceCriteria.targetCapacityType === 'spot'
+  ) {
+    logger.warn(`Create fleet failed, initatiing fall back to on demand instances.`);
+    logger.debug('Create fleet failed.', { data: fleet.Errors });
+    const numberOfInstances = runnerParameters.numberOfRunners - instances.length;
+    const instancesOnDemand = await createRunner({
+      ...runnerParameters,
+      numberOfRunners: numberOfInstances,
+      onDemandFailoverOnError: ['InsufficientInstanceCapacity'],
+      ec2instanceCriteria: { ...runnerParameters.ec2instanceCriteria, targetCapacityType: 'on-demand' },
+    });
+    instances.push(...instancesOnDemand);
+    return instances;
+  }
+
+  // Educated guess of errors that would make sense to retry based on the list
+  // https://docs.aws.amazon.com/AWSEC2/latest/APIReference/errors-overview.html
+  const scaleErrors = [
+    'UnfulfillableCapacity',
+    'MaxSpotInstanceCountExceeded',
+    'TargetCapacityLimitExceededException',
+    'RequestLimitExceeded',
+    'ResourceLimitExceeded',
+    'MaxSpotInstanceCountExceeded',
+    'MaxSpotFleetRequestCountExceeded',
+    'InsufficientInstanceCapacity',
+  ];
+
+  const failedCount = countScaleErrors(errors, scaleErrors);
+  if (failedCount > 0) {
+    logger.warn('Create fleet failed, ScaleError will be thrown to trigger retry for ephemeral runners.');
+    logger.debug('Create fleet failed.', { data: fleet.Errors });
+    throw new ScaleError(failedCount);
+  }
+
+  logger.warn('Create fleet failed, error not recognized as scaling error.', { data: fleet.Errors });
+  throw Error('Create fleet failed, no instance created.');
+}
+
+function countScaleErrors(errors: string[], scaleErrors: string[]): number {
+  return errors.reduce((acc, e) => (scaleErrors.includes(e) ? acc + 1 : acc), 0);
 }

 async function getAmiIdOverride(runnerParameters: Runners.RunnerInputParameters): Promise<string | undefined> {
```
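The test above asserts `failedInstanceCount: 2` and `processFleetResult` now throws `new ScaleError(failedCount)`, so the error evidently carries the number of instances that were not created. The actual `ScaleError.ts` is not part of this diff; a plausible shape, assuming the constructor simply stores the count, is:

```typescript
// Sketch only: the real class lives in scale-runners/ScaleError.ts and may differ in detail.
export default class ScaleError extends Error {
  constructor(public readonly failedInstanceCount: number) {
    super(`Failed to create ${failedInstanceCount} instance(s), create fleet failed.`);
    this.name = 'ScaleError';
  }
}
```

A caller that catches this can turn `failedInstanceCount` into the same number of SQS batch item failures, which is how the handler hands unprocessed events back to the queue.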
