
Commit 24aa49a

Move is_avail and is_down functions in scheduler
1 parent c7e38c4 commit 24aa49a

4 files changed: +47 -52 lines changed

reframe/core/schedulers/__init__.py

Lines changed: 14 additions & 23 deletions
@@ -153,7 +153,7 @@ def log(self, message, level=DEBUG2):
         getlogger().log(level, f'[S] {self.registered_name}: {message}')


-def filter_nodes_by_state(nodelist, state):
+def filter_nodes_by_state(nodelist, state, scheduler):
     '''Filter nodes by their state

     :arg nodelist: List of :class:`Node` instances to filter.
@@ -178,11 +178,13 @@ def filter_nodes_by_state(nodelist, state):
         allowed_states = state.split('|')
         final_nodelist = set()
         for s in allowed_states:
-            final_nodelist.update(filter_nodes_by_state(nodelist, s))
+            final_nodelist.update(
+                filter_nodes_by_state(nodelist, s, scheduler)
+            )

         nodelist = final_nodelist
     elif state == 'avail':
-        nodelist = {n for n in nodelist if n.is_avail()}
+        nodelist = {n for n in nodelist if scheduler.is_node_avail(n)}
     elif state != 'all':
         if state.endswith('*'):
             # non-exclusive state match
@@ -618,19 +620,22 @@ def guess_num_tasks(self):
             f'[F] Total available nodes: {len(available_nodes)}'
         )

+        available_nodes = self.scheduler.filternodes(self, available_nodes)
+        getlogger().debug(
+            f'[F] Total available after scheduler filter: '
+            f'{len(available_nodes)}'
+        )
+
         # Try to guess the number of tasks now
         available_nodes = filter_nodes_by_state(
-            available_nodes, self.sched_flex_alloc_nodes.lower()
+            available_nodes,
+            self.sched_flex_alloc_nodes.lower(),
+            self.scheduler
         )
         getlogger().debug(
             f'[F] Total available in state='
             f'{self.sched_flex_alloc_nodes.lower()}: {len(available_nodes)}'
         )
-        available_nodes = self.scheduler.filternodes(self, available_nodes)
-        getlogger().debug(
-            f'[F] Total available after scheduler filter: '
-            f'{len(available_nodes)}'
-        )
         return len(available_nodes) * num_tasks_per_node

     def submit(self):
@@ -694,17 +699,6 @@ def in_state(self, state):
             :class:`False` otherwise.
         '''

-    @abc.abstractmethod
-    def is_avail(self):
-        '''Check whether the node is available for scheduling jobs.'''
-
-    def is_down(self):
-        '''Check whether node is down.
-
-        This is the inverse of :func:`is_avail`.
-        '''
-        return not self.is_avail()
-

 class AlwaysIdleNode(Node):
     def __init__(self, name):
@@ -715,9 +709,6 @@ def __init__(self, name):
     def name(self):
         return self._name

-    def is_avail(self):
-        return True
-
     def in_statex(self, state):
         return state.lower() == self._state


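A note on the change above: node availability is no longer decided by the Node itself but by its scheduler, which is why filter_nodes_by_state() now takes a scheduler argument. Below is a minimal sketch of what the 'avail' branch boils down to; the Stub* classes are hypothetical stand-ins for illustration, not ReFrame classes.

    # Hypothetical stand-ins: the scheduler, not the node, decides
    # what counts as available.
    class StubNode:
        def __init__(self, name, states):
            self.name = name
            self.states = states


    class StubScheduler:
        node_available_states = {'ALLOCATED', 'IDLE'}

        def is_node_avail(self, node):
            # Available iff every state flag is an 'available' one
            return node.states <= self.node_available_states


    nodes = {StubNode('n1', {'IDLE'}), StubNode('n2', {'DOWN'})}
    sched = StubScheduler()

    # Equivalent of filter_nodes_by_state(nodes, 'avail', sched)
    avail = {n for n in nodes if sched.is_node_avail(n)}
    assert {n.name for n in avail} == {'n1'}
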
reframe/core/schedulers/slurm.py

Lines changed: 21 additions & 24 deletions
@@ -147,7 +147,13 @@ def __init__(self):
         self._sched_access_in_submit = self.get_option(
             'sched_access_in_submit'
         )
-        self.addl_avail_states = set()
+        self.node_available_states = {
+            'ALLOCATED',
+            'COMPLETING',
+            'IDLE',
+            'PLANNED',
+            'RESERVED'
+        }

     def make_job(self, *args, **kwargs):
         return _SlurmJob(*args, **kwargs)
@@ -324,7 +330,7 @@ def allnodes(self):
                 'could not retrieve node information') from e

         node_descriptions = completed.stdout.splitlines()
-        return _create_nodes(node_descriptions, self.addl_avail_states)
+        return _create_nodes(node_descriptions)

     def _get_default_partition(self):
         completed = _run_strict('scontrol -a show -o partitions')
@@ -440,20 +446,20 @@ def _get_reservation_nodes(self, reservation):
         flags_match = re.search(r'Flags=(\S+)', completed.stdout)
         if flags_match:
             if 'MAINT' in flags_match[1].split(','):
-                self.addl_avail_states.add('MAINTENANCE')
+                self.node_available_states.add('MAINTENANCE')
         else:
             self.log(f"could not extract the reservation flags for "
                      f"reservation '{reservation}'")

         completed = _run_strict('scontrol -a show -o %s' % reservation_nodes)
         node_descriptions = completed.stdout.splitlines()
-        return _create_nodes(node_descriptions, self.addl_avail_states)
+        return _create_nodes(node_descriptions)

     def _get_nodes_by_name(self, nodespec):
         completed = osext.run_command('scontrol -a show -o node %s' %
                                       nodespec)
         node_descriptions = completed.stdout.splitlines()
-        return _create_nodes(node_descriptions, self.addl_avail_states)
+        return _create_nodes(node_descriptions)

     def _update_completion_time(self, job, timestamps):
         if job._completion_time is not None:
@@ -603,7 +609,7 @@ def _do_cancel_if_blocked(self, job, reason_descr):
         self.log(f'Checking if nodes {node_names!r} '
                  f'are indeed unavailable')
         nodes = self._get_nodes_by_name(node_names)
-        if not any(n.is_down() for n in nodes):
+        if not any(self.is_node_down(n) for n in nodes):
             return

         self.cancel(job)
@@ -639,6 +645,12 @@ def cancel(self, job):
     def finished(self, job):
         return slurm_state_completed(job.state)

+    def is_node_avail(self, node):
+        return node.states <= self.node_available_states
+
+    def is_node_down(self, node):
+        return not self.is_node_avail(node)
+

 @register_scheduler('squeue')
 class SqueueJobScheduler(SlurmJobScheduler):
@@ -700,19 +712,19 @@ def poll(self, *jobs):
             self._cancel_if_pending_too_long(job)


-def _create_nodes(descriptions, addl_avail_states=None):
+def _create_nodes(descriptions):
     nodes = set()
     for descr in descriptions:
         with suppress(JobSchedulerError):
-            nodes.add(_SlurmNode(descr, addl_avail_states=addl_avail_states))
+            nodes.add(_SlurmNode(descr))

     return nodes


 class _SlurmNode(sched.Node):
     '''Class representing a Slurm node.'''

-    def __init__(self, node_descr, addl_avail_states=None):
+    def __init__(self, node_descr):
         self._name = self._extract_attribute('NodeName', node_descr)
         if not self._name:
             raise JobSchedulerError(
@@ -727,15 +739,6 @@ def __init__(self, node_descr, addl_avail_states=None):
             'State', node_descr, sep='+') or set()
         self._descr = node_descr

-        self.addl_avail_states = addl_avail_states or set()
-        self.available_states = {
-            'ALLOCATED',
-            'COMPLETING',
-            'IDLE',
-            'PLANNED',
-            'RESERVED'
-        } | self.addl_avail_states
-
     def __eq__(self, other):
         if not isinstance(other, type(self)):
             return NotImplemented
@@ -752,12 +755,6 @@ def in_state(self, state):
     def in_statex(self, state):
         return self._states == set(state.upper().split('+'))

-    def is_avail(self):
-        return self._states <= self.available_states
-
-    def is_down(self):
-        return not self.is_avail()
-
     def satisfies(self, slurm_constraint):
         # Convert the Slurm constraint to a Python expression and evaluate it,
         # but restrict our syntax to accept only AND or OR constraints and

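A note on is_node_avail() above: it uses Python's set subset operator (<=), so a node counts as available only if every one of its Slurm state flags appears in node_available_states. That is also why _get_reservation_nodes() adds 'MAINTENANCE' to the set for MAINT reservations: nodes carrying that flag would otherwise fail the subset test. A quick illustration with made-up flag combinations:

    available = {'ALLOCATED', 'COMPLETING', 'IDLE', 'PLANNED', 'RESERVED'}

    # A plain idle node is available
    assert {'IDLE'} <= available

    # Multiple flags are fine as long as all of them are in the set
    assert {'ALLOCATED', 'COMPLETING'} <= available

    # One flag outside the set (e.g. DRAIN) makes the node count as down
    assert not ({'ALLOCATED', 'DRAIN'} <= available)
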
reframe/frontend/testgenerators.py

Lines changed: 5 additions & 1 deletion
@@ -38,7 +38,11 @@ def getallnodes(state, jobs_cli_options=None):
             f'Total available nodes for {part.name}: {len(available_nodes)}'
         )

-        available_nodes = filter_nodes_by_state(available_nodes, state)
+        available_nodes = filter_nodes_by_state(
+            available_nodes,
+            state,
+            part.scheduler
+        )
         nodes[part.fullname] = [n.name for n in available_nodes]

     return nodes

unittests/test_schedulers.py

Lines changed: 7 additions & 4 deletions
@@ -41,6 +41,8 @@ def slurm_only(scheduler):
     if scheduler.registered_name not in ('slurm', 'squeue'):
         pytest.skip('test is relevant only for Slurm backends')

+    return scheduler
+

 @pytest.fixture
 def local_only(scheduler):
@@ -1455,7 +1457,8 @@ def test_slurm_node_in_state(slurm_node_allocated,

 def test_slurm_node_is_down(slurm_node_allocated,
                             slurm_node_idle,
-                            slurm_node_nopart):
-    assert not slurm_node_allocated.is_down()
-    assert not slurm_node_idle.is_down()
-    assert slurm_node_nopart.is_down()
+                            slurm_node_nopart,
+                            slurm_only):
+    assert not slurm_only().is_node_down(slurm_node_allocated)
+    assert not slurm_only().is_node_down(slurm_node_idle)
+    assert slurm_only().is_node_down(slurm_node_nopart)

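The updated test relies on the slurm_only fixture now returning the scheduler type (see the first hunk of this file), so tests can instantiate it inline via slurm_only(). A simplified sketch of the pattern, using a dummy scheduler class in place of the real parametrized fixture:

    import pytest


    class DummySlurmScheduler:
        registered_name = 'slurm'

        def is_node_down(self, node):
            # Stand-in for the real subset-based availability check
            return 'DOWN' in node.states


    @pytest.fixture
    def slurm_only():
        # The real fixture wraps the parametrized `scheduler` fixture
        # and skips non-Slurm backends before returning the class.
        return DummySlurmScheduler


    def test_example(slurm_only):
        sched = slurm_only()   # the fixture value is a class; call it
        assert sched.registered_name == 'slurm'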