|
14 | 14 | import itertools |
15 | 15 | import re |
16 | 16 | import time |
| 17 | +import json |
17 | 18 |
|
18 | 19 | import reframe.core.schedulers as sched |
19 | 20 | import reframe.utility.osext as osext |
|
47 | 48 | 'W': 'WAITING', |
48 | 49 | 'S': 'SUSPENDED', |
49 | 50 | 'C': 'COMPLETED', |
| 51 | + "F": "COMPLETED", |
50 | 52 | } |
51 | 53 |
|
52 | 54 |
|
@@ -323,3 +325,86 @@ def _query_exit_code(self, job): |
323 | 325 |
|
324 | 326 | # Torque does not provide a way to retrieve the history of jobs |
325 | 327 | return None |
| 328 | + |
| 329 | + |
| 330 | +@register_scheduler('pbspro') |
| 331 | +class PbsProJobScheduler(PbsJobScheduler): |
| 332 | + def poll(self, *jobs): |
| 333 | + '''Poll checking PBS jobs for updated states.''' |
| 334 | + if jobs: |
| 335 | + # Filter out non-jobs |
| 336 | + jobs = [job for job in jobs if job is not None] |
| 337 | + if not jobs: |
| 338 | + return |
| 339 | + |
| 340 | + # query status of all jobs |
| 341 | + completed = osext.run_command( |
| 342 | + f"qstat -xf -F json {' '.join(job.jobid for job in jobs)}" |
| 343 | + ) |
| 344 | + |
| 345 | + # from Table 14-1: Error Codes in |
| 346 | + # https://help.altair.com/2024.1.0/PBS%20Professional/PBSReferenceGuide2024.1.pdf, |
| 347 | + # we have the codes PBS returns in case of an error with |
| 348 | + # exit(error_code), like exit(15001) for unknown Job ID. However, only |
| 349 | + # the last 8 bits of the exit code are returned, so what we get as the |
| 350 | + # actual error code is `exit_code % 256`, which is for example 153 for |
| 351 | + # "Unknown Job Identifier". 153 is returned if any job id in the list |
| 352 | + # is unknown, even if some others are known. These unknown job ids |
| 353 | + # will be caught in the loop over jobs below so we can pass on for |
| 354 | + # now. previously 35 was checked here, but we only get that for a |
| 355 | + # "History job ID" (when qstat -f is used on a jobid that has already |
| 356 | + # ended. Since above we use "-x" we should not get exit code 35 |
| 357 | + # anymore) |
| 358 | + if completed.returncode in [153, 0]: |
| 359 | + pass |
| 360 | + elif completed.returncode == 255: |
| 361 | + |
| 362 | + # try again, qstat is having a problem |
| 363 | + self.log(f'qstat failed with exit code {completed.returncode} ' |
| 364 | + f'(standard error follows):\n{completed.stderr}\n' |
| 365 | + 'retrying') |
| 366 | + return |
| 367 | + else: |
| 368 | + raise JobSchedulerError( |
| 369 | + f'qstat failed with exit code {completed.returncode} ' |
| 370 | + f'(standard error follows):\n{completed.stderr}' |
| 371 | + ) |
| 372 | + |
| 373 | + job_status_json = json.loads(completed.stdout) |
| 374 | + |
| 375 | + # loop over each job |
| 376 | + for job in jobs: |
| 377 | + # check if the job is in the json |
| 378 | + if job.jobid in job_status_json["Jobs"]: |
| 379 | + |
| 380 | + # get information (status and nodelist) about the job |
| 381 | + job_info = job_status_json["Jobs"][job.jobid] |
| 382 | + state = job_info["job_state"] |
| 383 | + self.log(f"Job {job.jobid} known to scheduler, state: {state}") |
| 384 | + job._state = JOB_STATES[state] |
| 385 | + |
| 386 | + # check if exec_host is in the ouput since exec_host is only |
| 387 | + # in the output if job has started to run (not if it's just |
| 388 | + # queued) |
| 389 | + if "exec_host" in job_info: |
| 390 | + nodespec = job_info["exec_host"] |
| 391 | + self._update_nodelist(job, nodespec) |
| 392 | + |
| 393 | + # set state of job depending on the output from qstat |
| 394 | + if job.state == "COMPLETED": |
| 395 | + exit_code = job_info["Exit_status"] |
| 396 | + job._exitcode = int(exit_code) |
| 397 | + job._completed = True |
| 398 | + elif job.state in ["QUEUED", "HELD", "WAITING"]: |
| 399 | + pending_time = time.time() - job.submit_time |
| 400 | + if (job.max_pending_time and |
| 401 | + pending_time >= job.max_pending_time): |
| 402 | + self.cancel(job) |
| 403 | + job._exception = JobError( |
| 404 | + "maximum pending time exceeded", job.jobid |
| 405 | + ) |
| 406 | + else: |
| 407 | + self.log(f"Job {job.jobid} not known to scheduler") |
| 408 | + job._state = "COMPLETED" |
| 409 | + self.log(f"Assuming job {job.jobid} completed") |
| 410 | + job._completed = True |
0 commit comments