|
14 | 14 | import itertools |
15 | 15 | import re |
16 | 16 | import time |
| 17 | +import json |
17 | 18 |
|
18 | 19 | import reframe.core.schedulers as sched |
19 | 20 | import reframe.utility.osext as osext |
|
47 | 48 | 'W': 'WAITING', |
48 | 49 | 'S': 'SUSPENDED', |
49 | 50 | 'C': 'COMPLETED', |
| 51 | + "F": "COMPLETED", |
50 | 52 | } |
51 | 53 |
|
52 | 54 |
|
@@ -323,3 +325,83 @@ def _query_exit_code(self, job): |
323 | 325 |
|
324 | 326 | # Torque does not provide a way to retrieve the history of jobs |
325 | 327 | return None |
| 328 | + |
@register_scheduler('pbspro')
class PbsProJobScheduler(PbsJobScheduler):
    '''PBS scheduler variant that polls jobs through qstat's JSON output.

    Only :meth:`poll` is overridden here; everything else is inherited
    from :class:`PbsJobScheduler`.
    '''

    def poll(self, *jobs):
        '''Poll PBS for the given jobs and update their state in place.

        A single ``qstat -xf -F json`` call is issued for all the jobs.
        For every job found in the output, its state, node list and (when
        completed) exit code are updated. Jobs missing from the output are
        assumed to have completed. Pending jobs that exceeded their
        ``max_pending_time`` are cancelled and flagged with a ``JobError``.

        :arg jobs: the jobs to poll; ``None`` entries are ignored.
        :raises JobSchedulerError: if ``qstat`` exits with a code other
            than 0, 153 (unknown job id) or 255 (logged; polling is simply
            skipped for this round).
        '''
        # Filter out non-jobs
        jobs = [job for job in jobs if job is not None]
        if not jobs:
            return

        # Query the status of all jobs at once
        job_status = osext.run_command(
            f"qstat -xf -F json {' '.join(job.jobid for job in jobs)}"
        )

        # From Table 14-1 (Error Codes) in
        # https://help.altair.com/2024.1.0/PBS%20Professional/PBSReferenceGuide2024.1.pdf
        # PBS exits with exit(error_code) on error, e.g. exit(15001) for an
        # unknown job id. Only the last 8 bits of the exit code are
        # returned, so what we actually get is error_code % 256, i.e. 153
        # for "Unknown Job Identifier". 153 is returned if any job id in
        # the list is unknown, even if some others are known; the unknown
        # ones are handled in the loop over the jobs below, so 153 is
        # accepted here. Previously 35 was checked here, but that is only
        # returned for a "History job ID" (when `qstat -f` is used on a job
        # that has already ended); since `-x` is used above, exit code 35
        # should not occur anymore.
        if job_status.returncode == 255:
            # qstat is having a problem; log it and let the next poll retry
            self.log(f'qstat failed with exit code {job_status.returncode} '
                     f'(standard error follows):\n{job_status.stderr}\n'
                     f' retrying')
            return

        if job_status.returncode not in (0, 153):
            raise JobSchedulerError(
                f'qstat failed with exit code {job_status.returncode} '
                f'(standard error follows):\n{job_status.stderr}'
            )

        # NOTE(review): when none of the requested ids is known, qstat may
        # print nothing or a document without a "Jobs" entry -- confirm
        # against a live PBS installation. Both cases are treated as "no
        # jobs known" so that the loop below marks the jobs as completed
        # instead of failing while parsing.
        stdout = job_status.stdout.strip()
        known_jobs = json.loads(stdout).get('Jobs', {}) if stdout else {}

        for job in jobs:
            job_info = known_jobs.get(job.jobid)
            if job_info is None:
                self.log(f"Job {job.jobid} not known to scheduler")
                job._state = "COMPLETED"
                self.log(f"Assuming job {job.jobid} completed")
                job._completed = True
                continue

            # Get information (status and nodelist) about the job
            state = job_info["job_state"]
            self.log(f"Job {job.jobid} known to scheduler, state: {state}")
            job._state = JOB_STATES[state]

            # exec_host is only in the output if the job has started to
            # run (not if it is just queued)
            if "exec_host" in job_info:
                self._update_nodelist(job, job_info["exec_host"])

            if job.state == "COMPLETED":
                # Do not let a missing exit status prevent the job from
                # being marked as completed
                exit_code = job_info.get("Exit_status")
                if exit_code is not None:
                    job._exitcode = int(exit_code)

                job._completed = True
            elif job.state in ["QUEUED", "HELD", "WAITING"]:
                if (job.max_pending_time and
                    (time.time() - job.submit_time) >=
                        job.max_pending_time):
                    self.cancel(job)
                    job._exception = JobError(
                        "maximum pending time exceeded", job.jobid
                    )
0 commit comments