Skip to content

Commit 07ed672

Browse files
committed
Use polars for performance analytics and add more statistics
1 parent 2546434 commit 07ed672

File tree

7 files changed

+421
-318
lines changed

7 files changed

+421
-318
lines changed

reframe/frontend/printer.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,8 @@ def table(self, data, **kwargs):
286286

287287
table_format = rt.runtime().get_option('general/0/table_format')
288288
if table_format == 'csv':
289-
return self._table_as_csv(data)
289+
self._table_as_csv(data)
290+
return
290291

291292
# Map our options to tabulate
292293
if table_format == 'plain':

reframe/frontend/reporting/__init__.py

Lines changed: 91 additions & 167 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import lxml.etree as etree
1212
import math
1313
import os
14+
import polars as pl
1415
import re
1516
import socket
1617
import time
@@ -27,7 +28,7 @@
2728
from reframe.core.warnings import suppress_deprecations
2829
from reframe.utility import nodelist_abbrev, OrderedSet
2930
from .storage import StorageBackend
30-
from .utility import Aggregator, parse_cmp_spec, parse_query_spec
31+
from .utility import parse_cmp_spec, parse_query_spec
3132

3233
# The schema data version
3334
# Major version bumps are expected to break the validation of previous schemas
@@ -564,54 +565,53 @@ class _TCProxy(UserDict):
564565
_required_keys = ['name', 'system', 'partition', 'environ']
565566

566567
def __init__(self, testcase, include_only=None):
568+
# Define the derived attributes
569+
def _basename():
570+
return testcase['name'].split()[0]
571+
572+
def _sysenv():
573+
return _format_sysenv(testcase['system'],
574+
testcase['partition'],
575+
testcase['environ'])
576+
577+
def _job_nodelist():
578+
nodelist = testcase['job_nodelist']
579+
if isinstance(nodelist, str):
580+
return nodelist
581+
else:
582+
return nodelist_abbrev(testcase['job_nodelist'])
583+
567584
if isinstance(testcase, _TCProxy):
568585
testcase = testcase.data
569586

570587
if include_only is not None:
571588
self.data = {}
572-
for k in include_only + self._required_keys:
573-
if k in testcase:
574-
self.data.setdefault(k, testcase[k])
575-
else:
576-
self.data = testcase
589+
for key in include_only + self._required_keys:
590+
# Computed attributes
591+
if key == 'basename':
592+
val = _basename()
593+
elif key == 'sysenv':
594+
val = _sysenv()
595+
elif key == 'job_nodelist':
596+
val = _job_nodelist()
597+
else:
598+
val = testcase.get(key)
577599

578-
def __getitem__(self, key):
579-
val = super().__getitem__(key)
580-
if key == 'job_nodelist':
581-
val = nodelist_abbrev(val)
582-
583-
return val
584-
585-
def __missing__(self, key):
586-
if key == 'basename':
587-
return self.data['name'].split()[0]
588-
elif key == 'sysenv':
589-
return _format_sysenv(self.data['system'],
590-
self.data['partition'],
591-
self.data['environ'])
592-
elif key == 'pdiff':
593-
return None
600+
self.data.setdefault(key, val)
594601
else:
595-
raise KeyError(key)
596-
597-
598-
def _group_key(groups, testcase: _TCProxy):
599-
key = []
600-
for grp in groups:
601-
with reraise_as(ReframeError, (KeyError,), 'no such group'):
602-
val = testcase[grp]
603-
if not isinstance(val, Hashable):
604-
val = str(val)
605-
606-
key.append(val)
607-
608-
return tuple(key)
602+
# Include the derived attributes too
603+
testcase.update({
604+
'basename': _basename(),
605+
'sysenv': _sysenv(),
606+
'job_nodelist': _job_nodelist()
607+
})
608+
self.data = testcase
609609

610610

611611
@time_function
612-
def _group_testcases(testcases, groups, columns):
613-
grouped = {}
614-
record_cols = groups + [c for c in columns if c not in groups]
612+
def _create_dataframe(testcases, groups, columns):
613+
record_cols = list(OrderedSet(groups) | OrderedSet(columns))
614+
data = []
615615
for tc in map(_TCProxy, testcases):
616616
for pvar, reftuple in tc['perfvalues'].items():
617617
pvar = pvar.split(':')[-1]
@@ -636,127 +636,54 @@ def _group_testcases(testcases, groups, columns):
636636
'punit': punit,
637637
'presult': presult
638638
})
639-
key = _group_key(groups, record)
640-
grouped.setdefault(key, [])
641-
grouped[key].append(record)
642-
643-
return grouped
639+
data.append(record)
644640

645-
646-
@time_function
647-
def _aggregate_perf(grouped_testcases, aggr_fn, cols):
648-
# Update delimiter for joining unique values based on the table format
649-
table_format = runtime().get_option('general/0/table_format')
650-
if table_format == 'pretty':
651-
delim = '\n'
641+
if data:
642+
return pl.DataFrame(data)
652643
else:
653-
delim = '|'
654-
655-
other_aggr = Aggregator.create('join_uniq', delim)
656-
count_aggr = Aggregator.create('count')
657-
aggr_data = {}
658-
for key, seq in grouped_testcases.items():
659-
aggr_data.setdefault(key, {})
660-
with reraise_as(ReframeError, (KeyError,), 'no such column'):
661-
for c in cols:
662-
if c == 'pval':
663-
fn = aggr_fn
664-
elif c == 'psamples':
665-
fn = count_aggr
666-
else:
667-
fn = other_aggr
644+
return pl.DataFrame(schema=record_cols)
668645

669-
if fn is count_aggr:
670-
aggr_data[key][c] = fn(seq)
671-
else:
672-
aggr_data[key][c] = fn(tc[c] for tc in seq)
673646

674-
return aggr_data
647+
@time_function
648+
def _aggregate_data(testcases, query):
649+
df = _create_dataframe(testcases, query.group_by, query.attributes)
650+
df = df.group_by(query.group_by).agg(
651+
query.aggregation.col_spec(query.aggregated_attributes)
652+
).sort(query.group_by)
653+
return df
675654

676655

677656
@time_function
678-
def compare_testcase_data(base_testcases, target_testcases, base_fn, target_fn,
679-
groups=None, columns=None):
680-
groups = groups or []
681-
682-
# Clean up columns and store those for which we want explicitly the A or B
683-
# variants
684-
cols = []
685-
variants_A = set()
686-
variants_B = set()
687-
for c in columns:
688-
if c.endswith('_A'):
689-
variants_A.add(c[:-2])
690-
cols.append(c[:-2])
691-
elif c.endswith('_B'):
692-
variants_B.add(c[:-2])
693-
cols.append(c[:-2])
694-
else:
695-
variants_A.add(c)
696-
variants_B.add(c)
697-
cols.append(c)
698-
699-
grouped_base = _group_testcases(base_testcases, groups, cols)
700-
grouped_target = _group_testcases(target_testcases, groups, cols)
701-
pbase = _aggregate_perf(grouped_base, base_fn, cols)
702-
ptarget = _aggregate_perf(grouped_target, target_fn, cols)
703-
704-
# For visual purposes if `name` is in `groups`, consider also its
705-
# derivative `basename` to be in, so as to avoid duplicate columns
706-
if 'name' in groups:
707-
groups.append('basename')
708-
709-
# Build the final table data
710-
extra_cols = set(cols) - set(groups) - {'pdiff'}
711-
712-
# Header line
713-
header = []
714-
for c in cols:
715-
if c in extra_cols:
716-
if c in variants_A:
717-
header.append(f'{c}_A')
718-
719-
if c in variants_B:
720-
header.append(f'{c}_B')
721-
else:
722-
header.append(c)
723-
724-
data = [header]
725-
for key, aggr_data in pbase.items():
726-
pdiff = None
727-
line = []
728-
for c in cols:
729-
base = aggr_data.get(c)
730-
try:
731-
target = ptarget[key][c]
732-
except KeyError:
733-
target = None
734-
735-
if c == 'pval':
736-
line.append('n/a' if base is None else base)
737-
line.append('n/a' if target is None else target)
738-
739-
# compute diff for later usage
740-
if base is not None and target is not None:
741-
if base == 0 and target == 0:
742-
pdiff = math.nan
743-
elif target == 0:
744-
pdiff = math.inf
745-
else:
746-
pdiff = (base - target) / target
747-
pdiff = '{:+7.2%}'.format(pdiff)
748-
elif c == 'pdiff':
749-
line.append('n/a' if pdiff is None else pdiff)
750-
elif c in extra_cols:
751-
if c in variants_A:
752-
line.append('n/a' if base is None else base)
753-
754-
if c in variants_B:
755-
line.append('n/a' if target is None else target)
756-
else:
757-
line.append('n/a' if base is None else base)
657+
def compare_testcase_data(base_testcases, target_testcases, query):
658+
df_base = _aggregate_data(base_testcases, query).with_columns(
659+
pl.col(query.aggregated_columns).name.suffix(query.lhs_column_suffix)
660+
)
661+
df_target = _aggregate_data(target_testcases, query).with_columns(
662+
pl.col(query.aggregated_columns).name.suffix(query.rhs_column_suffix)
663+
)
664+
pval = query.aggregation.column_names('pval')[0]
665+
pval_lhs = f'{pval}{query.lhs_column_suffix}'
666+
pval_rhs = f'{pval}{query.rhs_column_suffix}'
667+
cols = OrderedSet(query.group_by) | OrderedSet(query.aggregated_variants)
668+
if not df_base.is_empty() and not df_target.is_empty():
669+
cols |= {query.diff_column}
670+
df = df_base.join(df_target, on=query.group_by).with_columns(
671+
(100*(pl.col(pval_lhs) - pl.col(pval_rhs)) / pl.col(pval_rhs))
672+
.round(2).alias(query.diff_column)
673+
).select(cols)
674+
elif df_base.is_empty():
675+
df = pl.DataFrame(schema=list(cols))
676+
else:
677+
# df_target is empty; add an empty col for all `rhs` variants
678+
df = df_base.select(
679+
pl.col(col)
680+
if col in df_base.columns else pl.lit('<no data>').alias(col)
681+
for col in cols
682+
)
758683

759-
data.append(line)
684+
data = [df.columns]
685+
for row in df.iter_rows():
686+
data.append(row)
760687

761688
return data
762689

@@ -766,10 +693,10 @@ def performance_compare(cmp, report=None, namepatt=None,
766693
filterA=None, filterB=None):
767694
with reraise_as(ReframeError, (ValueError,),
768695
'could not parse comparison spec'):
769-
match = parse_cmp_spec(cmp)
696+
query = parse_cmp_spec(cmp)
770697

771698
backend = StorageBackend.default()
772-
if match.base is None:
699+
if query.lhs is None:
773700
if report is None:
774701
raise ValueError('report cannot be `None` '
775702
'for current run comparisons')
@@ -785,11 +712,10 @@ def performance_compare(cmp, report=None, namepatt=None,
785712
except IndexError:
786713
tcs_base = []
787714
else:
788-
tcs_base = backend.fetch_testcases(match.base, namepatt, filterA)
715+
tcs_base = backend.fetch_testcases(query.lhs, namepatt, filterA)
789716

790-
tcs_target = backend.fetch_testcases(match.target, namepatt, filterB)
791-
return compare_testcase_data(tcs_base, tcs_target, match.aggregator,
792-
match.aggregator, match.groups, match.columns)
717+
tcs_target = backend.fetch_testcases(query.rhs, namepatt, filterB)
718+
return compare_testcase_data(tcs_base, tcs_target, query)
793719

794720

795721
@time_function
@@ -837,22 +763,20 @@ def session_data(query):
837763
def testcase_data(spec, namepatt=None, test_filter=None):
838764
with reraise_as(ReframeError, (ValueError,),
839765
'could not parse comparison spec'):
840-
match = parse_cmp_spec(spec, default_extra_cols=['pval'])
766+
query = parse_cmp_spec(spec)
841767

842-
if match.base is not None:
768+
if query.lhs is not None:
843769
raise ReframeError('only one time period or session are allowed: '
844770
'if you want to compare performance, '
845771
'use the `--performance-compare` option')
846772

847773
storage = StorageBackend.default()
848-
testcases = storage.fetch_testcases(match.target, namepatt, test_filter)
849-
aggregated = _aggregate_perf(
850-
_group_testcases(testcases, match.groups, match.columns),
851-
match.aggregator, match.columns
774+
df = _aggregate_data(
775+
storage.fetch_testcases(query.rhs, namepatt, test_filter), query
852776
)
853-
data = [match.columns]
854-
for aggr_data in aggregated.values():
855-
data.append([aggr_data[c] for c in match.columns])
777+
data = [df.columns]
778+
for row in df.iter_rows():
779+
data.append(row)
856780

857781
return data
858782

0 commit comments

Comments (0)