Commit e76c75e

Add opts to the TorchX Kubernetes scheduler to allow overriding the reserved CPU/memory overhead and the AWS EFA device count.

Differential Revision: D88564180
Pull Request resolved: #1174
1 parent 2b2af7c commit e76c75e

File tree

2 files changed: +151 −20 lines changed

2 files changed

+151
-20
lines changed
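The new options plug into the scheduler's existing run-config mechanism. A hypothetical invocation sketch, assuming the usual `torchx run -cfg` key=value syntax; only the option names come from this diff, while the component (`utils.echo`) and the values are illustrative:

    torchx run -s kubernetes \
        -cfg queue=default,reserved_millicpu=250,reserved_memmb=2048,efa_device_count=0 \
        utils.echo --msg hello

Here `efa_device_count=0` strips `vpc.amazonaws.com/efa` from the pod limits even if the resource spec requests it; leaving the option unset keeps whatever count the spec declares.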

torchx/schedulers/kubernetes_scheduler.py

Lines changed: 74 additions & 5 deletions
@@ -294,7 +294,14 @@ def sanitize_for_serialization(obj: object) -> object:
     return api.sanitize_for_serialization(obj)
 
 
-def role_to_pod(name: str, role: Role, service_account: Optional[str]) -> "V1Pod":
+def role_to_pod(
+    name: str,
+    role: Role,
+    service_account: Optional[str],
+    reserved_millicpu: int = RESERVED_MILLICPU,
+    reserved_memmb: int = RESERVED_MEMMB,
+    efa_device_count: Optional[int] = None,
+) -> "V1Pod":
     from kubernetes.client.models import (  # noqa: F811 redefinition of unused
         V1Container,
         V1ContainerPort,
@@ -324,18 +331,29 @@ def role_to_pod(name: str, role: Role, service_account: Optional[str]) -> "V1Pod
     if resource.cpu > 0:
         mcpu = int(resource.cpu * 1000)
         limits["cpu"] = f"{mcpu}m"
-        request_mcpu = max(mcpu - RESERVED_MILLICPU, 0)
+        request_mcpu = max(mcpu - reserved_millicpu, 0)
         requests["cpu"] = f"{request_mcpu}m"
     if resource.memMB > 0:
         limits["memory"] = f"{int(resource.memMB)}M"
-        request_memMB = max(int(resource.memMB) - RESERVED_MEMMB, 0)
+        request_memMB = max(int(resource.memMB) - reserved_memmb, 0)
         requests["memory"] = f"{request_memMB}M"
     if resource.gpu > 0:
         requests["nvidia.com/gpu"] = limits["nvidia.com/gpu"] = str(resource.gpu)
 
+    EFA_DEVICE = "vpc.amazonaws.com/efa"
     for device_name, device_limit in resource.devices.items():
         limits[device_name] = str(device_limit)
 
+    # Handle EFA device count override:
+    # - None (default): use whatever count is in the resource spec (already added above)
+    # - 0: remove EFA devices entirely
+    # - N > 0: set EFA device count to N (override or add)
+    if efa_device_count is not None:
+        if efa_device_count == 0:
+            limits.pop(EFA_DEVICE, None)
+        else:
+            limits[EFA_DEVICE] = str(efa_device_count)
+
     resources = V1ResourceRequirements(
         limits=limits,
         requests=requests,
@@ -475,6 +493,9 @@ def app_to_resource(
     queue: str,
     service_account: Optional[str],
     priority_class: Optional[str] = None,
+    reserved_millicpu: int = RESERVED_MILLICPU,
+    reserved_memmb: int = RESERVED_MEMMB,
+    efa_device_count: Optional[int] = None,
 ) -> Dict[str, Any]:
     """
     app_to_resource creates a volcano job kubernetes resource definition from
@@ -507,7 +528,14 @@ def app_to_resource(
         replica_role.env["TORCHX_RANK0_HOST"] = "localhost"
         replica_role.env["TORCHX_IMAGE"] = replica_role.image
 
-        pod = role_to_pod(name, replica_role, service_account)
+        pod = role_to_pod(
+            name,
+            replica_role,
+            service_account,
+            reserved_millicpu,
+            reserved_memmb,
+            efa_device_count,
+        )
         if k8s_metadata := role.metadata.get("kubernetes"):
             if isinstance(k8s_metadata, str):
                 import fsspec
@@ -589,6 +617,9 @@ class KubernetesOpts(TypedDict, total=False):
     service_account: Optional[str]
     priority_class: Optional[str]
     validate_spec: Optional[bool]
+    reserved_millicpu: Optional[int]
+    reserved_memmb: Optional[int]
+    efa_device_count: Optional[int]
 
 
 class KubernetesScheduler(DockerWorkspaceMixin, Scheduler[KubernetesOpts]):
@@ -783,7 +814,26 @@ def _submit_dryrun(
             priority_class, str
         ), "priority_class must be a str"
 
-        resource = app_to_resource(app, queue, service_account, priority_class)
+        reserved_millicpu = cfg.get("reserved_millicpu", RESERVED_MILLICPU)
+        assert isinstance(reserved_millicpu, int), "reserved_millicpu must be an int"
+
+        reserved_memmb = cfg.get("reserved_memmb", RESERVED_MEMMB)
+        assert isinstance(reserved_memmb, int), "reserved_memmb must be an int"
+
+        efa_device_count = cfg.get("efa_device_count")
+        assert efa_device_count is None or isinstance(
+            efa_device_count, int
+        ), "efa_device_count must be an int or None"
+
+        resource = app_to_resource(
+            app,
+            queue,
+            service_account,
+            priority_class,
+            reserved_millicpu,
+            reserved_memmb,
+            efa_device_count,
+        )
 
         if cfg.get("validate_spec"):
             try:
@@ -889,6 +939,25 @@ def _run_opts(self) -> runopts:
             help="Validate job spec using Kubernetes API dry-run before submission",
             default=True,
         )
+        opts.add(
+            "reserved_millicpu",
+            type_=int,
+            help="Amount of CPU in millicores to reserve for Kubernetes system overhead (default: 100)",
+            default=RESERVED_MILLICPU,
+        )
+        opts.add(
+            "reserved_memmb",
+            type_=int,
+            help="Amount of memory in MB to reserve for Kubernetes system overhead (default: 1024)",
+            default=RESERVED_MEMMB,
+        )
+        opts.add(
+            "efa_device_count",
+            type_=int,
+            help="EFA device count override: None/unset=use resource spec, "
+            "0=remove EFA, N>0=set EFA count to N",
+            default=None,
+        )
         return opts
 
     def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
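The request/limit arithmetic this file changes is easier to see outside the diff. A minimal, self-contained sketch; the helper name `compute_limits_and_requests` is illustrative and not part of TorchX, but the floor-at-zero subtraction and the tri-state EFA handling mirror the diff above:

    from typing import Dict, Optional

    EFA_DEVICE = "vpc.amazonaws.com/efa"

    def compute_limits_and_requests(
        mcpu: int,
        mem_mb: int,
        devices: Dict[str, int],
        reserved_millicpu: int = 100,
        reserved_memmb: int = 1024,
        efa_device_count: Optional[int] = None,
    ) -> Dict[str, Dict[str, str]]:
        # Limits mirror the resource spec; requests subtract the reserved
        # Kubernetes overhead, floored at zero so they never go negative.
        limits = {"cpu": f"{mcpu}m", "memory": f"{mem_mb}M"}
        requests = {
            "cpu": f"{max(mcpu - reserved_millicpu, 0)}m",
            "memory": f"{max(mem_mb - reserved_memmb, 0)}M",
        }
        limits.update({name: str(count) for name, count in devices.items()})
        # Tri-state override: None keeps the spec, 0 removes EFA, N > 0 sets N.
        if efa_device_count is not None:
            if efa_device_count == 0:
                limits.pop(EFA_DEVICE, None)
            else:
                limits[EFA_DEVICE] = str(efa_device_count)
        return {"limits": limits, "requests": requests}

    # With the defaults (100 millicpu, 1024 MB reserved), a 2-CPU/3000 MB role
    # requests 1900m CPU and 1976M memory -- the values asserted in the tests below.
    assert compute_limits_and_requests(2000, 3000, {})["requests"] == {
        "cpu": "1900m",
        "memory": "1976M",
    }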

torchx/schedulers/test/kubernetes_scheduler_test.py

Lines changed: 77 additions & 15 deletions
@@ -476,32 +476,91 @@ def test_device_mounts(self) -> None:
         )
         self.assertTrue(pod.spec.containers[0].security_context.privileged)
 
-    def test_resource_devices(self) -> None:
-        scheduler = create_scheduler("test")
-
-        role = specs.Role(
+    def test_efa_device_override(self) -> None:
+        """Test EFA device count can be overridden via efa_device_count parameter."""
+        role_with_efa = specs.Role(
             name="foo",
             image="",
             resource=specs.Resource(
                 cpu=2,
                 memMB=3000,
                 gpu=4,
-                devices={
-                    "vpc.amazonaws.com/efa": 4,
-                },
+                devices={"vpc.amazonaws.com/efa": 4},
             ),
         )
+        role_without_efa = specs.Role(
+            name="foo",
+            image="",
+            resource=specs.Resource(cpu=2, memMB=3000, gpu=4),
+        )
+
+        # Default: use resource spec's EFA count (or no EFA if not in spec)
+        pod = role_to_pod("foo", role_with_efa, service_account="")
+        self.assertEqual(
+            pod.spec.containers[0].resources.limits["vpc.amazonaws.com/efa"], "4"
+        )
+
+        pod = role_to_pod("foo", role_without_efa, service_account="")
+        self.assertNotIn(
+            "vpc.amazonaws.com/efa", pod.spec.containers[0].resources.limits
+        )
+
+        # Override to 0: remove EFA entirely
+        pod = role_to_pod("foo", role_with_efa, service_account="", efa_device_count=0)
+        self.assertNotIn(
+            "vpc.amazonaws.com/efa", pod.spec.containers[0].resources.limits
+        )
+
+        # Override to different count: use override value
+        pod = role_to_pod("foo", role_with_efa, service_account="", efa_device_count=8)
+        self.assertEqual(
+            pod.spec.containers[0].resources.limits["vpc.amazonaws.com/efa"], "8"
+        )
+
+        # Add EFA when not in resource spec
+        pod = role_to_pod(
+            "foo", role_without_efa, service_account="", efa_device_count=32
+        )
+        self.assertEqual(
+            pod.spec.containers[0].resources.limits["vpc.amazonaws.com/efa"], "32"
+        )
+
+    def test_reserved_resources_override(self) -> None:
+        """Test that reserved_millicpu and reserved_memmb overrides work correctly."""
+        role = specs.Role(
+            name="foo",
+            image="",
+            resource=specs.Resource(cpu=2, gpu=0, memMB=3000),
+        )
+
+        # Default: 100 millicpu and 1024 memmb reserved
         pod = role_to_pod("foo", role, service_account="")
+        self.assertEqual(pod.spec.containers[0].resources.limits["cpu"], "2000m")
+        self.assertEqual(pod.spec.containers[0].resources.limits["memory"], "3000M")
         self.assertEqual(
-            pod.spec.containers[0].resources.limits,
-            {
-                "cpu": "2000m",
-                "memory": "3000M",
-                "nvidia.com/gpu": "4",
-                "vpc.amazonaws.com/efa": "4",
-            },
+            pod.spec.containers[0].resources.requests["cpu"], "1900m"
+        )  # 2000 - 100
+        self.assertEqual(
+            pod.spec.containers[0].resources.requests["memory"], "1976M"
+        )  # 3000 - 1024
+
+        # Custom overrides for both CPU and memory
+        pod = role_to_pod(
+            "foo", role, service_account="", reserved_millicpu=300, reserved_memmb=1000
+        )
+        self.assertEqual(
+            pod.spec.containers[0].resources.requests["cpu"], "1700m"
+        )  # 2000 - 300
+        self.assertEqual(
+            pod.spec.containers[0].resources.requests["memory"], "2000M"
+        )  # 3000 - 1000
+
+        # Zero reserved: requests equal limits
+        pod = role_to_pod(
+            "foo", role, service_account="", reserved_millicpu=0, reserved_memmb=0
         )
-        self.assertFalse(pod.spec.containers[0].security_context.privileged)
+        self.assertEqual(pod.spec.containers[0].resources.requests["cpu"], "2000m")
+        self.assertEqual(pod.spec.containers[0].resources.requests["memory"], "3000M")
 
     def test_instance_type(self) -> None:
         scheduler = create_scheduler("test")
@@ -797,6 +856,9 @@ def test_runopts(self) -> None:
                 "service_account",
                 "priority_class",
                 "validate_spec",
+                "reserved_millicpu",
+                "reserved_memmb",
+                "efa_device_count",
             },
         )
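Assuming standard pytest discovery of the TorchX test tree (the command form is a guess, not part of this commit), the two new tests can be run in isolation with something like:

    pytest torchx/schedulers/test/kubernetes_scheduler_test.py \
        -k "test_efa_device_override or test_reserved_resources_override"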
