Skip to content
Merged
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ Join our discord community via [this invite link](https://discord.gg/bxgXW8jJGh)
| <a name="input_key_name"></a> [key\_name](#input\_key\_name) | Key pair name | `string` | `null` | no |
| <a name="input_kms_key_arn"></a> [kms\_key\_arn](#input\_kms\_key\_arn) | Optional CMK Key ARN to be used for Parameter Store. This key must be in the current account. | `string` | `null` | no |
| <a name="input_lambda_architecture"></a> [lambda\_architecture](#input\_lambda\_architecture) | AWS Lambda architecture. Lambda functions using Graviton processors ('arm64') tend to have better price/performance than 'x86\_64' functions. | `string` | `"arm64"` | no |
| <a name="input_lambda_event_source_mapping_batch_size"></a> [lambda\_event\_source\_mapping\_batch\_size](#input\_lambda\_event\_source\_mapping\_batch\_size) | Maximum number of records to pass to the lambda function in a single batch for the event source mapping. When not set, the AWS default of 10 events will be used. | `number` | `10` | no |
| <a name="input_lambda_event_source_mapping_maximum_batching_window_in_seconds"></a> [lambda\_event\_source\_mapping\_maximum\_batching\_window\_in\_seconds](#input\_lambda\_event\_source\_mapping\_maximum\_batching\_window\_in\_seconds) | Maximum amount of time to gather records before invoking the lambda function, in seconds. AWS requires this to be greater than 0 if batch\_size is greater than 10. Defaults to 0. | `number` | `0` | no |
| <a name="input_lambda_principals"></a> [lambda\_principals](#input\_lambda\_principals) | (Optional) add extra principals to the role created for execution of the lambda, e.g. for local testing. | <pre>list(object({<br/> type = string<br/> identifiers = list(string)<br/> }))</pre> | `[]` | no |
| <a name="input_lambda_runtime"></a> [lambda\_runtime](#input\_lambda\_runtime) | AWS Lambda runtime. | `string` | `"nodejs22.x"` | no |
| <a name="input_lambda_s3_bucket"></a> [lambda\_s3\_bucket](#input\_lambda\_s3\_bucket) | S3 bucket from which to specify lambda functions. This is an alternative to providing local files directly. | `string` | `null` | no |
Expand Down
130 changes: 97 additions & 33 deletions lambdas/functions/control-plane/src/aws/runners.test.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
import { tracer } from '@aws-github-runner/aws-powertools-util';
import {
CreateFleetCommand,
CreateFleetCommandInput,
CreateFleetInstance,
CreateFleetResult,
type CreateFleetCommandInput,
type CreateFleetInstance,
type CreateFleetResult,
CreateTagsCommand,
type DefaultTargetCapacityType,
DeleteTagsCommand,
DefaultTargetCapacityType,
DescribeInstancesCommand,
DescribeInstancesResult,
type DescribeInstancesResult,
EC2Client,
SpotAllocationStrategy,
TerminateInstancesCommand,
} from '@aws-sdk/client-ec2';
import { GetParameterCommand, GetParameterResult, PutParameterCommand, SSMClient } from '@aws-sdk/client-ssm';
import { tracer } from '@aws-github-runner/aws-powertools-util';
import { GetParameterCommand, type GetParameterResult, PutParameterCommand, SSMClient } from '@aws-sdk/client-ssm';
import { mockClient } from 'aws-sdk-client-mock';
import 'aws-sdk-client-mock-jest/vitest';

import { beforeEach, describe, expect, it, vi } from 'vitest';
import ScaleError from './../scale-runners/ScaleError';
import { createRunner, listEC2Runners, tag, untag, terminateRunner } from './runners';
import { RunnerInfo, RunnerInputParameters, RunnerType } from './runners.d';
import { describe, it, expect, beforeEach, vi } from 'vitest';
import { createRunner, listEC2Runners, tag, terminateRunner, untag } from './runners';
import type { RunnerInfo, RunnerInputParameters, RunnerType } from './runners.d';

process.env.AWS_REGION = 'eu-east-1';
const mockEC2Client = mockClient(EC2Client);
Expand Down Expand Up @@ -110,7 +110,10 @@ describe('list instances', () => {

it('check orphan tag.', async () => {
const instances: DescribeInstancesResult = mockRunningInstances;
instances.Reservations![0].Instances![0].Tags!.push({ Key: 'ghr:orphan', Value: 'true' });
instances.Reservations![0].Instances![0].Tags!.push({
Key: 'ghr:orphan',
Value: 'true',
});
mockEC2Client.on(DescribeInstancesCommand).resolves(instances);

const resp = await listEC2Runners();
Expand All @@ -132,7 +135,11 @@ describe('list instances', () => {

it('filters instances on repo name', async () => {
mockEC2Client.on(DescribeInstancesCommand).resolves(mockRunningInstances);
await listEC2Runners({ runnerType: 'Repo', runnerOwner: REPO_NAME, environment: undefined });
await listEC2Runners({
runnerType: 'Repo',
runnerOwner: REPO_NAME,
environment: undefined,
});
expect(mockEC2Client).toHaveReceivedCommandWith(DescribeInstancesCommand, {
Filters: [
{ Name: 'instance-state-name', Values: ['running', 'pending'] },
Expand All @@ -145,7 +152,11 @@ describe('list instances', () => {

it('filters instances on org name', async () => {
mockEC2Client.on(DescribeInstancesCommand).resolves(mockRunningInstances);
await listEC2Runners({ runnerType: 'Org', runnerOwner: ORG_NAME, environment: undefined });
await listEC2Runners({
runnerType: 'Org',
runnerOwner: ORG_NAME,
environment: undefined,
});
expect(mockEC2Client).toHaveReceivedCommandWith(DescribeInstancesCommand, {
Filters: [
{ Name: 'instance-state-name', Values: ['running', 'pending'] },
Expand Down Expand Up @@ -249,7 +260,9 @@ describe('terminate runner', () => {
};
await terminateRunner(runner.instanceId);

expect(mockEC2Client).toHaveReceivedCommandWith(TerminateInstancesCommand, { InstanceIds: [runner.instanceId] });
expect(mockEC2Client).toHaveReceivedCommandWith(TerminateInstancesCommand, {
InstanceIds: [runner.instanceId],
});
});
});

Expand Down Expand Up @@ -324,7 +337,10 @@ describe('create runner', () => {
await createRunner(createRunnerConfig({ ...defaultRunnerConfig, type: type }));

expect(mockEC2Client).toHaveReceivedCommandWith(CreateFleetCommand, {
...expectedCreateFleetRequest({ ...defaultExpectedFleetRequestValues, type: type }),
...expectedCreateFleetRequest({
...defaultExpectedFleetRequestValues,
type: type,
}),
});
});

Expand All @@ -333,24 +349,36 @@ describe('create runner', () => {

mockEC2Client.on(CreateFleetCommand).resolves({ Instances: instances });

await createRunner({ ...createRunnerConfig(defaultRunnerConfig), numberOfRunners: 2 });
await createRunner({
...createRunnerConfig(defaultRunnerConfig),
numberOfRunners: 2,
});

expect(mockEC2Client).toHaveReceivedCommandWith(CreateFleetCommand, {
...expectedCreateFleetRequest({ ...defaultExpectedFleetRequestValues, totalTargetCapacity: 2 }),
...expectedCreateFleetRequest({
...defaultExpectedFleetRequestValues,
totalTargetCapacity: 2,
}),
});
});

it('calls create fleet of 1 instance with the on-demand capacity', async () => {
await createRunner(createRunnerConfig({ ...defaultRunnerConfig, capacityType: 'on-demand' }));
expect(mockEC2Client).toHaveReceivedCommandWith(CreateFleetCommand, {
...expectedCreateFleetRequest({ ...defaultExpectedFleetRequestValues, capacityType: 'on-demand' }),
...expectedCreateFleetRequest({
...defaultExpectedFleetRequestValues,
capacityType: 'on-demand',
}),
});
});

it('calls run instances with the on-demand capacity', async () => {
await createRunner(createRunnerConfig({ ...defaultRunnerConfig, maxSpotPrice: '0.1' }));
expect(mockEC2Client).toHaveReceivedCommandWith(CreateFleetCommand, {
...expectedCreateFleetRequest({ ...defaultExpectedFleetRequestValues, maxSpotPrice: '0.1' }),
...expectedCreateFleetRequest({
...defaultExpectedFleetRequestValues,
maxSpotPrice: '0.1',
}),
});
});

Expand All @@ -367,8 +395,16 @@ describe('create runner', () => {
},
};
mockSSMClient.on(GetParameterCommand).resolves(paramValue);
await createRunner(createRunnerConfig({ ...defaultRunnerConfig, amiIdSsmParameterName: 'my-ami-id-param' }));
const expectedRequest = expectedCreateFleetRequest({ ...defaultExpectedFleetRequestValues, imageId: 'ami-123' });
await createRunner(
createRunnerConfig({
...defaultRunnerConfig,
amiIdSsmParameterName: 'my-ami-id-param',
}),
);
const expectedRequest = expectedCreateFleetRequest({
...defaultExpectedFleetRequestValues,
imageId: 'ami-123',
});
expect(mockEC2Client).toHaveReceivedCommandWith(CreateFleetCommand, expectedRequest);
expect(mockSSMClient).toHaveReceivedCommandWith(GetParameterCommand, {
Name: 'my-ami-id-param',
Expand All @@ -380,7 +416,10 @@ describe('create runner', () => {
await createRunner(createRunnerConfig({ ...defaultRunnerConfig, tracingEnabled: true }));

expect(mockEC2Client).toHaveReceivedCommandWith(CreateFleetCommand, {
...expectedCreateFleetRequest({ ...defaultExpectedFleetRequestValues, tracingEnabled: true }),
...expectedCreateFleetRequest({
...defaultExpectedFleetRequestValues,
tracingEnabled: true,
}),
});
});
});
Expand Down Expand Up @@ -419,9 +458,12 @@ describe('create runner with errors', () => {
});

it('test ScaleError with multiple error.', async () => {
createFleetMockWithErrors(['UnfulfillableCapacity', 'SomeError']);
createFleetMockWithErrors(['UnfulfillableCapacity', 'MaxSpotInstanceCountExceeded', 'NotMappedError']);

await expect(createRunner(createRunnerConfig(defaultRunnerConfig))).rejects.toBeInstanceOf(ScaleError);
await expect(createRunner(createRunnerConfig(defaultRunnerConfig))).rejects.toMatchObject({
name: 'ScaleError',
failedInstanceCount: 2,
});
expect(mockEC2Client).toHaveReceivedCommandWith(
CreateFleetCommand,
expectedCreateFleetRequest(defaultExpectedFleetRequestValues),
Expand Down Expand Up @@ -465,7 +507,12 @@ describe('create runner with errors', () => {
mockSSMClient.on(GetParameterCommand).rejects(new Error('Some error'));

await expect(
createRunner(createRunnerConfig({ ...defaultRunnerConfig, amiIdSsmParameterName: 'my-ami-id-param' })),
createRunner(
createRunnerConfig({
...defaultRunnerConfig,
amiIdSsmParameterName: 'my-ami-id-param',
}),
),
).rejects.toBeInstanceOf(Error);
expect(mockEC2Client).not.toHaveReceivedCommand(CreateFleetCommand);
expect(mockSSMClient).not.toHaveReceivedCommand(PutParameterCommand);
Expand Down Expand Up @@ -530,7 +577,7 @@ describe('create runner with errors fail over to OnDemand', () => {
}),
});

// second call with with OnDemand failback
// second call with OnDemand fallback
expect(mockEC2Client).toHaveReceivedNthCommandWith(2, CreateFleetCommand, {
...expectedCreateFleetRequest({
...defaultExpectedFleetRequestValues,
Expand All @@ -540,17 +587,25 @@ describe('create runner with errors fail over to OnDemand', () => {
});
});

it('test InsufficientInstanceCapacity no failback.', async () => {
it('test InsufficientInstanceCapacity no fallback.', async () => {
await expect(
createRunner(createRunnerConfig({ ...defaultRunnerConfig, onDemandFailoverOnError: [] })),
createRunner(
createRunnerConfig({
...defaultRunnerConfig,
onDemandFailoverOnError: [],
}),
),
).rejects.toBeInstanceOf(Error);
});

it('test InsufficientInstanceCapacity with mutlipte instances and fallback to on demand .', async () => {
it('test InsufficientInstanceCapacity with multiple instances and fallback to on demand .', async () => {
const instancesIds = ['i-123', 'i-456'];
createFleetMockWithWithOnDemandFallback(['InsufficientInstanceCapacity'], instancesIds);

const instancesResult = await createRunner({ ...createRunnerConfig(defaultRunnerConfig), numberOfRunners: 2 });
const instancesResult = await createRunner({
...createRunnerConfig(defaultRunnerConfig),
numberOfRunners: 2,
});
expect(instancesResult).toEqual(instancesIds);

expect(mockEC2Client).toHaveReceivedCommandTimes(CreateFleetCommand, 2);
Expand Down Expand Up @@ -580,7 +635,10 @@ describe('create runner with errors fail over to OnDemand', () => {
createFleetMockWithWithOnDemandFallback(['UnfulfillableCapacity'], instancesIds);

await expect(
createRunner({ ...createRunnerConfig(defaultRunnerConfig), numberOfRunners: 2 }),
createRunner({
...createRunnerConfig(defaultRunnerConfig),
numberOfRunners: 2,
}),
).rejects.toBeInstanceOf(Error);

expect(mockEC2Client).toHaveReceivedCommandTimes(CreateFleetCommand, 1);
Expand Down Expand Up @@ -626,7 +684,10 @@ function createFleetMockWithWithOnDemandFallback(errors: string[], instances?: s

mockEC2Client
.on(CreateFleetCommand)
.resolvesOnce({ Instances: [instanceesFirstCall], Errors: errors.map((e) => ({ ErrorCode: e })) })
.resolvesOnce({
Instances: [instanceesFirstCall],
Errors: errors.map((e) => ({ ErrorCode: e })),
})
.resolvesOnce({ Instances: [instancesSecondCall] });
}

Expand Down Expand Up @@ -673,7 +734,10 @@ interface ExpectedFleetRequestValues {
function expectedCreateFleetRequest(expectedValues: ExpectedFleetRequestValues): CreateFleetCommandInput {
const tags = [
{ Key: 'ghr:Application', Value: 'github-action-runner' },
{ Key: 'ghr:created_by', Value: expectedValues.totalTargetCapacity > 1 ? 'pool-lambda' : 'scale-up-lambda' },
{
Key: 'ghr:created_by',
Value: expectedValues.totalTargetCapacity > 1 ? 'pool-lambda' : 'scale-up-lambda',
},
{ Key: 'ghr:Type', Value: expectedValues.type },
{ Key: 'ghr:Owner', Value: REPO_NAME },
];
Expand Down
101 changes: 55 additions & 46 deletions lambdas/functions/control-plane/src/aws/runners.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,53 +166,62 @@ async function processFleetResult(
): Promise<string[]> {
const instances: string[] = fleet.Instances?.flatMap((i) => i.InstanceIds?.flatMap((j) => j) || []) || [];

if (instances.length !== runnerParameters.numberOfRunners) {
logger.warn(
`${
instances.length === 0 ? 'No' : instances.length + ' off ' + runnerParameters.numberOfRunners
} instances created.`,
{ data: fleet },
);
const errors = fleet.Errors?.flatMap((e) => e.ErrorCode || '') || [];

// Educated guess of errors that would make sense to retry based on the list
// https://docs.aws.amazon.com/AWSEC2/latest/APIReference/errors-overview.html
const scaleErrors = [
'UnfulfillableCapacity',
'MaxSpotInstanceCountExceeded',
'TargetCapacityLimitExceededException',
'RequestLimitExceeded',
'ResourceLimitExceeded',
'MaxSpotInstanceCountExceeded',
'MaxSpotFleetRequestCountExceeded',
'InsufficientInstanceCapacity',
];

if (
errors.some((e) => runnerParameters.onDemandFailoverOnError?.includes(e)) &&
runnerParameters.ec2instanceCriteria.targetCapacityType === 'spot'
) {
logger.warn(`Create fleet failed, initatiing fall back to on demand instances.`);
logger.debug('Create fleet failed.', { data: fleet.Errors });
const numberOfInstances = runnerParameters.numberOfRunners - instances.length;
const instancesOnDemand = await createRunner({
...runnerParameters,
numberOfRunners: numberOfInstances,
onDemandFailoverOnError: ['InsufficientInstanceCapacity'],
ec2instanceCriteria: { ...runnerParameters.ec2instanceCriteria, targetCapacityType: 'on-demand' },
});
instances.push(...instancesOnDemand);
return instances;
} else if (errors.some((e) => scaleErrors.includes(e))) {
logger.warn('Create fleet failed, ScaleError will be thrown to trigger retry for ephemeral runners.');
logger.debug('Create fleet failed.', { data: fleet.Errors });
throw new ScaleError('Failed to create instance, create fleet failed.');
} else {
logger.warn('Create fleet failed, error not recognized as scaling error.', { data: fleet.Errors });
throw Error('Create fleet failed, no instance created.');
}
if (instances.length === runnerParameters.numberOfRunners) {
return instances;
}
return instances;

logger.warn(
`${
instances.length === 0 ? 'No' : instances.length + ' off ' + runnerParameters.numberOfRunners
} instances created.`,
{ data: fleet },
);

const errors = fleet.Errors?.flatMap((e) => e.ErrorCode || '') || [];

if (
errors.some((e) => runnerParameters.onDemandFailoverOnError?.includes(e)) &&
runnerParameters.ec2instanceCriteria.targetCapacityType === 'spot'
) {
logger.warn(`Create fleet failed, initatiing fall back to on demand instances.`);
logger.debug('Create fleet failed.', { data: fleet.Errors });
const numberOfInstances = runnerParameters.numberOfRunners - instances.length;
const instancesOnDemand = await createRunner({
...runnerParameters,
numberOfRunners: numberOfInstances,
onDemandFailoverOnError: ['InsufficientInstanceCapacity'],
ec2instanceCriteria: { ...runnerParameters.ec2instanceCriteria, targetCapacityType: 'on-demand' },
});
instances.push(...instancesOnDemand);
return instances;
}

// Educated guess of errors that would make sense to retry based on the list
// https://docs.aws.amazon.com/AWSEC2/latest/APIReference/errors-overview.html
const scaleErrors = [
'UnfulfillableCapacity',
'MaxSpotInstanceCountExceeded',
'TargetCapacityLimitExceededException',
'RequestLimitExceeded',
'ResourceLimitExceeded',
'MaxSpotInstanceCountExceeded',
'MaxSpotFleetRequestCountExceeded',
'InsufficientInstanceCapacity',
];

const failedCount = countScaleErrors(errors, scaleErrors);
if (failedCount > 0) {
logger.warn('Create fleet failed, ScaleError will be thrown to trigger retry for ephemeral runners.');
logger.debug('Create fleet failed.', { data: fleet.Errors });
throw new ScaleError(failedCount);
}

logger.warn('Create fleet failed, error not recognized as scaling error.', { data: fleet.Errors });
throw Error('Create fleet failed, no instance created.');
}

/**
 * Counts how many of the reported error codes are considered scale errors.
 *
 * Each entry in `errors` is checked independently, so a code that occurs
 * several times in `errors` is counted once per occurrence. Repeated entries
 * in `scaleErrors` do not affect the result.
 *
 * @param errors - Error codes to inspect.
 * @param scaleErrors - Error codes that should be counted.
 * @returns The number of entries in `errors` that are present in `scaleErrors`.
 */
function countScaleErrors(errors: string[], scaleErrors: string[]): number {
  const matching = errors.filter((code) => scaleErrors.includes(code));
  return matching.length;
}

async function getAmiIdOverride(runnerParameters: Runners.RunnerInputParameters): Promise<string | undefined> {
Expand Down
Loading