From 6f202984d3787f7f14d0ac60b5272a91d81e5a00 Mon Sep 17 00:00:00 2001 From: Antonio Rodriguez Date: Wed, 19 Mar 2025 22:26:26 +0100 Subject: [PATCH 1/4] Parse sinfo output to manage resources --- .../commands/slurm_manage_resources.py | 109 ++++++++++++++++++ coldfront/plugins/slurm/utils.py | 14 +++ 2 files changed, 123 insertions(+) create mode 100644 coldfront/plugins/slurm/management/commands/slurm_manage_resources.py diff --git a/coldfront/plugins/slurm/management/commands/slurm_manage_resources.py b/coldfront/plugins/slurm/management/commands/slurm_manage_resources.py new file mode 100644 index 0000000000..c0ac169e9d --- /dev/null +++ b/coldfront/plugins/slurm/management/commands/slurm_manage_resources.py @@ -0,0 +1,109 @@ +import logging +import os +import re +from functools import reduce +from cProfile import Profile + +from django.core.management.base import BaseCommand, CommandError + +from coldfront.core.resource.models import ResourceType, ResourceAttribute, ResourceAttributeType, AttributeType, Resource +from coldfront.core.project.models import Project +from coldfront.plugins.slurm.utils import slurm_get_nodes_info +from django.utils.datetime_safe import datetime + +logger = logging.getLogger(__name__) + +class Command(BaseCommand): + help = 'Manage slurm resources from sinfo output' + + def get_output_from_file(self, file_path): + try: + keys = None + with open(file_path, 'r') as output_file: + for line in output_file: + if keys is None: + keys = re.sub(r'\s+', ' ', line).strip().lower().split(' ') + else: + values = re.sub(r'\s+', ' ', line).strip().split(' ') + yield dict(zip(keys, values)) + except FileNotFoundError: + print(f"File at {file_path} does not exist. Cant simulate output.") + except IOError as e: + print(f"An error occurred: {e}") + + + def add_arguments(self, parser): + parser.add_argument("-e", "--environment", help="Environment, use dev to simulate output") + parser.add_argument('--profile', action='store_true', default=False) + + def handle(self, *args, **options): + if options.get('profile', False): + profiler = Profile() + profiler.runcall(self._handle, *args, **options) + profiler.print_stats() + else: + self._handle(*args, **options) + + def _handle(self, *args, **options): + def calculate_gpu_count(gres_value): + if 'null' in gres_value: + return 0 + gpu_list = gres_value.split(',') + return reduce(lambda x, y: x + y,[int(gpu_info.split(':')[2].replace('(S','')) for gpu_info in gpu_list]) + + def calculate_cpu_count(row): + if row.get('S:C:T', None) is None: + return 0 + cpu_count = row.get('S:C:T').split(':')[1] + return int(cpu_count) + + def calculate_owner_value(project_list, row): + owner_name = '' + project_name_list = [project.title for project in project_list] + owner_lists = row.get('groups', '').split(',') + owner_project = [name_owner for name_owner in owner_lists if name_owner in project_name_list] + if len(owner_project) > 0: + return owner_project[0] + if {'cluster_users', 'slurm-admin'}.issubset(set(owner_lists)): + return'FASRC' + return owner_name + + env = options['environment'] or 'production' + if 'dev' in env: + output = self.get_output_from_file(os.path.join(os.getcwd(), 'coldfront/plugins/slurm/management/commands/sinfo.txt')) + else: + output = slurm_get_nodes_info() + print(f'Running on {env} mode') + project_list = Project.objects.all() + compute_node, compute_node_created = ResourceType.objects.get_or_create(name='Compute Node', description='Compute Node') + partition_resource_type, partition_created = 
ResourceType.objects.get_or_create(name='Cluster Partition', description='Cluster Partition') + int_attribute_type = AttributeType.objects.get(name='Int') + text_attribute_type = AttributeType.objects.get(name='Text') + gpu_count_attribute_type, gpu_count_created = ResourceAttributeType.objects.get_or_create(name='GPU Count', defaults={'attribute_type': int_attribute_type}) + core_count_attribute_type, core_count_created = ResourceAttributeType.objects.get_or_create(name='Core Count', defaults={'attribute_type': int_attribute_type}) + features_attribute_type, features_created = ResourceAttributeType.objects.get_or_create(name='Features', defaults={'attribute_type': text_attribute_type}) + owner_attribute_type, owner_created = ResourceAttributeType.objects.get_or_create(name='Owner', defaults={'attribute_type': text_attribute_type}) + service_end_attribute_type, service_end_created = ResourceAttributeType.objects.get_or_create(name='ServiceEnd', defaults={'attribute_type': text_attribute_type}) + processed_resources = set() + bulk_process_resource_attribute = [] + bulk_update_resource = [] + for row in output: + new_resource, compute_node_created_created = Resource.objects.get_or_create(name=row['nodelist'], defaults={'is_allocatable':False, 'resource_type':compute_node}) + Resource.objects.get_or_create(name=row['partition'], defaults={'resource_type':partition_resource_type}) + bulk_process_resource_attribute.append(ResourceAttribute(resource_attribute_type=gpu_count_attribute_type, resource=new_resource, value=calculate_gpu_count(row['gres']))) + bulk_process_resource_attribute.append(ResourceAttribute(resource_attribute_type=core_count_attribute_type, resource=new_resource, value=calculate_cpu_count(row))) + bulk_process_resource_attribute.append(ResourceAttribute(resource_attribute_type=features_attribute_type, resource=new_resource, value=row.get('avail_features', '(null)'))) + bulk_process_resource_attribute.append(ResourceAttribute(resource_attribute_type=owner_attribute_type, resource=new_resource, value=calculate_owner_value(project_list, row))) + if new_resource.is_available is False: + bulk_update_resource.append(Resource(name=row['nodelist'], is_available=True)) + bulk_process_resource_attribute.append(ResourceAttribute(resource=new_resource, value=' ', resource_attribute_type=service_end_attribute_type)) + processed_resources.add(new_resource.name) + ResourceAttribute.objects.bulk_create(bulk_process_resource_attribute, update_conflicts=True, unique_fields=[], update_fields=['value']) + Resource.objects.bulk_create(bulk_update_resource, update_conflicts=True, unique_fields=[], update_fields=['is_available']) + bulk_process_resource_attribute = [] + bulk_update_resource = [] + for resource_to_delete in Resource.objects.exclude(name__in=list(processed_resources)).filter(is_available=True, resource_type=compute_node): + bulk_update_resource.append(Resource(name=resource_to_delete.name, is_available=False)) + bulk_process_resource_attribute.append(ResourceAttribute(resource=resource_to_delete, value=str(datetime.now()), resource_attribute_type=service_end_attribute_type)) + ResourceAttribute.objects.bulk_create(bulk_process_resource_attribute, update_conflicts=True, unique_fields=[], update_fields=['value']) + Resource.objects.bulk_create(bulk_update_resource, update_conflicts=True, unique_fields=[], update_fields=['is_available']) \ No newline at end of file diff --git a/coldfront/plugins/slurm/utils.py b/coldfront/plugins/slurm/utils.py index ee1befd25f..29007d3631 100644 --- 
a/coldfront/plugins/slurm/utils.py +++ b/coldfront/plugins/slurm/utils.py @@ -1,4 +1,5 @@ import logging +import re import shlex import struct import subprocess @@ -30,6 +31,7 @@ SLURM_CMD_CHECK_ASSOCIATION = SLURM_SACCTMGR_PATH + ' list associations User={} Cluster={} Account={} Format=Cluster,Account,User,QOS -P' SLURM_CMD_BLOCK_ACCOUNT = SLURM_SACCTMGR_PATH + ' -Q -i modify account {} where Cluster={} set GrpSubmitJobs=0' SLURM_CMD_DUMP_CLUSTER = SLURM_SACCTMGR_PATH + ' dump {} file={}' +SLURM_CMD_SINFO_NODES = 'sinfo -N -r --format="%19N %23P %66G %5a %5c %8z %65f %50g"' logger = logging.getLogger(__name__) @@ -100,6 +102,18 @@ def slurm_block_account(cluster, account, noop=False): cmd = SLURM_CMD_BLOCK_ACCOUNT.format(shlex.quote(account), shlex.quote(cluster)) _run_slurm_cmd(cmd, noop=noop) +def slurm_get_nodes_info(): + def table_to_dict_list(table_info): + keys = re.sub(r'\s+', ' ', table_info[0]).strip().lower().split(' ') + values = [re.sub(r'\s+', ' ', row).strip().split(' ') for row in table_info[1:] if len(row)>0] + result_list = [dict(zip(keys, row)) for row in values if len(row)> 0] + return result_list + + cmd = SLURM_CMD_SINFO_NODES + nodes_info = _run_slurm_cmd(cmd, noop=False) + nodes_info = nodes_info.decode('utf-8').split('\n') + return table_to_dict_list(nodes_info) + def slurm_check_assoc(user, cluster, account): cmd = SLURM_CMD_CHECK_ASSOCIATION.format( shlex.quote(user), shlex.quote(cluster), shlex.quote(account) From aef859d8b7e3cdadfd0052023d204006527c70bb Mon Sep 17 00:00:00 2001 From: Antonio Rodriguez Date: Fri, 21 Mar 2025 23:00:43 +0100 Subject: [PATCH 2/4] Add resourceattrtypes to defauls command. Fix parsing of gpu count. Prevent bulk update error --- .../commands/add_resource_defaults.py | 14 ++++--- .../commands/slurm_manage_resources.py | 38 +++++++++++-------- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/coldfront/core/resource/management/commands/add_resource_defaults.py b/coldfront/core/resource/management/commands/add_resource_defaults.py index d159b97037..3f3eacbea1 100644 --- a/coldfront/core/resource/management/commands/add_resource_defaults.py +++ b/coldfront/core/resource/management/commands/add_resource_defaults.py @@ -28,17 +28,19 @@ def handle(self, *args, **options): ('file_count', 'Int'), ('allocated_tb', 'Float'), ('ChangeableAllocations', 'Yes/No'), - # ('Core Count', 'Int'), + ('Core Count', 'Int'), + ('GPU Count', 'Int'), # ('expiry_time', 'Int'), # ('fee_applies', 'Yes/No'), - # ('Node Count', 'Int'), - # ('Owner', 'Text'), + #('Node Count', 'Int'), + ('Features', 'Text'), + ('Owner', 'Text'), ('quantity_default_value', 'Int'), ('quantity_label', 'Text'), ('xdmod_resource', 'Text'), # ('eula', 'Text'), # ('OnDemand','Yes/No'), - # ('ServiceEnd', 'Date'), + ('ServiceEnd', 'Date'), # ('ServiceStart', 'Date'), # ('slurm_cluster', 'Text'), # ('slurm_specs', 'Attribute Expanded Text'), @@ -61,8 +63,8 @@ def handle(self, *args, **options): ('Storage Tier', 'Storage Tier',), ('Cloud', 'Cloud Computing'), ('Cluster', 'Cluster servers'), - # ('Cluster Partition', 'Cluster Partition '), - # ('Compute Node', 'Compute Node'), + ('Cluster Partition', 'Cluster Partition '), + ('Compute Node', 'Compute Node'), # ('Server', 'Extra servers providing various services'), # ('Software License', 'Software license purchased by users'), # ('Storage', 'NAS storage'), diff --git a/coldfront/plugins/slurm/management/commands/slurm_manage_resources.py b/coldfront/plugins/slurm/management/commands/slurm_manage_resources.py index 
c0ac169e9d..67efdc652b 100644 --- a/coldfront/plugins/slurm/management/commands/slurm_manage_resources.py +++ b/coldfront/plugins/slurm/management/commands/slurm_manage_resources.py @@ -31,7 +31,6 @@ def get_output_from_file(self, file_path): except IOError as e: print(f"An error occurred: {e}") - def add_arguments(self, parser): parser.add_argument("-e", "--environment", help="Environment, use dev to simulate output") parser.add_argument('--profile', action='store_true', default=False) @@ -48,7 +47,7 @@ def _handle(self, *args, **options): def calculate_gpu_count(gres_value): if 'null' in gres_value: return 0 - gpu_list = gres_value.split(',') + gpu_list = gres_value.split('),') return reduce(lambda x, y: x + y,[int(gpu_info.split(':')[2].replace('(S','')) for gpu_info in gpu_list]) def calculate_cpu_count(row): @@ -77,13 +76,11 @@ def calculate_owner_value(project_list, row): project_list = Project.objects.all() compute_node, compute_node_created = ResourceType.objects.get_or_create(name='Compute Node', description='Compute Node') partition_resource_type, partition_created = ResourceType.objects.get_or_create(name='Cluster Partition', description='Cluster Partition') - int_attribute_type = AttributeType.objects.get(name='Int') - text_attribute_type = AttributeType.objects.get(name='Text') - gpu_count_attribute_type, gpu_count_created = ResourceAttributeType.objects.get_or_create(name='GPU Count', defaults={'attribute_type': int_attribute_type}) - core_count_attribute_type, core_count_created = ResourceAttributeType.objects.get_or_create(name='Core Count', defaults={'attribute_type': int_attribute_type}) - features_attribute_type, features_created = ResourceAttributeType.objects.get_or_create(name='Features', defaults={'attribute_type': text_attribute_type}) - owner_attribute_type, owner_created = ResourceAttributeType.objects.get_or_create(name='Owner', defaults={'attribute_type': text_attribute_type}) - service_end_attribute_type, service_end_created = ResourceAttributeType.objects.get_or_create(name='ServiceEnd', defaults={'attribute_type': text_attribute_type}) + gpu_count_attribute_type = ResourceAttributeType.objects.get(name='GPU Count') + core_count_attribute_type = ResourceAttributeType.objects.get(name='Core Count') + features_attribute_type = ResourceAttributeType.objects.get(name='Features') + owner_attribute_type = ResourceAttributeType.objects.get(name='Owner') + service_end_attribute_type = ResourceAttributeType.objects.get(name='ServiceEnd') processed_resources = set() bulk_process_resource_attribute = [] bulk_update_resource = [] @@ -95,15 +92,24 @@ def calculate_owner_value(project_list, row): bulk_process_resource_attribute.append(ResourceAttribute(resource_attribute_type=features_attribute_type, resource=new_resource, value=row.get('avail_features', '(null)'))) bulk_process_resource_attribute.append(ResourceAttribute(resource_attribute_type=owner_attribute_type, resource=new_resource, value=calculate_owner_value(project_list, row))) if new_resource.is_available is False: - bulk_update_resource.append(Resource(name=row['nodelist'], is_available=True)) - bulk_process_resource_attribute.append(ResourceAttribute(resource=new_resource, value=' ', resource_attribute_type=service_end_attribute_type)) + bulk_update_resource.append(Resource(name=row['nodelist'], is_available=True, resource_type=compute_node)) + bulk_process_resource_attribute.append(ResourceAttribute(resource=new_resource, value=None, resource_attribute_type=service_end_attribute_type)) 
processed_resources.add(new_resource.name) - ResourceAttribute.objects.bulk_create(bulk_process_resource_attribute, update_conflicts=True, unique_fields=[], update_fields=['value']) - Resource.objects.bulk_create(bulk_update_resource, update_conflicts=True, unique_fields=[], update_fields=['is_available']) + try: + ResourceAttribute.objects.bulk_create(bulk_process_resource_attribute, update_conflicts=True, unique_fields=[], update_fields=['value']) + Resource.objects.bulk_create(bulk_update_resource, update_conflicts=True, unique_fields=[], update_fields=['is_available']) + except Exception as e: + logger.error(f'Error processing resources info: {str(e)}') + raise bulk_process_resource_attribute = [] bulk_update_resource = [] for resource_to_delete in Resource.objects.exclude(name__in=list(processed_resources)).filter(is_available=True, resource_type=compute_node): - bulk_update_resource.append(Resource(name=resource_to_delete.name, is_available=False)) + resource_to_delete.is_available = False + bulk_update_resource.append(resource_to_delete) bulk_process_resource_attribute.append(ResourceAttribute(resource=resource_to_delete, value=str(datetime.now()), resource_attribute_type=service_end_attribute_type)) - ResourceAttribute.objects.bulk_create(bulk_process_resource_attribute, update_conflicts=True, unique_fields=[], update_fields=['value']) - Resource.objects.bulk_create(bulk_update_resource, update_conflicts=True, unique_fields=[], update_fields=['is_available']) \ No newline at end of file + try: + ResourceAttribute.objects.bulk_create(bulk_process_resource_attribute, update_conflicts=True, unique_fields=[], update_fields=['value']) + Resource.objects.bulk_create(bulk_update_resource, update_conflicts=True, unique_fields=[], update_fields=['is_available']) + except Exception as e: + logger.error(f'Error cleaning up resources: {str(e)}') + raise \ No newline at end of file From 5d2df29665ce7e330236fc72a582867b9da0cc67 Mon Sep 17 00:00:00 2001 From: Antonio Rodriguez Date: Wed, 26 Mar 2025 13:13:40 +0100 Subject: [PATCH 3/4] Use bulk_update_with_history and bulk_create_with_history to generate change history --- .../commands/slurm_manage_resources.py | 104 ++++++++++++++---- 1 file changed, 84 insertions(+), 20 deletions(-) diff --git a/coldfront/plugins/slurm/management/commands/slurm_manage_resources.py b/coldfront/plugins/slurm/management/commands/slurm_manage_resources.py index e4ec9365cb..918c2cd0ea 100644 --- a/coldfront/plugins/slurm/management/commands/slurm_manage_resources.py +++ b/coldfront/plugins/slurm/management/commands/slurm_manage_resources.py @@ -5,6 +5,7 @@ from cProfile import Profile from django.core.management.base import BaseCommand, CommandError +from simple_history.utils import bulk_update_with_history, bulk_create_with_history from coldfront.core.resource.models import ResourceType, ResourceAttribute, ResourceAttributeType, AttributeType, Resource from coldfront.core.project.models import Project @@ -27,9 +28,9 @@ def get_output_from_file(self, file_path): values = re.sub(r'\s+', ' ', line).strip().split(' ') yield dict(zip(keys, values)) except FileNotFoundError: - print(f"File at {file_path} does not exist. Cant simulate output.") + logger.error(f"File at {file_path} does not exist. 
Cant simulate output.") except IOError as e: - print(f"An error occurred: {e}") + logger.error(f"An error occurred: {e}") def add_arguments(self, parser): parser.add_argument("-e", "--environment", help="Environment, use dev to simulate output") @@ -51,9 +52,9 @@ def calculate_gpu_count(gres_value): return reduce(lambda x, y: x + y,[int(gpu_info.split(':')[2].replace('(S','')) for gpu_info in gpu_list]) def calculate_cpu_count(row): - if row.get('S:C:T', None) is None: + if row.get('s:c:t', None) is None: return 0 - cpu_count = row.get('S:C:T').split(':')[1] + cpu_count = row.get('s:c:t').split(':')[1] return int(cpu_count) def calculate_owner_value(project_list, row): @@ -69,47 +70,110 @@ def calculate_owner_value(project_list, row): env = options['environment'] or 'production' if 'dev' in env: - output = self.get_output_from_file(os.path.join(os.getcwd(), 'coldfront/plugins/slurm/management/commands/sinfo.txt')) + output = self.get_output_from_file(os.path.join(os.getcwd(), 'coldfront/plugins/slurm/management/commands/sinfo_output.txt')) else: output = slurm_get_nodes_info() - print(f'Running on {env} mode') + logger.debug(f'Running on {env} mode') project_list = Project.objects.all() compute_node = ResourceType.objects.get(name='Compute Node') + attribute_type_name_list = ['GPU Count', 'Core Count', 'Features', 'Owner', 'ServiceEnd'] partition_resource_type = ResourceType.objects.get(name='Cluster Partition') gpu_count_attribute_type = ResourceAttributeType.objects.get(name='GPU Count') core_count_attribute_type = ResourceAttributeType.objects.get(name='Core Count') features_attribute_type = ResourceAttributeType.objects.get(name='Features') owner_attribute_type = ResourceAttributeType.objects.get(name='Owner') service_end_attribute_type = ResourceAttributeType.objects.get(name='ServiceEnd') + existing_resource_attributes = list(ResourceAttribute.objects.filter( + resource_attribute_type__name__in=attribute_type_name_list, + resource__resource_type__name='Compute Node' + ).values_list('pk', 'resource__name', 'resource_attribute_type__name') + ) + existing_resource_attributes_check = [f'{resource_att[1]} {resource_att[2]}' for resource_att in existing_resource_attributes] + existing_resource_attributes_pk_map = {f'{resource_att[1]} {resource_att[2]}': resource_att[0] for resource_att in existing_resource_attributes} processed_resources = set() - bulk_process_resource_attribute = [] + bulk_update_resource_attribute = [] + bulk_create_resource_attribute = [] bulk_update_resource = [] + processed_resource_attribute = [] for row in output: new_resource, compute_node_created_created = Resource.objects.get_or_create(name=row['nodelist'], defaults={'is_allocatable':False, 'resource_type':compute_node}) Resource.objects.get_or_create(name=row['partition'], defaults={'resource_type':partition_resource_type}) - bulk_process_resource_attribute.append(ResourceAttribute(resource_attribute_type=gpu_count_attribute_type, resource=new_resource, value=calculate_gpu_count(row['gres']))) - bulk_process_resource_attribute.append(ResourceAttribute(resource_attribute_type=core_count_attribute_type, resource=new_resource, value=calculate_cpu_count(row))) - bulk_process_resource_attribute.append(ResourceAttribute(resource_attribute_type=features_attribute_type, resource=new_resource, value=row.get('avail_features', '(null)'))) - bulk_process_resource_attribute.append(ResourceAttribute(resource_attribute_type=owner_attribute_type, resource=new_resource, value=calculate_owner_value(project_list, row))) + + 
gpu_count = ResourceAttribute(resource_attribute_type=gpu_count_attribute_type, resource=new_resource, value=calculate_gpu_count(row['gres'])) + gpu_count_key = f"{row['nodelist']} {gpu_count_attribute_type.name}" + if gpu_count_key in existing_resource_attributes_check: + gpu_count.pk = existing_resource_attributes_pk_map[gpu_count_key] + bulk_update_resource_attribute.append(gpu_count) + else: + if gpu_count_key not in processed_resource_attribute: + bulk_create_resource_attribute.append(gpu_count) + processed_resource_attribute.append(gpu_count_key) + + core_count = ResourceAttribute(resource_attribute_type=core_count_attribute_type, resource=new_resource, value=calculate_cpu_count(row)) + core_count_key = f"{row['nodelist']} {core_count_attribute_type.name}" + if core_count_key in existing_resource_attributes_check: + core_count.pk = existing_resource_attributes_pk_map[core_count_key] + bulk_update_resource_attribute.append(core_count) + else: + if core_count_key not in processed_resource_attribute: + bulk_create_resource_attribute.append(core_count) + processed_resource_attribute.append(core_count_key) + + features = ResourceAttribute(resource_attribute_type=features_attribute_type, resource=new_resource, value=row.get('avail_features', '(null)')) + features_key = f"{row['nodelist']} {features_attribute_type.name}" + if features_key in existing_resource_attributes_check: + features.pk = existing_resource_attributes_pk_map[features_key] + bulk_update_resource_attribute.append(features) + else: + if features_key not in processed_resource_attribute: + bulk_create_resource_attribute.append(features) + processed_resource_attribute.append(features_key) + + owner = ResourceAttribute(resource_attribute_type=owner_attribute_type, resource=new_resource, value=calculate_owner_value(project_list, row)) + owner_key = f"{row['nodelist']} {owner_attribute_type.name}" + if owner_key in existing_resource_attributes_check: + owner.pk = existing_resource_attributes_pk_map[owner_key] + bulk_update_resource_attribute.append(owner) + else: + if owner_key not in processed_resource_attribute: + bulk_create_resource_attribute.append(owner) + processed_resource_attribute.append(owner_key) + if new_resource.is_available is False: - bulk_update_resource.append(Resource(name=row['nodelist'], is_available=True, resource_type=compute_node)) - bulk_process_resource_attribute.append(ResourceAttribute(resource=new_resource, value=None, resource_attribute_type=service_end_attribute_type)) + new_resource.is_available = True + bulk_update_resource.append(new_resource) + service_end_pk = existing_resource_attributes_pk_map[f"{row['nodelist']} {service_end_attribute_type.name}"] + bulk_update_resource_attribute.append(ResourceAttribute(resource=new_resource, value=None, resource_attribute_type=service_end_attribute_type, pk=service_end_pk)) processed_resources.add(new_resource.name) try: - ResourceAttribute.objects.bulk_create(bulk_process_resource_attribute, update_conflicts=True, unique_fields=[], update_fields=['value']) - Resource.objects.bulk_create(bulk_update_resource, update_conflicts=True, unique_fields=[], update_fields=['is_available']) + logger.debug(f'Updating {len(bulk_update_resource_attribute)} ResourceAttribute records') + bulk_update_with_history(bulk_update_resource_attribute, ResourceAttribute, ['value'], batch_size=500, default_change_reason='slurm_manage_resource command') + logger.debug(f'Updating {len(bulk_update_resource)} Resource records') + bulk_update_with_history(bulk_update_resource, 
Resource, ['is_available'], batch_size=500, default_change_reason='slurm_manage_resource command') + logger.debug(f'Creating {len(bulk_create_resource_attribute)} ResourceAttribute records') + bulk_create_with_history(bulk_create_resource_attribute, ResourceAttribute, batch_size=500, default_change_reason='slurm_manage_resource command') except Exception as e: - logger.error(f'Error processing resources info: {str(e)}') + logger.debug(f'Error processing resources info: {str(e)}') raise - bulk_process_resource_attribute = [] + bulk_update_resource_attribute = [] + bulk_create_resource_attribute = [] bulk_update_resource = [] for resource_to_delete in Resource.objects.exclude(name__in=list(processed_resources)).filter(is_available=True, resource_type=compute_node): resource_to_delete.is_available = False bulk_update_resource.append(resource_to_delete) - bulk_process_resource_attribute.append(ResourceAttribute(resource=resource_to_delete, value=str(datetime.now()), resource_attribute_type=service_end_attribute_type)) + service_end = ResourceAttribute(resource=resource_to_delete, value=str(datetime.now()), resource_attribute_type=service_end_attribute_type) + if f"{resource_to_delete.name} {service_end_attribute_type.name}" in existing_resource_attributes_check: + service_end.pk = existing_resource_attributes_pk_map[f"{resource_to_delete.name} {service_end_attribute_type.name}"] + bulk_update_resource_attribute.append(service_end) + else: + bulk_create_resource_attribute.append(service_end) try: - ResourceAttribute.objects.bulk_create(bulk_process_resource_attribute, update_conflicts=True, unique_fields=[], update_fields=['value']) - Resource.objects.bulk_create(bulk_update_resource, update_conflicts=True, unique_fields=[], update_fields=['is_available']) + logger.debug(f'Decommissioning {len(bulk_update_resource)} Resource records') + bulk_update_with_history(bulk_update_resource, Resource, ['is_available'], batch_size=500, default_change_reason='slurm_manage_resource command') + logger.debug(f'Creating {len(bulk_create_resource_attribute)} ServiceEnd ResourceAttribute records') + bulk_create_with_history(bulk_create_resource_attribute, ResourceAttribute, batch_size=500, default_change_reason='slurm_manage_resource command') + logger.debug(f'Updating {len(bulk_update_resource_attribute)} ServiceEnd ResourceAttribute records') + bulk_update_with_history(bulk_update_resource_attribute, ResourceAttribute, ['value'], batch_size=500, default_change_reason='slurm_manage_resource command') except Exception as e: logger.error(f'Error cleaning up resources: {str(e)}') raise \ No newline at end of file From 7abd545a644748253222d6d1a53dbf3ab81d2c2e Mon Sep 17 00:00:00 2001 From: Antonio Rodriguez Date: Fri, 28 Mar 2025 17:22:11 +0100 Subject: [PATCH 4/4] Add current cluster as parent resoruce for compute nodes --- coldfront/core/resource/models.py | 2 + .../commands/slurm_manage_resources.py | 55 +++++++++++++++---- 2 files changed, 46 insertions(+), 11 deletions(-) diff --git a/coldfront/core/resource/models.py b/coldfront/core/resource/models.py index b37f2e7b37..5662415af1 100644 --- a/coldfront/core/resource/models.py +++ b/coldfront/core/resource/models.py @@ -3,7 +3,9 @@ from django.db import models from django.conf import settings from django.contrib.auth.models import Group +from django.utils.translation import gettext_lazy as _ from django.core.exceptions import ValidationError, ObjectDoesNotExist +from model_utils.fields import AutoLastModifiedField from model_utils.models import 
TimeStampedModel from simple_history.models import HistoricalRecords diff --git a/coldfront/plugins/slurm/management/commands/slurm_manage_resources.py b/coldfront/plugins/slurm/management/commands/slurm_manage_resources.py index 918c2cd0ea..411bd31f3c 100644 --- a/coldfront/plugins/slurm/management/commands/slurm_manage_resources.py +++ b/coldfront/plugins/slurm/management/commands/slurm_manage_resources.py @@ -3,14 +3,14 @@ import re from functools import reduce from cProfile import Profile +from django.utils import timezone -from django.core.management.base import BaseCommand, CommandError +from django.core.management.base import BaseCommand from simple_history.utils import bulk_update_with_history, bulk_create_with_history from coldfront.core.resource.models import ResourceType, ResourceAttribute, ResourceAttributeType, AttributeType, Resource from coldfront.core.project.models import Project from coldfront.plugins.slurm.utils import slurm_get_nodes_info -from django.utils.datetime_safe import datetime logger = logging.getLogger(__name__) @@ -68,13 +68,19 @@ def calculate_owner_value(project_list, row): return'FASRC' return owner_name + def get_cluster(): + return Resource.objects.get(resource_type__name='Cluster') + + env = options['environment'] or 'production' if 'dev' in env: - output = self.get_output_from_file(os.path.join(os.getcwd(), 'coldfront/plugins/slurm/management/commands/sinfo_output.txt')) + output = self.get_output_from_file(os.path.join(os.getcwd(), 'coldfront/plugins/slurm/management/commands/sinfo.txt')) else: output = slurm_get_nodes_info() logger.debug(f'Running on {env} mode') + modify_history_date = timezone.now() project_list = Project.objects.all() + current_cluster = get_cluster() compute_node = ResourceType.objects.get(name='Compute Node') attribute_type_name_list = ['GPU Count', 'Core Count', 'Features', 'Owner', 'ServiceEnd'] partition_resource_type = ResourceType.objects.get(name='Cluster Partition') @@ -96,7 +102,14 @@ def calculate_owner_value(project_list, row): bulk_update_resource = [] processed_resource_attribute = [] for row in output: - new_resource, compute_node_created_created = Resource.objects.get_or_create(name=row['nodelist'], defaults={'is_allocatable':False, 'resource_type':compute_node}) + new_resource, compute_node_created_created = Resource.objects.get_or_create( + name=row['nodelist'], + defaults={ + 'is_allocatable':False, + 'resource_type':compute_node, + 'parent_resource':current_cluster + } + ) Resource.objects.get_or_create(name=row['partition'], defaults={'resource_type':partition_resource_type}) gpu_count = ResourceAttribute(resource_attribute_type=gpu_count_attribute_type, resource=new_resource, value=calculate_gpu_count(row['gres'])) @@ -143,13 +156,27 @@ def calculate_owner_value(project_list, row): new_resource.is_available = True bulk_update_resource.append(new_resource) service_end_pk = existing_resource_attributes_pk_map[f"{row['nodelist']} {service_end_attribute_type.name}"] - bulk_update_resource_attribute.append(ResourceAttribute(resource=new_resource, value=None, resource_attribute_type=service_end_attribute_type, pk=service_end_pk)) + bulk_update_resource_attribute.append( + ResourceAttribute( + resource=new_resource, value=None, + resource_attribute_type=service_end_attribute_type, + pk=service_end_pk, + modified=modify_history_date + ) + ) processed_resources.add(new_resource.name) try: logger.debug(f'Updating {len(bulk_update_resource_attribute)} ResourceAttribute records') - 
bulk_update_with_history(bulk_update_resource_attribute, ResourceAttribute, ['value'], batch_size=500, default_change_reason='slurm_manage_resource command') + bulk_update_with_history( + bulk_update_resource_attribute, ResourceAttribute, ['value'], + batch_size=500, default_change_reason='slurm_manage_resource command', + default_date=modify_history_date + ) logger.debug(f'Updating {len(bulk_update_resource)} Resource records') - bulk_update_with_history(bulk_update_resource, Resource, ['is_available'], batch_size=500, default_change_reason='slurm_manage_resource command') + bulk_update_with_history( + bulk_update_resource, Resource, ['is_available'], batch_size=500, + default_change_reason='slurm_manage_resource command', default_date=modify_history_date + ) logger.debug(f'Creating {len(bulk_create_resource_attribute)} ResourceAttribute records') bulk_create_with_history(bulk_create_resource_attribute, ResourceAttribute, batch_size=500, default_change_reason='slurm_manage_resource command') except Exception as e: @@ -161,19 +188,25 @@ def calculate_owner_value(project_list, row): for resource_to_delete in Resource.objects.exclude(name__in=list(processed_resources)).filter(is_available=True, resource_type=compute_node): resource_to_delete.is_available = False bulk_update_resource.append(resource_to_delete) - service_end = ResourceAttribute(resource=resource_to_delete, value=str(datetime.now()), resource_attribute_type=service_end_attribute_type) + service_end = ResourceAttribute(resource=resource_to_delete, value=modify_history_date, resource_attribute_type=service_end_attribute_type) if f"{resource_to_delete.name} {service_end_attribute_type.name}" in existing_resource_attributes_check: service_end.pk = existing_resource_attributes_pk_map[f"{resource_to_delete.name} {service_end_attribute_type.name}"] bulk_update_resource_attribute.append(service_end) else: bulk_create_resource_attribute.append(service_end) try: - logger.debug(f'Decommissioning {len(bulk_update_resource)} Resource records') - bulk_update_with_history(bulk_update_resource, Resource, ['is_available'], batch_size=500, default_change_reason='slurm_manage_resource command') + logger.debug(f'Decommissioning {bulk_update_resource} Resource records') + bulk_update_with_history( + bulk_update_resource, Resource, ['is_available'], batch_size=500, + default_change_reason='slurm_manage_resource command', default_date=modify_history_date + ) logger.debug(f'Creating {len(bulk_create_resource_attribute)} ServiceEnd ResourceAttribute records') bulk_create_with_history(bulk_create_resource_attribute, ResourceAttribute, batch_size=500, default_change_reason='slurm_manage_resource command') logger.debug(f'Updating {len(bulk_update_resource_attribute)} ServiceEnd ResourceAttribute records') - bulk_update_with_history(bulk_update_resource_attribute, ResourceAttribute, ['value'], batch_size=500, default_change_reason='slurm_manage_resource command') + bulk_update_with_history( + bulk_update_resource_attribute, ResourceAttribute, ['value'], batch_size=500, + default_change_reason='slurm_manage_resource command', default_date=modify_history_date + ) except Exception as e: logger.error(f'Error cleaning up resources: {str(e)}') raise \ No newline at end of file
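
Taken together, the series turns whitespace-aligned `sinfo -N -r` output into per-node dicts and derives GPU and core counts from the GRES and S:C:T columns. The standalone sketch below illustrates that parsing; the sample header, node name, and GRES string are invented for illustration, and the helpers only mirror the plugin's table_to_dict_list / calculate_gpu_count logic rather than importing it.

import re
from functools import reduce

# Invented sinfo snapshot (header + one node row); the real output comes from
# SLURM_CMD_SINFO_NODES in coldfront/plugins/slurm/utils.py.
SAMPLE_SINFO = """\
NODELIST  PARTITION  GRES                      AVAIL  CPUS  S:C:T   AVAIL_FEATURES  GROUPS
node0101  gpu        gpu:nvidia_a100:4(S:0-1)  up     64    2:32:1  amd,a100        cluster_users,slurm-admin
"""

def table_to_dict_list(lines):
    # Collapse runs of spaces, lower-case the header, then zip each row into a
    # dict, mirroring slurm_get_nodes_info().
    keys = re.sub(r'\s+', ' ', lines[0]).strip().lower().split(' ')
    rows = [re.sub(r'\s+', ' ', row).strip().split(' ') for row in lines[1:] if row.strip()]
    return [dict(zip(keys, row)) for row in rows]

def gpu_count(gres_value):
    # '(null)' means no GPUs; otherwise sum the count field of each
    # gpu:<model>:<n>(S:...) entry, splitting entries on '),' as the command does.
    if 'null' in gres_value:
        return 0
    entries = gres_value.split('),')
    return reduce(lambda x, y: x + y,
                  [int(entry.split(':')[2].replace('(S', '')) for entry in entries])

for node in table_to_dict_list(SAMPLE_SINFO.splitlines()):
    cores_per_socket = int(node['s:c:t'].split(':')[1])  # S:C:T -> take the cores field
    print(node['nodelist'], gpu_count(node['gres']), cores_per_socket)

In dev mode (manage.py slurm_manage_resources -e dev) the command parses the same columns from a local sinfo.txt snapshot instead of invoking sinfo, and --profile wraps the run in cProfile.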