From b33c015ebaf7332ade25e7395668e8a1dbfe525e Mon Sep 17 00:00:00 2001 From: Bogdan-Alexandru Stoica Date: Tue, 23 Dec 2025 00:42:07 -0600 Subject: [PATCH 1/4] feat: apply patch that enables Acto to support loading operator images from pre-packaged archives --- .../sosp23_acto/acto/acto/__main__.py | 224 +++-- .../benchmark/sosp23_acto/acto/acto/engine.py | 884 +++++++++++------- .../acto/acto/post_process/post_diff_test.py | 563 +++++++---- .../acto/acto/utils/image_helper.py | 49 + .../sosp23_acto/acto/requirements.txt | 3 +- .../benchmark/sosp23_acto/acto/test/utils.py | 31 - 6 files changed, 1100 insertions(+), 654 deletions(-) create mode 100644 benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/acto/utils/image_helper.py diff --git a/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/acto/__main__.py b/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/acto/__main__.py index 52228cd..19e3487 100644 --- a/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/acto/__main__.py +++ b/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/acto/__main__.py @@ -1,5 +1,5 @@ import argparse -from datetime import datetime +import importlib import json import logging import os @@ -7,87 +7,111 @@ import sys import threading import time -import importlib - +from datetime import datetime start_time = time.time() -workdir_path = 'testrun-%s' % datetime.now().strftime('%Y-%m-%d-%H-%M') +workdir_path = "testrun-%s" % datetime.now().strftime("%Y-%m-%d-%H-%M") parser = argparse.ArgumentParser( - description='Automatic, Continuous Testing for k8s/openshift Operators') -parser.add_argument('--workdir', - dest='workdir_path', - type=str, - default=workdir_path, - help='Working directory') -parser.add_argument('--config', '-c', dest='config', help='Operator port config path') + description="Automatic, Continuous Testing for k8s/openshift Operators" +) parser.add_argument( - '--cluster-runtime', - '-r', - dest='cluster_runtime', + "--workdir", + dest="workdir_path", + type=str, + default=workdir_path, + help="Working directory", +) +parser.add_argument("--config", "-c", dest="config", help="Operator port config path") +parser.add_argument( + "--cluster-runtime", + "-r", + dest="cluster_runtime", default="KIND", - help='Cluster runtime for kubernetes, can be KIND (Default), K3D or MINIKUBE') -parser.add_argument('--duration', - '-d', - dest='duration', - required=False, - help='Number of hours to run') -parser.add_argument('--preload-images', - dest='preload_images', - nargs='*', - help='Docker images to preload into Kind cluster') + help="Cluster runtime for kubernetes, can be KIND (Default), K3D or MINIKUBE", +) +parser.add_argument( + "--duration", "-d", dest="duration", required=False, help="Number of hours to run" +) +parser.add_argument( + "--preload-images", + dest="preload_images", + nargs="*", + help="Docker images to preload into Kind cluster", +) # Temporary solution before integrating controller-gen -parser.add_argument('--helper-crd', - dest='helper_crd', - help='generated CRD file that helps with the input generation') -parser.add_argument('--context', dest='context', help='Cached context data') -parser.add_argument('--num-workers', - dest='num_workers', - type=int, - default=1, - help='Number of concurrent workers to run Acto with') -parser.add_argument('--num-cases', - dest='num_cases', - type=int, - default=1, - help='Number of testcases to bundle each time') -parser.add_argument('--learn', dest='learn', action='store_true', help='Learn mode') - 
-parser.add_argument('--additional-semantic', - dest='additional_semantic', - action='store_true', - help='Run additional semantic testcases') -parser.add_argument('--delta-from', dest='delta_from', help='Delta from') -parser.add_argument('--notify-crash', - dest='notify_crash', - action='store_true', - help='Submit a google form response to notify') -parser.add_argument('--learn-analysis', - dest='learn_analysis_only', - action='store_true', - help='Only learn analysis') -parser.add_argument('--dryrun', - dest='dryrun', - action='store_true', - help='Only generate test cases without executing them') -parser.add_argument('--checkonly', action='store_true') +parser.add_argument( + "--helper-crd", + dest="helper_crd", + help="generated CRD file that helps with the input generation", +) +parser.add_argument("--context", dest="context", help="Cached context data") +parser.add_argument( + "--num-workers", + dest="num_workers", + type=int, + default=1, + help="Number of concurrent workers to run Acto with", +) +parser.add_argument( + "--num-cases", + dest="num_cases", + type=int, + default=1, + help="Number of testcases to bundle each time", +) +parser.add_argument("--learn", dest="learn", action="store_true", help="Learn mode") + +parser.add_argument( + "--additional-semantic", + dest="additional_semantic", + action="store_true", + help="Run additional semantic testcases", +) +parser.add_argument("--delta-from", dest="delta_from", help="Delta from") +parser.add_argument( + "--notify-crash", + dest="notify_crash", + action="store_true", + help="Submit a google form response to notify", +) +parser.add_argument( + "--learn-analysis", + dest="learn_analysis_only", + action="store_true", + help="Only learn analysis", +) +parser.add_argument( + "--dryrun", + dest="dryrun", + action="store_true", + help="Only generate test cases without executing them", +) +parser.add_argument( + "--images-archive", + dest="images_archive", + type=str, + help="Prebuilt images archive to preload into the cluster", +) +parser.add_argument("--checkonly", action="store_true") args = parser.parse_args() os.makedirs(args.workdir_path, exist_ok=True) # Setting up log infra logging.basicConfig( - filename=os.path.join(args.workdir_path, 'test.log'), + filename=os.path.join(args.workdir_path, "test.log"), level=logging.DEBUG, - filemode='w', - format='%(asctime)s %(levelname)-7s, %(name)s, %(filename)-9s:%(lineno)d, %(message)s') + filemode="w", + format="%(asctime)s %(levelname)-7s, %(name)s, %(filename)-9s:%(lineno)d, %(message)s", +) logging.getLogger("kubernetes").setLevel(logging.ERROR) logging.getLogger("sh").setLevel(logging.ERROR) -with open(args.config, 'r') as config_file: +with open(args.config, "r") as config_file: config = json.load(config_file) - if 'monkey_patch' in config: - importlib.import_module(config['monkey_patch']) + if "monkey_patch" in config: + importlib.import_module(config["monkey_patch"]) from acto import common from acto.engine import Acto, apply_testcase @@ -95,7 +119,6 @@ from acto.post_process import PostDiffTest from acto.utils.config import OperatorConfig from acto.utils.error_handler import handle_excepthook, thread_excepthook - from acto.utils.thread_logger import get_thread_logger logger = get_thread_logger(with_prefix=False) @@ -105,19 +128,19 @@ threading.excepthook = thread_excepthook if args.notify_crash: - logger.critical('Crash notification should be enabled in config.yaml') + logger.critical("Crash notification should be enabled in config.yaml") -with open(args.config, 'r') as 
config_file: +with open(args.config, "r") as config_file: config = json.load(config_file) - if 'monkey_patch' in config: - del config['monkey_patch'] + if "monkey_patch" in config: + del config["monkey_patch"] config = OperatorConfig(**config) -logger.info('Acto started with [%s]' % sys.argv) -logger.info('Operator config: %s', config) +logger.info("Acto started with [%s]" % sys.argv) +logger.info("Operator config: %s", config) # Preload frequently used images to amid ImagePullBackOff if args.preload_images: - logger.info('%s will be preloaded into Kind cluster', args.preload_images) + logger.info("%s will be preloaded into Kind cluster", args.preload_images) # register timeout to automatically stop after # hours if args.duration != None: @@ -125,7 +148,9 @@ signal.alarm(int(args.duration) * 60 * 60) if args.context == None: - context_cache = os.path.join(os.path.dirname(config.seed_custom_resource), 'context.json') + context_cache = os.path.join( + os.path.dirname(config.seed_custom_resource), "context.json" + ) else: context_cache = args.context @@ -137,38 +162,45 @@ is_reproduce = False start_time = datetime.now() -acto = Acto(workdir_path=args.workdir_path, - operator_config=config, - cluster_runtime=args.cluster_runtime, - enable_analysis=False, - preload_images_=args.preload_images, - context_file=context_cache, - helper_crd=args.helper_crd, - num_workers=args.num_workers, - num_cases=args.num_cases, - dryrun=args.dryrun, - analysis_only=args.learn_analysis_only, - is_reproduce=is_reproduce, - input_model=input_model, - apply_testcase_f=apply_testcase_f, - delta_from=args.delta_from, - focus_fields=config.focus_fields,) +acto = Acto( + workdir_path=args.workdir_path, + operator_config=config, + cluster_runtime=args.cluster_runtime, + enable_analysis=False, + preload_images_=args.preload_images, + context_file=context_cache, + helper_crd=args.helper_crd, + num_workers=args.num_workers, + num_cases=args.num_cases, + dryrun=args.dryrun, + analysis_only=args.learn_analysis_only, + is_reproduce=is_reproduce, + input_model=input_model, + apply_testcase_f=apply_testcase_f, + delta_from=args.delta_from, + focus_fields=config.focus_fields, + images_archive=args.images_archive, +) generation_time = datetime.now() -logger.info('Acto initialization finished in %s', generation_time - start_time) +logger.info("Acto initialization finished in %s", generation_time - start_time) if args.additional_semantic: acto.run(modes=[InputModel.ADDITIONAL_SEMANTIC]) elif not args.learn: - acto.run(modes=['normal']) + acto.run(modes=["normal"]) normal_finish_time = datetime.now() -logger.info('Acto normal run finished in %s', normal_finish_time - start_time) -logger.info('Start post processing steps') +logger.info("Acto normal run finished in %s", normal_finish_time - start_time) +logger.info("Start post processing steps") # Post processing -post_diff_test_dir = os.path.join(args.workdir_path, 'post_diff_test') -p = PostDiffTest(testrun_dir=args.workdir_path, config=config) +post_diff_test_dir = os.path.join(args.workdir_path, "post_diff_test") +p = PostDiffTest( + testrun_dir=args.workdir_path, + config=config, + images_archive=args.images_archive, +) if not args.checkonly: p.post_process(post_diff_test_dir, num_workers=args.num_workers) p.check(post_diff_test_dir, num_workers=args.num_workers) end_time = datetime.now() -logger.info('Acto end to end finished in %s', end_time - start_time) \ No newline at end of file +logger.info("Acto end to end finished in %s", end_time - start_time) diff --git 
a/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/acto/engine.py b/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/acto/engine.py index ba83556..9ab6274 100644 --- a/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/acto/engine.py +++ b/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/acto/engine.py @@ -1,4 +1,3 @@ -import functools import importlib import os import tempfile @@ -6,9 +5,10 @@ import time from copy import deepcopy from types import FunctionType +from typing import Optional -import yaml import jsonpatch +import yaml from acto.checker.checker_set import CheckerSet from acto.common import * @@ -18,11 +18,10 @@ from acto.input import InputModel from acto.input.input import OverSpecifiedField from acto.input.known_schemas.base import K8sField -from acto.input.known_schemas.known_schema import find_all_matched_schemas, find_all_matched_schemas_type +from acto.input.known_schemas.known_schema import find_all_matched_schemas_type from acto.input.testcase import TestCase from acto.input.testplan import TreeNode -from acto.input.value_with_schema import (ValueWithSchema, - attach_schema_to_value) +from acto.input.value_with_schema import ValueWithSchema, attach_schema_to_value from acto.input.valuegenerator import ArrayGenerator from acto.kubectl_client import KubectlClient from acto.kubernetes_engine import base, k3d, kind @@ -30,58 +29,71 @@ from acto.runner import Runner from acto.serialization import ActoEncoder, ContextEncoder from acto.snapshot import EmptySnapshot, Snapshot -from acto.utils import (add_acto_label, delete_operator_pod, process_crd, - update_preload_images) +from acto.utils import delete_operator_pod, process_crd, update_preload_images from acto.utils.config import OperatorConfig -from acto.utils.thread_logger import (get_thread_logger, - set_thread_logger_prefix) +from acto.utils.image_helper import ImageHelper +from acto.utils.thread_logger import get_thread_logger, set_thread_logger_prefix from ssa.analysis import analyze -def save_result(trial_dir: str, trial_result: RunResult, num_tests: int, trial_elapsed, time_breakdown): + +def save_result( + trial_dir: str, + trial_result: RunResult, + num_tests: int, + trial_elapsed, + time_breakdown, +): logger = get_thread_logger(with_prefix=False) result_dict = {} try: - trial_num = '-'.join(trial_dir.split('-')[-2:]) - result_dict['trial_num'] = trial_num + trial_num = "-".join(trial_dir.split("-")[-2:]) + result_dict["trial_num"] = trial_num except: - result_dict['trial_num'] = trial_dir - result_dict['duration'] = trial_elapsed - result_dict['time_breakdown'] = time_breakdown - result_dict['num_tests'] = num_tests + result_dict["trial_num"] = trial_dir + result_dict["duration"] = trial_elapsed + result_dict["time_breakdown"] = time_breakdown + result_dict["num_tests"] = num_tests if trial_result == None: - logger.info('Trial %s completed without error', trial_dir) + logger.info("Trial %s completed without error", trial_dir) else: - result_dict['error'] = trial_result.to_dict() - result_path = os.path.join(trial_dir, 'result.json') - with open(result_path, 'w') as result_file: + result_dict["error"] = trial_result.to_dict() + result_path = os.path.join(trial_dir, "result.json") + with open(result_path, "w") as result_file: json.dump(result_dict, result_file, cls=ActoEncoder, indent=6) -def apply_testcase(value_with_schema: ValueWithSchema, - path: list, - testcase: TestCase, - setup: bool = False) -> jsonpatch.JsonPatch: +def apply_testcase( + value_with_schema: ValueWithSchema, + 
path: list, + testcase: TestCase, + setup: bool = False, +) -> jsonpatch.JsonPatch: logger = get_thread_logger(with_prefix=True) prev = value_with_schema.raw_value() field_curr_value = value_with_schema.get_value_by_path(list(path)) if setup: value_with_schema.create_path(list(path)) - value_with_schema.set_value_by_path(testcase.setup(field_curr_value), list(path)) + value_with_schema.set_value_by_path( + testcase.setup(field_curr_value), list(path) + ) curr = value_with_schema.raw_value() else: if testcase.test_precondition(field_curr_value): value_with_schema.create_path(list(path)) - value_with_schema.set_value_by_path(testcase.mutator(field_curr_value), list(path)) + value_with_schema.set_value_by_path( + testcase.mutator(field_curr_value), list(path) + ) curr = value_with_schema.raw_value() patch = jsonpatch.make_patch(prev, curr) - logger.info('JSON patch: %s' % patch) + logger.info("JSON patch: %s" % patch) return patch + def check_state_equality(snapshot: Snapshot, prev_snapshot: Snapshot) -> OracleResult: - '''Check whether two system state are semantically equivalent + """Check whether two system state are semantically equivalent Args: - snapshot: a reference to a system state @@ -89,7 +101,7 @@ def check_state_equality(snapshot: Snapshot, prev_snapshot: Snapshot) -> OracleR Return value: - a dict of diff results, empty if no diff found - ''' + """ logger = get_thread_logger(with_prefix=True) curr_system_state = deepcopy(snapshot.system_state) @@ -98,48 +110,48 @@ def check_state_equality(snapshot: Snapshot, prev_snapshot: Snapshot) -> OracleR if len(curr_system_state) == 0 or len(prev_system_state) == 0: return PassResult() - del curr_system_state['endpoints'] - del prev_system_state['endpoints'] - del curr_system_state['job'] - del prev_system_state['job'] + del curr_system_state["endpoints"] + del prev_system_state["endpoints"] + del curr_system_state["job"] + del prev_system_state["job"] # remove pods that belong to jobs from both states to avoid observability problem - curr_pods = curr_system_state['pod'] - prev_pods = prev_system_state['pod'] - curr_system_state['pod'] = { + curr_pods = curr_system_state["pod"] + prev_pods = prev_system_state["pod"] + curr_system_state["pod"] = { k: v for k, v in curr_pods.items() - if v['metadata']['owner_references'][0]['kind'] != 'Job' + if v["metadata"]["owner_references"][0]["kind"] != "Job" } - prev_system_state['pod'] = { + prev_system_state["pod"] = { k: v for k, v in prev_pods.items() - if v['metadata']['owner_references'][0]['kind'] != 'Job' + if v["metadata"]["owner_references"][0]["kind"] != "Job" } - for name, obj in prev_system_state['secret'].items(): - if 'data' in obj and obj['data'] != None: - for key, data in obj['data'].items(): + for name, obj in prev_system_state["secret"].items(): + if "data" in obj and obj["data"] != None: + for key, data in obj["data"].items(): try: - obj['data'][key] = json.loads(data) + obj["data"][key] = json.loads(data) except: pass - for name, obj in curr_system_state['secret'].items(): - if 'data' in obj and obj['data'] != None: - for key, data in obj['data'].items(): + for name, obj in curr_system_state["secret"].items(): + if "data" in obj and obj["data"] != None: + for key, data in obj["data"].items(): try: - obj['data'][key] = json.loads(data) + obj["data"][key] = json.loads(data) except: pass # remove custom resource from both states - curr_system_state.pop('custom_resource_spec', None) - prev_system_state.pop('custom_resource_spec', None) - 
curr_system_state.pop('custom_resource_status', None) - prev_system_state.pop('custom_resource_status', None) - curr_system_state.pop('pvc', None) - prev_system_state.pop('pvc', None) + curr_system_state.pop("custom_resource_spec", None) + prev_system_state.pop("custom_resource_spec", None) + curr_system_state.pop("custom_resource_status", None) + prev_system_state.pop("custom_resource_status", None) + curr_system_state.pop("pvc", None) + prev_system_state.pop("pvc", None) # remove fields that are not deterministic exclude_paths = [ @@ -181,34 +193,66 @@ def check_state_equality(snapshot: Snapshot, prev_snapshot: Snapshot) -> OracleR r"\['deployment_pods'\].*\['metadata'\]\['owner_references'\]\[.*\]\['name'\]", ] - diff = DeepDiff(prev_system_state, - curr_system_state, - exclude_regex_paths=exclude_paths, - view='tree') + diff = DeepDiff( + prev_system_state, + curr_system_state, + exclude_regex_paths=exclude_paths, + view="tree", + ) if diff: - logger.debug(f"failed attempt recovering to seed state - system state diff: {diff}") - return RecoveryResult(delta=diff, from_=prev_system_state, to_=curr_system_state) + logger.debug( + f"failed attempt recovering to seed state - system state diff: {diff}" + ) + return RecoveryResult( + delta=diff, from_=prev_system_state, to_=curr_system_state + ) return PassResult() class TrialRunner: - def __init__(self, context: dict, input_model: InputModel, deploy: Deploy, runner_t: type, - checker_t: type, wait_time: int, custom_on_init: List[callable], - custom_oracle: List[callable], workdir: str, cluster: base.KubernetesEngine, - worker_id: int, sequence_base: int, dryrun: bool, is_reproduce: bool, - apply_testcase_f: FunctionType, acto_namespace: int) -> None: + def __init__( + self, + context: dict, + input_model: InputModel, + deploy: Deploy, + runner_t: type, + checker_t: type, + wait_time: int, + custom_on_init: List[callable], + custom_oracle: List[callable], + workdir: str, + cluster: base.KubernetesEngine, + worker_id: int, + sequence_base: int, + dryrun: bool, + is_reproduce: bool, + apply_testcase_f: FunctionType, + acto_namespace: int, + images_archive: Optional[str] = None, + ) -> None: self.context = context self.workdir = workdir self.base_workdir = workdir self.cluster = cluster - self.images_archive = os.path.join(workdir, 'images.tar') + + if images_archive is None: + self.images_archive = ImageHelper.prepare_image_archive( + self.context["preload_images"], + ) + else: + self.images_archive = images_archive + self.worker_id = worker_id self.sequence_base = sequence_base # trial number to start with - self.context_name = cluster.get_context_name(f"acto-{acto_namespace}-cluster-{worker_id}") - self.kubeconfig = os.path.join(os.path.expanduser('~'), '.kube', self.context_name) + self.context_name = cluster.get_context_name( + f"acto-{acto_namespace}-cluster-{worker_id}" + ) + self.kubeconfig = os.path.join( + os.path.expanduser("~"), ".kube", self.context_name + ) self.cluster_name = f"acto-{acto_namespace}-cluster-{worker_id}" self.input_model = input_model self.deploy = deploy @@ -240,57 +284,79 @@ def run(self, errors: List[RunResult], mode: str = InputModel.NORMAL): while True: if self.input_model.is_empty(): - logger.info('Test finished') + logger.info("Test finished") break trial_start_time = time.time() - self.cluster.restart_cluster(self.cluster_name, self.kubeconfig, CONST.K8S_VERSION) + self.cluster.restart_cluster( + self.cluster_name, self.kubeconfig, CONST.K8S_VERSION + ) apiclient = kubernetes_client(self.kubeconfig, 
self.context_name) self.cluster.load_images(self.images_archive, self.cluster_name) trial_k8s_bootstrap_time = time.time() - deployed = self.deploy.deploy_with_retry(self.context, self.kubeconfig, - self.context_name) + deployed = self.deploy.deploy_with_retry( + self.context, self.kubeconfig, self.context_name + ) if not deployed: - logger.info('Not deployed. Try again!') + logger.info("Not deployed. Try again!") continue operator_deploy_time = time.time() trial_dir = os.path.join( self.workdir, - 'trial-%02d-%04d' % (self.worker_id + self.sequence_base, self.curr_trial)) + "trial-%02d-%04d" + % (self.worker_id + self.sequence_base, self.curr_trial), + ) os.makedirs(trial_dir, exist_ok=True) - trial_err, num_tests = self.run_trial(trial_dir=trial_dir, curr_trial=self.curr_trial) + trial_err, num_tests = self.run_trial( + trial_dir=trial_dir, curr_trial=self.curr_trial + ) self.snapshots = [] trial_finished_time = time.time() - trial_elapsed = time.strftime("%H:%M:%S", time.gmtime(trial_finished_time - trial_start_time)) - logger.info('Trial %d finished, completed in %s' % (self.curr_trial, trial_elapsed)) - logger.info(f'Kubernetes bootstrap: {trial_k8s_bootstrap_time - trial_start_time}') - logger.info(f'Operator deploy: {operator_deploy_time - trial_k8s_bootstrap_time}') - logger.info(f'Trial run: {trial_finished_time - operator_deploy_time}') - logger.info('---------------------------------------\n') - - delete_operator_pod(apiclient, self.context['namespace']) - save_result(trial_dir, trial_err, num_tests, trial_elapsed, { - 'k8s_bootstrap': trial_k8s_bootstrap_time - trial_start_time, - 'operator_deploy': operator_deploy_time - trial_k8s_bootstrap_time, - 'trial_run': trial_finished_time - operator_deploy_time - }) + trial_elapsed = time.strftime( + "%H:%M:%S", time.gmtime(trial_finished_time - trial_start_time) + ) + logger.info( + "Trial %d finished, completed in %s" % (self.curr_trial, trial_elapsed) + ) + logger.info( + f"Kubernetes bootstrap: {trial_k8s_bootstrap_time - trial_start_time}" + ) + logger.info( + f"Operator deploy: {operator_deploy_time - trial_k8s_bootstrap_time}" + ) + logger.info(f"Trial run: {trial_finished_time - operator_deploy_time}") + logger.info("---------------------------------------\n") + + delete_operator_pod(apiclient, self.context["namespace"]) + save_result( + trial_dir, + trial_err, + num_tests, + trial_elapsed, + { + "k8s_bootstrap": trial_k8s_bootstrap_time - trial_start_time, + "operator_deploy": operator_deploy_time - trial_k8s_bootstrap_time, + "trial_run": trial_finished_time - operator_deploy_time, + }, + ) self.curr_trial = self.curr_trial + 1 errors.append(trial_err) if self.input_model.is_empty(): - logger.info('Test finished') + logger.info("Test finished") break - logger.info('Failed test cases: %s' % - json.dumps(self.discarded_testcases, cls=ActoEncoder, indent=4)) + logger.info( + "Failed test cases: %s" + % json.dumps(self.discarded_testcases, cls=ActoEncoder, indent=4) + ) - def run_trial(self, - trial_dir: str, - curr_trial: int, - num_mutation: int = 10) -> Tuple[ErrorResult, int]: - '''Run a trial starting with the initial input, mutate with the candidate_dict, + def run_trial( + self, trial_dir: str, curr_trial: int, num_mutation: int = 10 + ) -> Tuple[ErrorResult, int]: + """Run a trial starting with the initial input, mutate with the candidate_dict, and mutate for num_mutation times Args: @@ -298,30 +364,39 @@ def run_trial(self, candidate_dict: guides the mutation trial_num: how many trials have been run num_mutation: how 
many mutations to run at each trial - ''' - oracle_handle = OracleHandle(KubectlClient(self.kubeconfig, self.context_name), - kubernetes_client(self.kubeconfig, self.context_name), - self.context['namespace'], self.snapshots) + """ + oracle_handle = OracleHandle( + KubectlClient(self.kubeconfig, self.context_name), + kubernetes_client(self.kubeconfig, self.context_name), + self.context["namespace"], + self.snapshots, + ) # first run the on_init callbacks if any if self.custom_on_init is not None: for on_init in self.custom_on_init: on_init(oracle_handle) - runner: Runner = self.runner_t(self.context, trial_dir, self.kubeconfig, self.context_name, - self.wait_time) - checker: CheckerSet = self.checker_t(self.context, trial_dir, self.input_model, oracle_handle, self.custom_oracle) + runner: Runner = self.runner_t( + self.context, trial_dir, self.kubeconfig, self.context_name, self.wait_time + ) + checker: CheckerSet = self.checker_t( + self.context, trial_dir, self.input_model, oracle_handle, self.custom_oracle + ) curr_input = self.input_model.get_seed_input() self.snapshots.append(EmptySnapshot(curr_input)) generation = 0 - while generation < num_mutation: # every iteration gets a new list of next tests + while ( + generation < num_mutation + ): # every iteration gets a new list of next tests # update the thread logger - set_thread_logger_prefix(f'trial: {curr_trial}, gen: {generation}') + set_thread_logger_prefix(f"trial: {curr_trial}, gen: {generation}") logger = get_thread_logger(with_prefix=True) - curr_input_with_schema = attach_schema_to_value(self.snapshots[-1].input, - self.input_model.root_schema) + curr_input_with_schema = attach_schema_to_value( + self.snapshots[-1].input, self.input_model.root_schema + ) ready_testcases = [] if generation > 0: @@ -335,40 +410,55 @@ def run_trial(self, break # First make sure all the next tests are valid - for (group, testcase_with_path) in test_groups: # iterate on list of next tests + for ( + group, + testcase_with_path, + ) in test_groups: # iterate on list of next tests field_path_str, testcase = testcase_with_path field_path = json.loads(field_path_str) - testcase_signature = {'field': field_path_str, 'testcase': str(testcase)} - field_curr_value = curr_input_with_schema.get_value_by_path(list(field_path)) + testcase_signature = { + "field": field_path_str, + "testcase": str(testcase), + } + field_curr_value = curr_input_with_schema.get_value_by_path( + list(field_path) + ) if testcase.test_precondition(field_curr_value): # precondition of this testcase satisfies - logger.info('Precondition of %s satisfies', field_path) + logger.info("Precondition of %s satisfies", field_path) ready_testcases.append((group, testcase_with_path)) else: # precondition fails, first run setup - logger.info('Precondition of %s fails, try setup first', field_path_str) + logger.info( + "Precondition of %s fails, try setup first", field_path_str + ) - self.apply_testcase_f(curr_input_with_schema, - field_path, - testcase, - setup=True) + self.apply_testcase_f( + curr_input_with_schema, field_path, testcase, setup=True + ) if not testcase.test_precondition( - curr_input_with_schema.get_value_by_path(list(field_path))): + curr_input_with_schema.get_value_by_path(list(field_path)) + ): # just in case the setup does not work correctly, drop this testcase - logger.error('Setup does not work correctly') + logger.error("Setup does not work correctly") group.discard_testcase(self.discarded_testcases) continue - runResult = TrialRunner.run_and_check(runner, checker, - 
curr_input_with_schema.raw_value(), - self.snapshots, generation, - testcase_signature, self.dryrun) + runResult = TrialRunner.run_and_check( + runner, + checker, + curr_input_with_schema.raw_value(), + self.snapshots, + generation, + testcase_signature, + self.dryrun, + ) generation += 1 if runResult.is_connection_refused(): - logger.error('Connection refused, exiting') + logger.error("Connection refused, exiting") return runResult, generation is_invalid, _ = runResult.is_invalid() @@ -376,24 +466,28 @@ def run_trial(self, group.discard_testcase(self.discarded_testcases) # before return, run the recovery test case runResult.recovery_result = self.run_recovery( - runner, checker, generation) + runner, checker, generation + ) generation += 1 return runResult, generation elif is_invalid: - logger.info('Setup produced invalid input') + logger.info("Setup produced invalid input") self.snapshots.pop() group.discard_testcase(self.discarded_testcases) - curr_input_with_schema = self.revert(runner, checker, generation) + curr_input_with_schema = self.revert( + runner, checker, generation + ) generation += 1 elif runResult.is_unchanged(): - logger.info('Setup produced unchanged input') + logger.info("Setup produced unchanged input") group.discard_testcase(self.discarded_testcases) elif runResult.is_error(): group.discard_testcase(self.discarded_testcases) # before return, run the recovery test case runResult.recovery_result = self.run_recovery( - runner, checker, generation) + runner, checker, generation + ) generation += 1 return runResult, generation @@ -401,31 +495,40 @@ def run_trial(self, ready_testcases.append((group, testcase_with_path)) if len(ready_testcases) == 0: - logger.info('All setups failed') + logger.info("All setups failed") continue - logger.info('Running bundled testcases') + logger.info("Running bundled testcases") - t = self.run_testcases(curr_input_with_schema, ready_testcases, runner, checker, - generation) + t = self.run_testcases( + curr_input_with_schema, ready_testcases, runner, checker, generation + ) runResult, generation = t is_invalid, _ = runResult.is_invalid() if (not is_invalid and runResult.is_error()) or runResult.is_basic_error(): # before return, run the recovery test case - logger.info('Error result, running recovery') - runResult.recovery_result = self.run_recovery(runner, checker, generation) + logger.info("Error result, running recovery") + runResult.recovery_result = self.run_recovery( + runner, checker, generation + ) generation += 1 return runResult, generation if self.input_model.is_empty(): - logger.info('Input model is empty, break') + logger.info("Input model is empty, break") break return None, generation - def run_testcases(self, curr_input_with_schema, testcases: List[Tuple[TreeNode, TestCase]], - runner, checker, generation) -> Tuple[RunResult, int]: + def run_testcases( + self, + curr_input_with_schema, + testcases: List[Tuple[TreeNode, TestCase]], + runner, + checker, + generation, + ) -> Tuple[RunResult, int]: logger = get_thread_logger(with_prefix=True) testcase_patches = [] @@ -433,18 +536,24 @@ def run_testcases(self, curr_input_with_schema, testcases: List[Tuple[TreeNode, for group, testcase_with_path in testcases: field_path_str, testcase = testcase_with_path field_path = json.loads(field_path_str) - testcase_signature = {'field': field_path_str, 'testcase': str(testcase)} + testcase_signature = {"field": field_path_str, "testcase": str(testcase)} patch = self.apply_testcase_f(curr_input_with_schema, field_path, testcase) # 
field_node.get_testcases().pop() # finish testcase testcase_patches.append((group, testcase_with_path, patch)) - runResult = TrialRunner.run_and_check(runner, checker, curr_input_with_schema.raw_value(), - self.snapshots, generation, testcase_signature, - self.dryrun) + runResult = TrialRunner.run_and_check( + runner, + checker, + curr_input_with_schema.raw_value(), + self.snapshots, + generation, + testcase_signature, + self.dryrun, + ) generation += 1 if runResult.is_connection_refused(): - logger.error('Connection refused, exiting') + logger.error("Connection refused, exiting") return runResult, generation is_invalid, invalidResult = runResult.is_invalid() @@ -454,7 +563,7 @@ def run_testcases(self, curr_input_with_schema, testcases: List[Tuple[TreeNode, # responsible testcase and re-apply # 1. revert - logger.debug('Invalid input, revert') + logger.debug("Invalid input, revert") self.snapshots.pop() curr_input_with_schema = self.revert(runner, checker, generation) generation += 1 @@ -463,32 +572,43 @@ def run_testcases(self, curr_input_with_schema, testcases: List[Tuple[TreeNode, if len(testcase_patches) == 1: # if only one testcase, then no need to isolate testcase_patches[0][0].finish_testcase() # finish testcase - logger.debug('Only one patch, no need to isolate') + logger.debug("Only one patch, no need to isolate") return runResult, generation else: responsible_field = invalidResult.responsible_field if responsible_field == None: # Unable to pinpoint the exact responsible testcase, try one by one - logger.debug('Unable to pinpoint the exact responsible field, try one by one') + logger.debug( + "Unable to pinpoint the exact responsible field, try one by one" + ) for group, testcase_with_path, patch in testcase_patches: - iso_result, generation = self.run_testcases(curr_input_with_schema, - [(group, testcase_with_path)], - runner, checker, generation) - if (not iso_result.is_invalid()[0] and - iso_result.is_error()) or iso_result.is_basic_error(): + iso_result, generation = self.run_testcases( + curr_input_with_schema, + [(group, testcase_with_path)], + runner, + checker, + generation, + ) + if ( + not iso_result.is_invalid()[0] and iso_result.is_error() + ) or iso_result.is_basic_error(): return iso_result, generation return runResult, generation else: - jsonpatch_path = ''.join('/' + str(item) for item in responsible_field) - logger.debug('Responsible patch path: %s', jsonpatch_path) + jsonpatch_path = "".join( + "/" + str(item) for item in responsible_field + ) + logger.debug("Responsible patch path: %s", jsonpatch_path) # isolate the responsible invalid testcase and re-apply ready_testcases = [] for group, testcase_with_path, patch in testcase_patches: responsible = False for op in patch: - if op['path'] == jsonpatch_path: - logger.info('Determine the responsible field to be %s' % - jsonpatch_path) + if op["path"] == jsonpatch_path: + logger.info( + "Determine the responsible field to be %s" + % jsonpatch_path + ) responsible = True group.finish_testcase() # finish testcase break @@ -498,24 +618,36 @@ def run_testcases(self, curr_input_with_schema, testcases: List[Tuple[TreeNode, return runResult, generation if len(ready_testcases) == len(testcase_patches): - logger.error('Fail to determine the responsible patch, try one by one') + logger.error( + "Fail to determine the responsible patch, try one by one" + ) for group, testcase_with_path, patch in testcase_patches: iso_result, generation = self.run_testcases( - curr_input_with_schema, [(group, testcase_with_path)], runner, - 
checker, generation) - if (not iso_result.is_invalid()[0] and - iso_result.is_error()) or iso_result.is_basic_error(): + curr_input_with_schema, + [(group, testcase_with_path)], + runner, + checker, + generation, + ) + if ( + not iso_result.is_invalid()[0] and iso_result.is_error() + ) or iso_result.is_basic_error(): return iso_result, generation return runResult, generation else: - logger.debug('Rerunning the remaining ready testcases') - return self.run_testcases(curr_input_with_schema, ready_testcases, runner, - checker, generation) + logger.debug("Rerunning the remaining ready testcases") + return self.run_testcases( + curr_input_with_schema, + ready_testcases, + runner, + checker, + generation, + ) else: if not self.is_reproduce: for patch in testcase_patches: patch[0].finish_testcase() # finish testcase - ''' Commented out because no use for now + """ Commented out because no use for now if isinstance(result, UnchangedInputResult): pass elif isinstance(result, ErrorResult): @@ -526,50 +658,56 @@ def run_testcases(self, curr_input_with_schema, testcases: List[Tuple[TreeNode, else: logger.error('Unknown return value, abort') quit() - ''' + """ return runResult, generation - def run_and_check(runner: Runner, - checker: CheckerSet, - input: dict, - snapshots: list, - generation: int, - testcase_signature: dict, - dryrun: bool, - revert: bool = False) -> RunResult: + def run_and_check( + runner: Runner, + checker: CheckerSet, + input: dict, + snapshots: list, + generation: int, + testcase_signature: dict, + dryrun: bool, + revert: bool = False, + ) -> RunResult: logger = get_thread_logger(with_prefix=True) - logger.debug('Run and check') + logger.debug("Run and check") retry = 0 while True: snapshot, err = runner.run(input, generation) - runResult = checker.check(snapshot, - snapshots[-1], - revert, - generation, - testcase_signature=testcase_signature) + runResult = checker.check( + snapshot, + snapshots[-1], + revert, + generation, + testcase_signature=testcase_signature, + ) snapshots.append(snapshot) if runResult.is_connection_refused(): # Connection refused due to webhook not ready, let's wait for a bit - logger.info('Connection failed. Retry the test after 60 seconds') + logger.info("Connection failed. Retry the test after 60 seconds") time.sleep(60) retry += 1 if retry > 2: - logger.error('Connection failed too many times. Abort') + logger.error("Connection failed too many times. 
Abort") break else: break return runResult - def run_recovery(self, runner: Runner, checker: CheckerSet, generation: int) -> OracleResult: - '''Runs the recovery test case after an error is reported''' + def run_recovery( + self, runner: Runner, checker: CheckerSet, generation: int + ) -> OracleResult: + """Runs the recovery test case after an error is reported""" logger = get_thread_logger(with_prefix=True) RECOVERY_SNAPSHOT = -2 # the immediate snapshot before the error - logger.debug('Running recovery') + logger.debug("Running recovery") recovery_input = self.snapshots[RECOVERY_SNAPSHOT].input snapshot, err = runner.run(recovery_input, generation=-1) result = check_state_equality(snapshot, self.snapshots[RECOVERY_SNAPSHOT]) @@ -577,62 +715,77 @@ def run_recovery(self, runner: Runner, checker: CheckerSet, generation: int) -> return result def revert(self, runner, checker, generation) -> ValueWithSchema: - curr_input_with_schema = attach_schema_to_value(self.snapshots[-1].input, - self.input_model.root_schema) - - testcase_sig = {'field': '', 'testcase': 'revert'} - - result = TrialRunner.run_and_check(runner, - checker, - curr_input_with_schema.raw_value(), - self.snapshots, - generation, - testcase_sig, - self.dryrun, - revert=True) + curr_input_with_schema = attach_schema_to_value( + self.snapshots[-1].input, self.input_model.root_schema + ) + + testcase_sig = {"field": "", "testcase": "revert"} + + result = TrialRunner.run_and_check( + runner, + checker, + curr_input_with_schema.raw_value(), + self.snapshots, + generation, + testcase_sig, + self.dryrun, + revert=True, + ) return curr_input_with_schema class Acto: - def __init__(self, - workdir_path: str, - operator_config: OperatorConfig, - cluster_runtime: str, - enable_analysis: bool, - preload_images_: list, - context_file: str, - helper_crd: str, - num_workers: int, - num_cases: int, - dryrun: bool, - analysis_only: bool, - is_reproduce: bool, - input_model: type, - apply_testcase_f: FunctionType, - reproduce_dir: str = None, - delta_from: str = None, - mount: list = None, - focus_fields: list = None, - acto_namespace: int = 0) -> None: + def __init__( + self, + workdir_path: str, + operator_config: OperatorConfig, + cluster_runtime: str, + enable_analysis: bool, + preload_images_: list, + context_file: str, + helper_crd: str, + num_workers: int, + num_cases: int, + dryrun: bool, + analysis_only: bool, + is_reproduce: bool, + input_model: type, + apply_testcase_f: FunctionType, + reproduce_dir: str = None, + delta_from: str = None, + mount: list = None, + focus_fields: list = None, + acto_namespace: int = 0, + images_archive: Optional[str] = None, + ) -> None: logger = get_thread_logger(with_prefix=False) try: - with open(operator_config.seed_custom_resource, 'r') as cr_file: + with open(operator_config.seed_custom_resource, "r") as cr_file: self.seed = yaml.load(cr_file, Loader=yaml.FullLoader) except: - logger.error('Failed to read seed yaml, aborting') + logger.error("Failed to read seed yaml, aborting") quit() - if operator_config.deploy.method == 'HELM': - deploy = Deploy(DeployMethod.HELM, operator_config.deploy.file, - operator_config.deploy.init).new() - elif operator_config.deploy.method == 'YAML': - deploy = Deploy(DeployMethod.YAML, operator_config.deploy.file, - operator_config.deploy.init).new() - elif operator_config.deploy.method == 'KUSTOMIZE': - deploy = Deploy(DeployMethod.KUSTOMIZE, operator_config.deploy.file, - operator_config.deploy.init).new() + if operator_config.deploy.method == "HELM": + deploy = Deploy( 
+ DeployMethod.HELM, + operator_config.deploy.file, + operator_config.deploy.init, + ).new() + elif operator_config.deploy.method == "YAML": + deploy = Deploy( + DeployMethod.YAML, + operator_config.deploy.file, + operator_config.deploy.init, + ).new() + elif operator_config.deploy.method == "KUSTOMIZE": + deploy = Deploy( + DeployMethod.KUSTOMIZE, + operator_config.deploy.file, + operator_config.deploy.init, + ).new() else: raise UnknownDeployMethodError() @@ -642,7 +795,8 @@ def __init__(self, cluster = k3d.K3D() else: logger.warning( - f"Cluster Runtime {cluster_runtime} is not supported, defaulted to use kind") + f"Cluster Runtime {cluster_runtime} is not supported, defaulted to use kind" + ) cluster = kind.Kind(acto_namespace=acto_namespace) self.cluster = cluster @@ -650,13 +804,17 @@ def __init__(self, self.operator_config = operator_config self.crd_name = operator_config.crd_name self.workdir_path = workdir_path - self.images_archive = os.path.join(workdir_path, 'images.tar') self.num_workers = num_workers self.dryrun = dryrun self.is_reproduce = is_reproduce self.apply_testcase_f = apply_testcase_f self.reproduce_dir = reproduce_dir self.acto_namespace = acto_namespace + self.images_archive = images_archive + + if images_archive is not None: + # early check if the archive exists + assert os.path.exists(images_archive) self.runner_type = Runner self.checker_type = CheckerSet @@ -665,32 +823,41 @@ def __init__(self, # generate configuration files for the cluster runtime self.cluster.configure_cluster(operator_config.num_nodes, CONST.K8S_VERSION) - self.__learn(context_file=context_file, helper_crd=helper_crd, analysis_only=analysis_only) + self.__learn( + context_file=context_file, + helper_crd=helper_crd, + analysis_only=analysis_only, + ) # Add additional preload images from arguments if preload_images_ != None: - self.context['preload_images'].update(preload_images_) + self.context["preload_images"].update(preload_images_) # Apply custom fields if operator_config.analysis != None: - used_fields = self.context['analysis_result']['used_fields'] + used_fields = self.context["analysis_result"]["used_fields"] else: used_fields = None - self.input_model: InputModel = input_model(self.context['crd']['body'], - used_fields, - operator_config.example_dir, num_workers, - num_cases, self.reproduce_dir, mount) + self.input_model: InputModel = input_model( + self.context["crd"]["body"], + used_fields, + operator_config.example_dir, + num_workers, + num_cases, + self.reproduce_dir, + mount, + ) self.input_model.initialize(self.seed) applied_custom_k8s_fields = False if operator_config.k8s_fields is not None: module = importlib.import_module(operator_config.k8s_fields) - if hasattr(module,'BLACKBOX') and actoConfig.mode == 'blackbox': + if hasattr(module, "BLACKBOX") and actoConfig.mode == "blackbox": applied_custom_k8s_fields = True for k8s_field in module.BLACKBOX: self.input_model.apply_k8s_schema(k8s_field) - elif hasattr(module,'WHITEBOX') and actoConfig.mode == 'whitebox': + elif hasattr(module, "WHITEBOX") and actoConfig.mode == "whitebox": applied_custom_k8s_fields = True for k8s_field in module.WHITEBOX: self.input_model.apply_k8s_schema(k8s_field) @@ -699,12 +866,12 @@ def __init__(self, # from CRD to K8s schema tuples = find_all_matched_schemas_type(self.input_model.root_schema) for tuple in tuples: - logger.debug(f'Found matched schema: {tuple[0].path} -> {tuple[1]}') + logger.debug(f"Found matched schema: {tuple[0].path} -> {tuple[1]}") k8s_schema = K8sField(tuple[0].path, 
tuple[1]) self.input_model.apply_k8s_schema(k8s_schema) if operator_config.custom_fields != None: - if actoConfig.mode == 'blackbox': + if actoConfig.mode == "blackbox": pruned_list = [] module = importlib.import_module(operator_config.custom_fields) for custom_field in module.custom_fields: @@ -720,7 +887,9 @@ def __init__(self, pruned_list = [] tuples = find_all_matched_schemas_type(self.input_model.root_schema) for tuple in tuples: - custom_field = OverSpecifiedField(tuple[0].path, array=isinstance(tuple[1], ArrayGenerator)) + custom_field = OverSpecifiedField( + tuple[0].path, array=isinstance(tuple[1], ArrayGenerator) + ) self.input_model.apply_custom_field(custom_field) self.sequence_base = 20 if delta_from else 0 @@ -736,10 +905,11 @@ def __init__(self, # Generate test cases testplan_path = None if delta_from != None: - testplan_path = os.path.join(delta_from, 'test_plan.json') - self.test_plan = self.input_model.generate_test_plan(testplan_path, - focus_fields=focus_fields) - with open(os.path.join(self.workdir_path, 'test_plan.json'), 'w') as plan_file: + testplan_path = os.path.join(delta_from, "test_plan.json") + self.test_plan = self.input_model.generate_test_plan( + testplan_path, focus_fields=focus_fields + ) + with open(os.path.join(self.workdir_path, "test_plan.json"), "w") as plan_file: json.dump(self.test_plan, plan_file, cls=ActoEncoder, indent=4) def __learn(self, context_file, helper_crd, analysis_only=False): @@ -748,113 +918,177 @@ def __learn(self, context_file, helper_crd, analysis_only=False): learn_start_time = time.time() if os.path.exists(context_file): - logger.info('Loading context from file') - with open(context_file, 'r') as context_fin: + logger.info("Loading context from file") + with open(context_file, "r") as context_fin: self.context = json.load(context_fin) - self.context['preload_images'] = set(self.context['preload_images']) + self.context["preload_images"] = set(self.context["preload_images"]) if analysis_only and self.operator_config.analysis != None: - logger.info('Only run learning analysis') + logger.info("Only run learning analysis") with tempfile.TemporaryDirectory() as project_src: subprocess.run( - ['git', 'clone', self.operator_config.analysis.github_link, project_src]) - subprocess.run([ - 'git', '-C', project_src, 'checkout', self.operator_config.analysis.commit - ]) + [ + "git", + "clone", + self.operator_config.analysis.github_link, + project_src, + ] + ) + subprocess.run( + [ + "git", + "-C", + project_src, + "checkout", + self.operator_config.analysis.commit, + ] + ) if self.operator_config.analysis.entrypoint != None: - entrypoint_path = os.path.join(project_src, - self.operator_config.analysis.entrypoint) + entrypoint_path = os.path.join( + project_src, self.operator_config.analysis.entrypoint + ) else: entrypoint_path = project_src - self.context['analysis_result'] = analyze(entrypoint_path, - self.operator_config.analysis.type, - self.operator_config.analysis.package) + self.context["analysis_result"] = analyze( + entrypoint_path, + self.operator_config.analysis.type, + self.operator_config.analysis.package, + ) learn_end_time = time.time() - self.context['static_analysis_time'] = learn_end_time - learn_start_time - - with open(context_file, 'w') as context_fout: - json.dump(self.context, - context_fout, - cls=ContextEncoder, - indent=4, - sort_keys=True) + self.context["static_analysis_time"] = learn_end_time - learn_start_time + + with open(context_file, "w") as context_fout: + json.dump( + self.context, + context_fout, + 
cls=ContextEncoder, + indent=4, + sort_keys=True, + ) else: # Run learning run to collect some information from runtime - logger.info('Starting learning run to collect information') - self.context = {'namespace': '', 'crd': None, 'preload_images': set()} - learn_context_name = self.cluster.get_context_name('learn') - learn_kubeconfig = os.path.join(os.path.expanduser('~'), '.kube', learn_context_name) + logger.info("Starting learning run to collect information") + self.context = {"namespace": "", "crd": None, "preload_images": set()} + learn_context_name = self.cluster.get_context_name("learn") + learn_kubeconfig = os.path.join( + os.path.expanduser("~"), ".kube", learn_context_name + ) while True: - self.cluster.restart_cluster('learn', learn_kubeconfig, CONST.K8S_VERSION) - deployed = self.deploy.deploy_with_retry(self.context, learn_kubeconfig, - learn_context_name) + self.cluster.restart_cluster( + "learn", learn_kubeconfig, CONST.K8S_VERSION + ) + deployed = self.deploy.deploy_with_retry( + self.context, learn_kubeconfig, learn_context_name + ) if deployed: break apiclient = kubernetes_client(learn_kubeconfig, learn_context_name) - runner = Runner(self.context, 'learn', learn_kubeconfig, learn_context_name) + runner = Runner(self.context, "learn", learn_kubeconfig, learn_context_name) runner.run_without_collect(self.operator_config.seed_custom_resource) - update_preload_images(self.context, self.cluster.get_node_list('learn')) - process_crd(self.context, apiclient, KubectlClient(learn_kubeconfig, learn_context_name), - self.crd_name, helper_crd) - self.cluster.delete_cluster('learn', learn_kubeconfig) + update_preload_images(self.context, self.cluster.get_node_list("learn")) + process_crd( + self.context, + apiclient, + KubectlClient(learn_kubeconfig, learn_context_name), + self.crd_name, + helper_crd, + ) + self.cluster.delete_cluster("learn", learn_kubeconfig) run_end_time = time.time() if self.operator_config.analysis != None: with tempfile.TemporaryDirectory() as project_src: subprocess.run( - ['git', 'clone', self.operator_config.analysis.github_link, project_src]) - subprocess.run([ - 'git', '-C', project_src, 'checkout', self.operator_config.analysis.commit - ]) + [ + "git", + "clone", + self.operator_config.analysis.github_link, + project_src, + ] + ) + subprocess.run( + [ + "git", + "-C", + project_src, + "checkout", + self.operator_config.analysis.commit, + ] + ) if self.operator_config.analysis.entrypoint != None: - entrypoint_path = os.path.join(project_src, - self.operator_config.analysis.entrypoint) + entrypoint_path = os.path.join( + project_src, self.operator_config.analysis.entrypoint + ) else: entrypoint_path = project_src - self.context['analysis_result'] = analyze(entrypoint_path, - self.operator_config.analysis.type, - self.operator_config.analysis.package) + self.context["analysis_result"] = analyze( + entrypoint_path, + self.operator_config.analysis.type, + self.operator_config.analysis.package, + ) learn_end_time = time.time() - self.context['static_analysis_time'] = learn_end_time - run_end_time - self.context['learnrun_time'] = run_end_time - learn_start_time - with open(context_file, 'w') as context_fout: - json.dump(self.context, context_fout, cls=ContextEncoder, indent=4, sort_keys=True) - - def run(self, modes: list = ['normal', 'overspecified', 'copiedover']) -> List[RunResult]: + self.context["static_analysis_time"] = learn_end_time - run_end_time + self.context["learnrun_time"] = run_end_time - learn_start_time + with open(context_file, "w") as 
context_fout: + json.dump( + self.context, + context_fout, + cls=ContextEncoder, + indent=4, + sort_keys=True, + ) + + def run( + self, modes: list = ["normal", "overspecified", "copiedover"] + ) -> List[RunResult]: # TODO: return the alarms here logger = get_thread_logger(with_prefix=True) # Build an archive to be preloaded - if len(self.context['preload_images']) > 0: - logger.info('Creating preload images archive') - print_event('Preparing required images...') - # first make sure images are present locally - for image in self.context['preload_images']: - subprocess.run(['docker', 'pull', image], stdout=subprocess.DEVNULL) - subprocess.run(['docker', 'image', 'save', '-o', self.images_archive] + - list(self.context['preload_images']), stdout=subprocess.DEVNULL) + if self.images_archive is not None: + print_event("Using existing image archive...") + elif len(self.context["preload_images"]) > 0: + logger.info("Creating preload images archive") + print_event("Preparing required images...") + ImageHelper.prepare_image_archive( + self.context["preload_images"], + ) start_time = time.time() errors: List[RunResult] = [] runners: List[TrialRunner] = [] for i in range(self.num_workers): - runner = TrialRunner(self.context, self.input_model, self.deploy, self.runner_type, - self.checker_type, self.operator_config.wait_time, - self.custom_on_init, self.custom_oracle, self.workdir_path, - self.cluster, i, self.sequence_base, self.dryrun, - self.is_reproduce, self.apply_testcase_f, self.acto_namespace) + runner = TrialRunner( + self.context, + self.input_model, + self.deploy, + self.runner_type, + self.checker_type, + self.operator_config.wait_time, + self.custom_on_init, + self.custom_oracle, + self.workdir_path, + self.cluster, + i, + self.sequence_base, + self.dryrun, + self.is_reproduce, + self.apply_testcase_f, + self.acto_namespace, + images_archive=self.images_archive, + ) runners.append(runner) - if 'normal' in modes: + if "normal" in modes: threads = [] for runner in runners: t = threading.Thread(target=runner.run, args=([errors])) @@ -866,10 +1100,12 @@ def run(self, modes: list = ['normal', 'overspecified', 'copiedover']) -> List[R normal_time = time.time() - if 'overspecified' in modes: + if "overspecified" in modes: threads = [] for runner in runners: - t = threading.Thread(target=runner.run, args=([errors, InputModel.OVERSPECIFIED])) + t = threading.Thread( + target=runner.run, args=([errors, InputModel.OVERSPECIFIED]) + ) t.start() threads.append(t) @@ -878,10 +1114,12 @@ def run(self, modes: list = ['normal', 'overspecified', 'copiedover']) -> List[R overspecified_time = time.time() - if 'copiedover' in modes: + if "copiedover" in modes: threads = [] for runner in runners: - t = threading.Thread(target=runner.run, args=([errors, InputModel.COPIED_OVER])) + t = threading.Thread( + target=runner.run, args=([errors, InputModel.COPIED_OVER]) + ) t.start() threads.append(t) @@ -893,7 +1131,9 @@ def run(self, modes: list = ['normal', 'overspecified', 'copiedover']) -> List[R if InputModel.ADDITIONAL_SEMANTIC in modes: threads = [] for runner in runners: - t = threading.Thread(target=runner.run, args=([errors, InputModel.ADDITIONAL_SEMANTIC])) + t = threading.Thread( + target=runner.run, args=([errors, InputModel.ADDITIONAL_SEMANTIC]) + ) t.start() threads.append(t) @@ -908,16 +1148,18 @@ def run(self, modes: list = ['normal', 'overspecified', 'copiedover']) -> List[R num_total_failed += len(testcases) testrun_info = { - 'normal_duration': normal_time - start_time, - 
'overspecified_duration': overspecified_time - normal_time, - 'copied_over_duration': additional_semantic_time - overspecified_time, - 'additional_semantic_duration': end_time - additional_semantic_time, - 'num_workers': self.num_workers, - 'num_total_testcases': self.input_model.metadata, - 'num_total_failed': num_total_failed, + "normal_duration": normal_time - start_time, + "overspecified_duration": overspecified_time - normal_time, + "copied_over_duration": additional_semantic_time - overspecified_time, + "additional_semantic_duration": end_time - additional_semantic_time, + "num_workers": self.num_workers, + "num_total_testcases": self.input_model.metadata, + "num_total_failed": num_total_failed, } - with open(os.path.join(self.workdir_path, 'testrun_info.json'), 'w') as info_file: + with open( + os.path.join(self.workdir_path, "testrun_info.json"), "w" + ) as info_file: json.dump(testrun_info, info_file, cls=ActoEncoder, indent=4) - logger.info('All tests finished') + logger.info("All tests finished") return errors diff --git a/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/acto/post_process/post_diff_test.py b/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/acto/post_process/post_diff_test.py index d415e64..097fc66 100644 --- a/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/acto/post_process/post_diff_test.py +++ b/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/acto/post_process/post_diff_test.py @@ -6,12 +6,11 @@ import os import queue import re -import subprocess import sys import threading import time from copy import deepcopy -from typing import Dict, List +from typing import Dict, List, Optional import pandas as pd from deepdiff import DeepDiff @@ -19,89 +18,113 @@ from deepdiff.model import DiffLevel from deepdiff.operator import BaseOperator -sys.path.append('.') -sys.path.append('..') -from acto.common import (ErrorResult, PassResult, RecoveryResult, RunResult, - invalid_input_message_regex, kubernetes_client) +sys.path.append(".") +sys.path.append("..") +from acto.common import ( + ErrorResult, + PassResult, + RecoveryResult, + invalid_input_message_regex, + kubernetes_client, +) from acto.constant import CONST from acto.deploy import Deploy, DeployMethod from acto.kubernetes_engine import base, kind from acto.runner import Runner from acto.serialization import ActoEncoder -from acto.utils import (OperatorConfig, add_acto_label, get_thread_logger, - handle_excepthook, thread_excepthook) +from acto.utils import ( + OperatorConfig, + add_acto_label, + get_thread_logger, + handle_excepthook, + thread_excepthook, +) +from acto.utils.image_helper import ImageHelper from .post_process import PostProcessor, Step def dict_hash(d: dict) -> int: - '''Hash a dict''' + """Hash a dict""" return hash(json.dumps(d, sort_keys=True)) -def compare_system_equality(curr_system_state: Dict, - prev_system_state: Dict, - additional_exclude_paths: List[str] = []): + +def compare_system_equality( + curr_system_state: Dict, + prev_system_state: Dict, + additional_exclude_paths: List[str] = [], +): logger = get_thread_logger(with_prefix=False) curr_system_state = deepcopy(curr_system_state) prev_system_state = deepcopy(prev_system_state) try: - del curr_system_state['endpoints'] - del prev_system_state['endpoints'] - del curr_system_state['job'] - del prev_system_state['job'] + del curr_system_state["endpoints"] + del prev_system_state["endpoints"] + del curr_system_state["job"] + del prev_system_state["job"] except: return PassResult() # remove pods that belong 
to jobs from both states to avoid observability problem - curr_pods = curr_system_state['pod'] - prev_pods = prev_system_state['pod'] + curr_pods = curr_system_state["pod"] + prev_pods = prev_system_state["pod"] new_pods = {} for k, v in curr_pods.items(): - if 'metadata' in v and 'owner_references' in v[ - 'metadata'] and v['metadata']['owner_references'] != None and v['metadata'][ - 'owner_references'][0]['kind'] != 'Job': + if ( + "metadata" in v + and "owner_references" in v["metadata"] + and v["metadata"]["owner_references"] != None + and v["metadata"]["owner_references"][0]["kind"] != "Job" + ): new_pods[k] = v - curr_system_state['pod'] = new_pods + curr_system_state["pod"] = new_pods new_pods = {} for k, v in prev_pods.items(): - if 'metadata' in v and 'owner_references' in v[ - 'metadata'] and v['metadata']['owner_references'] != None and v['metadata'][ - 'owner_references'][0]['kind'] != 'Job': + if ( + "metadata" in v + and "owner_references" in v["metadata"] + and v["metadata"]["owner_references"] != None + and v["metadata"]["owner_references"][0]["kind"] != "Job" + ): new_pods[k] = v - prev_system_state['pod'] = new_pods + prev_system_state["pod"] = new_pods - for name, obj in prev_system_state['secret'].items(): - if 'data' in obj and obj['data'] != None: - for key, data in obj['data'].items(): + for name, obj in prev_system_state["secret"].items(): + if "data" in obj and obj["data"] != None: + for key, data in obj["data"].items(): try: - obj['data'][key] = json.loads(data) + obj["data"][key] = json.loads(data) except: pass - for name, obj in curr_system_state['secret'].items(): - if 'data' in obj and obj['data'] != None: - for key, data in obj['data'].items(): + for name, obj in curr_system_state["secret"].items(): + if "data" in obj and obj["data"] != None: + for key, data in obj["data"].items(): try: - obj['data'][key] = json.loads(data) + obj["data"][key] = json.loads(data) except: pass - - if len(curr_system_state['secret']) != len(prev_system_state['secret']): + + if len(curr_system_state["secret"]) != len(prev_system_state["secret"]): logger.debug(f"failed attempt recovering to seed state - secret count mismatch") return RecoveryResult( - delta=DeepDiff(len(curr_system_state['secret']), len(prev_system_state['secret'])), - from_=prev_system_state, to_=curr_system_state) + delta=DeepDiff( + len(curr_system_state["secret"]), len(prev_system_state["secret"]) + ), + from_=prev_system_state, + to_=curr_system_state, + ) # remove custom resource from both states - curr_system_state.pop('custom_resource_spec', None) - prev_system_state.pop('custom_resource_spec', None) - curr_system_state.pop('custom_resource_status', None) - prev_system_state.pop('custom_resource_status', None) - curr_system_state.pop('pvc', None) - prev_system_state.pop('pvc', None) + curr_system_state.pop("custom_resource_spec", None) + prev_system_state.pop("custom_resource_spec", None) + curr_system_state.pop("custom_resource_status", None) + prev_system_state.pop("custom_resource_status", None) + curr_system_state.pop("pvc", None) + prev_system_state.pop("pvc", None) # remove fields that are not deterministic exclude_paths = [ @@ -151,16 +174,20 @@ def compare_system_equality(curr_system_state: Dict, iterable_compare_func=compare_func, custom_operators=[ NameOperator(r".*\['name'\]$"), - TypeChangeOperator(r".*\['annotations'\]$") + TypeChangeOperator(r".*\['annotations'\]$"), ], - view='tree', + view="tree", ) postprocess_deepdiff(diff) if diff: - logger.debug(f"failed attempt recovering to seed state 
- system state diff: {diff}") - return RecoveryResult(delta=diff, from_=prev_system_state, to_=curr_system_state) + logger.debug( + f"failed attempt recovering to seed state - system state diff: {diff}" + ) + return RecoveryResult( + delta=diff, from_=prev_system_state, to_=curr_system_state + ) return PassResult() @@ -168,37 +195,37 @@ def compare_system_equality(curr_system_state: Dict, def postprocess_deepdiff(diff): # ignore PVC add/removal, because PVC can be intentially left behind logger = get_thread_logger(with_prefix=False) - if 'dictionary_item_removed' in diff: + if "dictionary_item_removed" in diff: new_removed_items = [] - for removed_item in diff['dictionary_item_removed']: - if removed_item.path(output_format='list')[0] == 'pvc': + for removed_item in diff["dictionary_item_removed"]: + if removed_item.path(output_format="list")[0] == "pvc": logger.debug(f"ignoring removed pvc {removed_item}") else: new_removed_items.append(removed_item) if len(new_removed_items) == 0: - del diff['dictionary_item_removed'] + del diff["dictionary_item_removed"] else: - diff['dictionary_item_removed'] = new_removed_items + diff["dictionary_item_removed"] = new_removed_items - if 'dictionary_item_added' in diff: + if "dictionary_item_added" in diff: new_removed_items = [] - for removed_item in diff['dictionary_item_added']: - if removed_item.path(output_format='list')[0] == 'pvc': + for removed_item in diff["dictionary_item_added"]: + if removed_item.path(output_format="list")[0] == "pvc": logger.debug(f"ignoring added pvc {removed_item}") else: new_removed_items.append(removed_item) if len(new_removed_items) == 0: - del diff['dictionary_item_added'] + del diff["dictionary_item_added"] else: - diff['dictionary_item_added'] = new_removed_items + diff["dictionary_item_added"] = new_removed_items def compare_func(x, y, level: DiffLevel = None): try: - if 'name' not in x or 'name' not in y: - return x['key'] == y['key'] and x['operator'] == y['operator'] - x_name = x['name'] - y_name = y['name'] + if "name" not in x or "name" not in y: + return x["key"] == y["key"] and x["operator"] == y["operator"] + x_name = x["name"] + y_name = y["name"] if len(x_name) < 5 or len(y_name) < 5: return x_name == y_name else: @@ -214,8 +241,9 @@ def give_up_diffing(self, level, diff_instance): y_name = level.t2 if x_name == None or y_name == None: return False - if re.search(r"^.+-([A-Za-z0-9]{5})$", x_name) and re.search(r"^.+-([A-Za-z0-9]{5})$", - y_name): + if re.search(r"^.+-([A-Za-z0-9]{5})$", x_name) and re.search( + r"^.+-([A-Za-z0-9]{5})$", y_name + ): return x_name[:5] == y_name[:5] return False @@ -230,39 +258,56 @@ def give_up_diffing(self, level, diff_instance): level.t1 = [] elif level.t2 == None: if isinstance(level.t1, dict): - logging.info('t2 is None, t1 is dict') + logging.info("t2 is None, t1 is dict") level.t2 = {} elif isinstance(level.t1, list): level.t2 = [] return False - def get_nondeterministic_fields(s1, s2, additional_exclude_paths): nondeterministic_fields = [] - result = compare_system_equality(s1, s2, additional_exclude_paths=additional_exclude_paths) + result = compare_system_equality( + s1, s2, additional_exclude_paths=additional_exclude_paths + ) if isinstance(result, RecoveryResult): diff = result.delta for diff_type, diffs in diff.items(): - if diff_type == 'dictionary_item_removed': + if diff_type == "dictionary_item_removed": for diff_field in diffs: - nondeterministic_fields.append(diff_field.path(output_format='list')) - elif diff_type == 'dictionary_item_added': + 
nondeterministic_fields.append( + diff_field.path(output_format="list") + ) + elif diff_type == "dictionary_item_added": for diff_field in diffs: - nondeterministic_fields.append(diff_field.path(output_format='list')) - elif diff_type == 'values_changed': + nondeterministic_fields.append( + diff_field.path(output_format="list") + ) + elif diff_type == "values_changed": for diff_field in diffs: - nondeterministic_fields.append(diff_field.path(output_format='list')) - elif diff_type == 'type_changes': + nondeterministic_fields.append( + diff_field.path(output_format="list") + ) + elif diff_type == "type_changes": for diff_field in diffs: - nondeterministic_fields.append(diff_field.path(output_format='list')) + nondeterministic_fields.append( + diff_field.path(output_format="list") + ) return nondeterministic_fields class AdditionalRunner: - def __init__(self, context: Dict, deploy: Deploy, workdir: str, cluster: base.KubernetesEngine, - worker_id, acto_namespace: int): + def __init__( + self, + context: Dict, + deploy: Deploy, + workdir: str, + cluster: base.KubernetesEngine, + worker_id, + acto_namespace: int, + images_archive: Optional[str] = None, + ): self._context = context self._deploy = deploy @@ -270,30 +315,48 @@ def __init__(self, context: Dict, deploy: Deploy, workdir: str, cluster: base.Ku self._cluster = cluster self._worker_id = worker_id self._cluster_name = f"acto-{acto_namespace}-cluster-{worker_id}" - self._context_name = cluster.get_context_name(f"acto-{acto_namespace}-cluster-{worker_id}") - self._kubeconfig = os.path.join(os.path.expanduser('~'), '.kube', self._context_name) + self._context_name = cluster.get_context_name( + f"acto-{acto_namespace}-cluster-{worker_id}" + ) + self._kubeconfig = os.path.join( + os.path.expanduser("~"), ".kube", self._context_name + ) self._generation = 0 + if images_archive is None: + self._images_archive = ImageHelper.prepare_image_archive( + self._context["preload_images"], + ) + else: + self._images_archive = images_archive + def run_cr(self, cr, trial, gen): - self._cluster.restart_cluster(self._cluster_name, self._kubeconfig, CONST.K8S_VERSION) + self._cluster.restart_cluster( + self._cluster_name, self._kubeconfig, CONST.K8S_VERSION + ) apiclient = kubernetes_client(self._kubeconfig, self._context_name) - deployed = self._deploy.deploy_with_retry(self._context, self._kubeconfig, - self._context_name) + deployed = self._deploy.deploy_with_retry( + self._context, self._kubeconfig, self._context_name + ) add_acto_label(apiclient, self._context) - trial_dir = os.path.join(self._workdir, 'trial-%02d' % self._worker_id) + trial_dir = os.path.join(self._workdir, "trial-%02d" % self._worker_id) os.makedirs(trial_dir, exist_ok=True) runner = Runner(self._context, trial_dir, self._kubeconfig, self._context_name) snapshot, err = runner.run(cr, generation=self._generation) difftest_result = { - 'input_digest': hashlib.md5(json.dumps(cr, sort_keys=True).encode("utf-8")).hexdigest(), - 'snapshot': snapshot.to_dict(), - 'originals': { - 'trial': trial, - 'gen': gen, + "input_digest": hashlib.md5( + json.dumps(cr, sort_keys=True).encode("utf-8") + ).hexdigest(), + "snapshot": snapshot.to_dict(), + "originals": { + "trial": trial, + "gen": gen, }, } - difftest_result_path = os.path.join(trial_dir, 'difftest-%03d.json' % self._generation) - with open(difftest_result_path, 'w') as f: + difftest_result_path = os.path.join( + trial_dir, "difftest-%03d.json" % self._generation + ) + with open(difftest_result_path, "w") as f: json.dump(difftest_result, f, 
cls=ActoEncoder, indent=6) return snapshot @@ -301,8 +364,17 @@ def run_cr(self, cr, trial, gen): class DeployRunner: - def __init__(self, workqueue: multiprocessing.Queue, context: Dict, deploy: Deploy, - workdir: str, cluster: base.KubernetesEngine, worker_id, acto_namespace: int): + def __init__( + self, + workqueue: multiprocessing.Queue, + context: Dict, + deploy: Deploy, + workdir: str, + cluster: base.KubernetesEngine, + worker_id, + acto_namespace: int, + images_archive: Optional[str] = None, + ): self._workqueue = workqueue self._context = context self._deploy = deploy @@ -310,27 +382,40 @@ def __init__(self, workqueue: multiprocessing.Queue, context: Dict, deploy: Depl self._cluster = cluster self._worker_id = worker_id self._cluster_name = f"acto-{acto_namespace}-cluster-{worker_id}" - self._context_name = cluster.get_context_name(f"acto-{acto_namespace}-cluster-{worker_id}") - self._kubeconfig = os.path.join(os.path.expanduser('~'), '.kube', self._context_name) - self._images_archive = os.path.join(workdir, 'images.tar') + self._context_name = cluster.get_context_name( + f"acto-{acto_namespace}-cluster-{worker_id}" + ) + self._kubeconfig = os.path.join( + os.path.expanduser("~"), ".kube", self._context_name + ) + + if images_archive is None: + self._images_archive = ImageHelper.prepare_image_archive( + self._context["preload_images"], + ) + else: + self._images_archive = images_archive def run(self): logger = get_thread_logger(with_prefix=True) generation = 0 - trial_dir = os.path.join(self._workdir, 'trial-%02d' % self._worker_id) + trial_dir = os.path.join(self._workdir, "trial-%02d" % self._worker_id) os.makedirs(trial_dir, exist_ok=True) before_k8s_bootstrap_time = time.time() # Start the cluster and deploy the operator - self._cluster.restart_cluster(self._cluster_name, self._kubeconfig, CONST.K8S_VERSION) + self._cluster.restart_cluster( + self._cluster_name, self._kubeconfig, CONST.K8S_VERSION + ) self._cluster.load_images(self._images_archive, self._cluster_name) apiclient = kubernetes_client(self._kubeconfig, self._context_name) after_k8s_bootstrap_time = time.time() - deployed = self._deploy.deploy_with_retry(self._context, self._kubeconfig, - self._context_name) + deployed = self._deploy.deploy_with_retry( + self._context, self._kubeconfig, self._context_name + ) after_operator_deploy_time = time.time() - trial_dir = os.path.join(self._workdir, 'trial-%02d' % self._worker_id) + trial_dir = os.path.join(self._workdir, "trial-%02d" % self._worker_id) os.makedirs(trial_dir, exist_ok=True) runner = Runner(self._context, trial_dir, self._kubeconfig, self._context_name) while True: @@ -340,80 +425,112 @@ def run(self): except queue.Empty: break - cr = group.iloc[0]['input'] + cr = group.iloc[0]["input"] snapshot, err = runner.run(cr, generation=generation) after_run_time = time.time() err = True difftest_result = { - 'input_digest': group.iloc[0]['input_digest'], - 'snapshot': snapshot.to_dict(), - 'originals': group[['trial', 'gen']].to_dict('records'), - 'time': { - 'k8s_bootstrap': after_k8s_bootstrap_time - before_k8s_bootstrap_time, - 'operator_deploy': after_operator_deploy_time - after_k8s_bootstrap_time, - 'run': after_run_time - after_operator_deploy_time, + "input_digest": group.iloc[0]["input_digest"], + "snapshot": snapshot.to_dict(), + "originals": group[["trial", "gen"]].to_dict("records"), + "time": { + "k8s_bootstrap": after_k8s_bootstrap_time + - before_k8s_bootstrap_time, + "operator_deploy": after_operator_deploy_time + - after_k8s_bootstrap_time, + 
"run": after_run_time - after_operator_deploy_time, }, } - difftest_result_path = os.path.join(trial_dir, 'difftest-%03d.json' % generation) - with open(difftest_result_path, 'w') as f: + difftest_result_path = os.path.join( + trial_dir, "difftest-%03d.json" % generation + ) + with open(difftest_result_path, "w") as f: json.dump(difftest_result, f, cls=ActoEncoder, indent=6) if err: before_k8s_bootstrap_time = time.time() - logger.error(f'Restart cluster due to error: {err}') + logger.error(f"Restart cluster due to error: {err}") # Start the cluster and deploy the operator - self._cluster.restart_cluster(self._cluster_name, self._kubeconfig, - CONST.K8S_VERSION) + self._cluster.restart_cluster( + self._cluster_name, self._kubeconfig, CONST.K8S_VERSION + ) self._cluster.load_images(self._images_archive, self._cluster_name) apiclient = kubernetes_client(self._kubeconfig, self._context_name) after_k8s_bootstrap_time = time.time() - deployed = self._deploy.deploy_with_retry(self._context, self._kubeconfig, - self._context_name) + deployed = self._deploy.deploy_with_retry( + self._context, self._kubeconfig, self._context_name + ) after_operator_deploy_time = time.time() - runner = Runner(self._context, trial_dir, self._kubeconfig, self._context_name) + runner = Runner( + self._context, trial_dir, self._kubeconfig, self._context_name + ) generation += 1 class PostDiffTest(PostProcessor): - def __init__(self, testrun_dir: str, config: OperatorConfig, ignore_invalid: bool = False, acto_namespace: int = 0): + def __init__( + self, + testrun_dir: str, + config: OperatorConfig, + ignore_invalid: bool = False, + acto_namespace: int = 0, + images_archive: Optional[str] = None, + ): self.acto_namespace = acto_namespace super().__init__(testrun_dir, config) logger = get_thread_logger(with_prefix=True) + if images_archive is None: + self.images_archive = ImageHelper.prepare_image_archive( + self.context["preload_images"], + ) + else: + self.images_archive = images_archive + self.all_inputs = [] for trial, steps in self.trial_to_steps.items(): for step in steps.values(): invalid, _ = step.runtime_result.is_invalid() if invalid and not ignore_invalid: continue - self.all_inputs.append({ - 'trial': trial, - 'gen': step.gen, - 'input': step.input, - 'input_digest': step.input_digest, - 'operator_log': step.operator_log, - 'system_state': step.system_state, - 'cli_output': step.cli_output, - 'runtime_result': step.runtime_result - }) - - self.df = pd.DataFrame(self.all_inputs, - columns=[ - 'trial', 'gen', 'input', 'input_digest', 'operator_log', - 'system_state', 'cli_output', 'runtime_result' - ]) + self.all_inputs.append( + { + "trial": trial, + "gen": step.gen, + "input": step.input, + "input_digest": step.input_digest, + "operator_log": step.operator_log, + "system_state": step.system_state, + "cli_output": step.cli_output, + "runtime_result": step.runtime_result, + } + ) + + self.df = pd.DataFrame( + self.all_inputs, + columns=[ + "trial", + "gen", + "input", + "input_digest", + "operator_log", + "system_state", + "cli_output", + "runtime_result", + ], + ) self.unique_inputs: Dict[str, object] = {} # input digest -> group of steps - groups = self.df.groupby('input_digest') + groups = self.df.groupby("input_digest") for digest, group in groups: self.unique_inputs[digest] = group - logger.info(f'Found {len(self.unique_inputs)} unique inputs') + logger.info(f"Found {len(self.unique_inputs)} unique inputs") print(groups.count()) - series = groups.count().sort_values('trial', ascending=False) + series = 
groups.count().sort_values("trial", ascending=False)
         print(series.head())
 
     def post_process(self, workdir: str, num_workers: int = 1):
@@ -421,23 +538,30 @@ def post_process(self, workdir: str, num_workers: int = 1):
             os.mkdir(workdir)
         cluster = kind.Kind(acto_namespace=self.acto_namespace)
         cluster.configure_cluster(self.config.num_nodes, CONST.K8S_VERSION)
-        deploy = Deploy(DeployMethod.YAML, self.config.deploy.file, self.config.deploy.init).new()
+        deploy = Deploy(
+            DeployMethod.YAML, self.config.deploy.file, self.config.deploy.init
+        ).new()
 
         # Build an archive to be preloaded
-        images_archive = os.path.join(workdir, 'images.tar')
-        if len(self.context['preload_images']) > 0:
-            # first make sure images are present locally
-            for image in self.context['preload_images']:
-                subprocess.run(['docker', 'pull', image])
-            subprocess.run(['docker', 'image', 'save', '-o', images_archive] +
-                           list(self.context['preload_images']))
-
+        # (already done in __init__: self.images_archive points either to the
+        # user-supplied archive or to one built by ImageHelper, so no extra
+        # docker pulls are needed here)
+
         workqueue = multiprocessing.Queue()
         for unique_input_group in self.unique_inputs.values():
             workqueue.put(unique_input_group)
 
         runners: List[DeployRunner] = []
         for i in range(num_workers):
-            runner = DeployRunner(workqueue, self.context, deploy, workdir, cluster, i, self.acto_namespace)
+            runner = DeployRunner(
+                workqueue,
+                self.context,
+                deploy,
+                workdir,
+                cluster,
+                i,
+                self.acto_namespace,
+                self.images_archive,
+            )
             runners.append(runner)
         processes = []
@@ -451,41 +575,50 @@ def post_process(self, workdir: str, num_workers: int = 1):
 
     def check(self, workdir: str, num_workers: int = 1):
         logger = get_thread_logger(with_prefix=True)
-        logger.info('Additional exclude paths: %s' % self.config.diff_ignore_fields)
-        trial_dirs = glob.glob(os.path.join(workdir, 'trial-*'))
+        logger.info("Additional exclude paths: %s" % self.config.diff_ignore_fields)
+        trial_dirs = glob.glob(os.path.join(workdir, "trial-*"))
 
         workqueue = multiprocessing.Queue()
         for trial_dir in trial_dirs:
-            for diff_test_result_path in glob.glob(os.path.join(trial_dir, 'difftest-*.json')):
+            for diff_test_result_path in glob.glob(
+                os.path.join(trial_dir, "difftest-*.json")
+            ):
                 workqueue.put(diff_test_result_path)
 
         processes = []
         for i in range(num_workers):
-            p = multiprocessing.Process(target=self.check_diff_test_result,
-                                        args=(workqueue, workdir, i))
+            p = multiprocessing.Process(
+                target=self.check_diff_test_result, args=(workqueue, workdir, i)
+            )
             p.start()
             processes.append(p)
 
         for p in processes:
             p.join()
 
-    def check_diff_test_result(self, workqueue: multiprocessing.Queue, workdir: str,
-                               worker_id: int):
+    def check_diff_test_result(
+        self, workqueue: multiprocessing.Queue, workdir: str, worker_id: int
+    ):
         logger = get_thread_logger(with_prefix=True)
 
         generation = 0 # for additional runner
-        additional_runner_dir = os.path.join(workdir, f'additional-runner-{worker_id}')
+        additional_runner_dir = os.path.join(workdir, f"additional-runner-{worker_id}")
         cluster = kind.Kind(acto_namespace=self.acto_namespace)
         cluster.configure_cluster(self.config.num_nodes, CONST.K8S_VERSION)
-        deploy = Deploy(DeployMethod.YAML, self.config.deploy.file, self.config.deploy.init).new()
+        deploy = Deploy(
+            DeployMethod.YAML, self.config.deploy.file, self.config.deploy.init
+        ).new()
 
-        runner = AdditionalRunner(context=self.context,
-                                  deploy=deploy,
-                                  workdir=additional_runner_dir,
-                                  cluster=cluster,
-                                  worker_id=worker_id,
-                                  acto_namespace=self.acto_namespace)
+        runner = AdditionalRunner(
+            context=self.context,
+            deploy=deploy,
+            
workdir=additional_runner_dir, + cluster=cluster, + worker_id=worker_id, + acto_namespace=self.acto_namespace, + images_archive=self.images_archive, + ) while True: try: @@ -493,40 +626,46 @@ def check_diff_test_result(self, workqueue: multiprocessing.Queue, workdir: str, except queue.Empty: break - with open(diff_test_result_path, 'r') as f: + with open(diff_test_result_path, "r") as f: diff_test_result = json.load(f) - originals = diff_test_result['originals'] + originals = diff_test_result["originals"] group_errs = [] for original in originals: - trial = original['trial'] - gen = original['gen'] + trial = original["trial"] + gen = original["gen"] if gen == 0: continue trial_basename = os.path.basename(trial) original_result = self.trial_to_steps[trial_basename][str(gen)] - step_result = PostDiffTest.check_diff_test_step(diff_test_result, original_result, - self.config, False, runner) + step_result = PostDiffTest.check_diff_test_step( + diff_test_result, original_result, self.config, False, runner + ) if step_result is None: continue else: group_errs.append(step_result) if len(group_errs) > 0: with open( - os.path.join(workdir, - f'compare-results-{diff_test_result["input_digest"]}.json'), - 'w') as result_f: + os.path.join( + workdir, + f'compare-results-{diff_test_result["input_digest"]}.json', + ), + "w", + ) as result_f: json.dump(group_errs, result_f, cls=ActoEncoder, indent=6) return None - def check_diff_test_step(diff_test_result: Dict, - original_result: Step, - config: OperatorConfig, - run_check_indeterministic: bool = False, - additional_runner: AdditionalRunner = None) -> RecoveryResult: + def check_diff_test_step( + diff_test_result: Dict, + original_result: Step, + config: OperatorConfig, + run_check_indeterministic: bool = False, + additional_runner: AdditionalRunner = None, + ) -> RecoveryResult: logger = get_thread_logger(with_prefix=True) trial_dir = original_result.trial_dir gen = original_result.gen @@ -539,51 +678,64 @@ def check_diff_test_step(diff_test_result: Dict, return original_system_state = original_result.system_state - result = compare_system_equality(diff_test_result['snapshot']['system_state'], - original_system_state, config.diff_ignore_fields) + result = compare_system_equality( + diff_test_result["snapshot"]["system_state"], + original_system_state, + config.diff_ignore_fields, + ) if isinstance(result, PassResult): - logger.info(f'Pass diff test for trial {trial_dir} gen {gen}') + logger.info(f"Pass diff test for trial {trial_dir} gen {gen}") return None elif run_check_indeterministic: - add_snapshot = additional_runner.run_cr(diff_test_result['snapshot']['input'], - trial_dir, gen) - indeterministic_fields = get_nondeterministic_fields(original_system_state, - add_snapshot.system_state, - config.diff_ignore_fields) + add_snapshot = additional_runner.run_cr( + diff_test_result["snapshot"]["input"], trial_dir, gen + ) + indeterministic_fields = get_nondeterministic_fields( + original_system_state, + add_snapshot.system_state, + config.diff_ignore_fields, + ) if len(indeterministic_fields) > 0: - logger.info(f'Got additional nondeterministic fields: {indeterministic_fields}') + logger.info( + f"Got additional nondeterministic fields: {indeterministic_fields}" + ) for delta_category in result.delta: for delta in result.delta[delta_category]: - if delta.path(output_format='list') not in indeterministic_fields: - logger.error(f'Fail diff test for trial {trial_dir} gen {gen}') + if ( + delta.path(output_format="list") + not in indeterministic_fields + ): + 
logger.error(
+                            f"Fail diff test for trial {trial_dir} gen {gen}"
+                        )
                         error_result = result.to_dict()
-                        error_result['trial'] = trial_dir
-                        error_result['gen'] = gen
+                        error_result["trial"] = trial_dir
+                        error_result["gen"] = gen
                         return error_result
             else:
-                logger.error(f'Fail diff test for trial {trial_dir} gen {gen}')
+                logger.error(f"Fail diff test for trial {trial_dir} gen {gen}")
                 error_result = result.to_dict()
-                error_result['trial'] = trial_dir
-                error_result['gen'] = gen
+                error_result["trial"] = trial_dir
+                error_result["gen"] = gen
                 return error_result
         else:
-            logger.error(f'Fail diff test for trial {trial_dir} gen {gen}')
+            logger.error(f"Fail diff test for trial {trial_dir} gen {gen}")
             error_result = result.to_dict()
-            error_result['trial'] = trial_dir
-            error_result['gen'] = gen
+            error_result["trial"] = trial_dir
+            error_result["gen"] = gen
             return error_result
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     import argparse
 
     parser = argparse.ArgumentParser()
-    parser.add_argument('--config', type=str, required=True)
-    parser.add_argument('--testrun-dir', type=str, required=True)
-    parser.add_argument('--workdir-path', type=str, required=True)
-    parser.add_argument('--num-workers', type=int, default=1)
-    parser.add_argument('--checkonly', action='store_true')
+    parser.add_argument("--config", type=str, required=True)
+    parser.add_argument("--testrun-dir", type=str, required=True)
+    parser.add_argument("--workdir-path", type=str, required=True)
+    parser.add_argument("--num-workers", type=int, default=1)
+    parser.add_argument("--checkonly", action="store_true")
     args = parser.parse_args()
 
     # Register custom exception hook
@@ -592,24 +744,25 @@ def check_diff_test_step(diff_test_result: Dict,
     global notify_crash_
     notify_crash_ = True
 
-    log_filename = 'check.log' if args.checkonly else 'test.log'
+    log_filename = "check.log" if args.checkonly else "test.log"
     os.makedirs(args.workdir_path, exist_ok=True)
     # Setting up log infra
     logging.basicConfig(
         filename=os.path.join(args.workdir_path, log_filename),
         level=logging.DEBUG,
-        filemode='w',
-        format='%(asctime)s %(levelname)-7s, %(name)s, %(filename)-9s:%(lineno)d, %(message)s')
+        filemode="w",
+        format="%(asctime)s %(levelname)-7s, %(name)s, %(filename)-9s:%(lineno)d, %(message)s",
+    )
     logging.getLogger("kubernetes").setLevel(logging.ERROR)
     logging.getLogger("sh").setLevel(logging.ERROR)
 
     start = time.time()
-    with open(args.config, 'r') as config_file:
+    with open(args.config, "r") as config_file:
         config = OperatorConfig(**json.load(config_file))
     p = PostDiffTest(testrun_dir=args.testrun_dir, config=config)
     if not args.checkonly:
         p.post_process(args.workdir_path, num_workers=args.num_workers)
     p.check(args.workdir_path, num_workers=args.num_workers)
 
-    logging.info(f'Total time: {time.time() - start} seconds')
\ No newline at end of file
+    logging.info(f"Total time: {time.time() - start} seconds")
diff --git a/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/acto/utils/image_helper.py b/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/acto/utils/image_helper.py
new file mode 100644
index 0000000..ef39490
--- /dev/null
+++ b/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/acto/utils/image_helper.py
@@ -0,0 +1,52 @@
+import hashlib
+import os
+import subprocess
+
+from filelock import FileLock
+
+
+class ImageHelper:
+    """Helper class for managing Docker images, including pulling and archiving them."""
+
+    image_archive_prefix = os.path.join(os.getcwd(), ".acto_images")
+    image_tool = os.getenv("IMAGE_TOOL", "docker")
+
+    
@staticmethod
+    def prepare_image_archive(images: list) -> str:
+        """
+        Prepare an archive of images for testing.
+
+        Args:
+            images (list[str]): Container image names/tags to pull and archive.
+
+        Returns:
+            str: Path to the created archive.
+        """
+        # Use a stable content digest for the archive name; the built-in
+        # hash() is salted per process and would defeat caching across runs.
+        digest = hashlib.sha256("".join(sorted(images)).encode()).hexdigest()[:16]
+
+        archive_name = f"{digest}.tar"
+        archive_path = os.path.join(ImageHelper.image_archive_prefix, archive_name)
+
+        # The lock file lives inside the archive directory, so create it first
+        os.makedirs(ImageHelper.image_archive_prefix, exist_ok=True)
+        lock = FileLock(f"{archive_path}.lock")
+        with lock:
+            if os.path.exists(archive_path):
+                return archive_path
+
+            for image in images:
+                subprocess.run(
+                    [ImageHelper.image_tool, "pull", image],
+                    stdout=subprocess.DEVNULL,
+                    check=True,
+                )
+            subprocess.run(
+                [ImageHelper.image_tool, "image", "save", "-o", archive_path]
+                + list(images),
+                stdout=subprocess.DEVNULL,
+                check=True,
+            )
+
+        return archive_path
diff --git a/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/requirements.txt b/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/requirements.txt
index 49577c7..5c6d593 100644
--- a/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/requirements.txt
+++ b/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/requirements.txt
@@ -9,4 +9,5 @@ requests~=2.31.0
 pytest~=7.4.0
 pydantic~=1.10.9
 pytest-cov~=4.1.0
-tabulate~=0.9.0
\ No newline at end of file
+tabulate~=0.9.0
+filelock
\ No newline at end of file
diff --git a/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/test/utils.py b/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/test/utils.py
index cc1cf65..57829be 100644
--- a/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/test/utils.py
+++ b/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/test/utils.py
@@ -100,37 +100,6 @@ class OperatorPrettyName(str, Enum):
 }
 
 all_bugs = {
-    'cass-operator': {
-        'cassop-315':
-            BugConfig(
-                category=BugCateogry.RECOVERY_FAILURE,
-                dir='test/cassop-315/inputs',
-                recovery=True,
-                consequences=[BugConsequence.OPERATION_OUTAGE]
-            ),
-        'cassop-330':
-            BugConfig(
-                category=BugCateogry.UNDESIRED_STATE,
-                dir='test/cassop-330/trial-demo',
-                declaration=True,
-                difftest=True,
-                consequences=[BugConsequence.MISCONFIGURATION]
-            ),
-        'cassop-334':
-            BugConfig(
-                category=BugCateogry.RECOVERY_FAILURE,
-                dir='test/cassop-334',
-                recovery=True,
-                consequences=[BugConsequence.RELIABILITY_ISSUE, BugConsequence.OPERATION_OUTAGE]
-            ),
-        'cassop-471':
-            BugConfig(
-                category=BugCateogry.UNDESIRED_STATE,
-                dir='test/cassop-471',
-                declaration=True,
-                consequences=[BugConsequence.RELIABILITY_ISSUE]
-            ),
-    },
     'cockroach-operator': {
         'crdbop-918':
             BugConfig(

From c02b3ed83cf74d32cffdcfe11ea5fee1c36d2455 Mon Sep 17 00:00:00 2001
From: Bogdan-Alexandru Stoica
Date: Tue, 23 Dec 2025 00:43:25 -0600
Subject: [PATCH 2/4] fix: relax version comparison in OracleEnvSetup to allow Python 3.8 or higher instead of exactly 3.8

---
 .../benchmark/sosp23_acto/_agent_eval/oracle_env_setup.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/benchmarks/arteval_bench/data/benchmark/sosp23_acto/_agent_eval/oracle_env_setup.py b/benchmarks/arteval_bench/data/benchmark/sosp23_acto/_agent_eval/oracle_env_setup.py
index adc2028..94b56b8 100644
--- a/benchmarks/arteval_bench/data/benchmark/sosp23_acto/_agent_eval/oracle_env_setup.py
+++ b/benchmarks/arteval_bench/data/benchmark/sosp23_acto/_agent_eval/oracle_env_setup.py
@@ -34,14 +34,14 @@ class Dependency:
         name="docker", binary="docker",
     ),
-    # Python v3.10+
+    # Python v3.8+
     Dependency(
         
name="python3", binary="python3", cmd=["python3", "--version"], parse_regex=r"Python\s+([0-9.]+)", - require=(3, 8), compare="eq", + require=(3, 8), compare="gte", ), - # pip3 for Python 3.10+ + # pip3 for Python 3.8+ Dependency( name="pip3", binary="pip3", ), From 0ce918c12f3b1e4c4895e049030afbf88ea3ec67 Mon Sep 17 00:00:00 2001 From: Bogdan-Alexandru Stoica Date: Tue, 23 Dec 2025 01:00:40 -0600 Subject: [PATCH 3/4] fix: add changelog notes to README; remove spurious/unused source files --- .../arteval_bench/data/benchmark/sosp23_acto/acto/README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/README.md b/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/README.md index 3f794a1..07cda0d 100644 --- a/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/README.md +++ b/benchmarks/arteval_bench/data/benchmark/sosp23_acto/acto/README.md @@ -8,6 +8,9 @@ The entire artifact process can take around 2 hours if run with a concurrency of If you have any questions, please contact us via email or HotCRP. +> [!NOTE] +> This artifact version has removed the Reddis and Knative operators since the originally faulty images are no longer available. The current version also applies patch [#8ecdcda](https://github.com/xlab-uiuc/acto/commit/8ecdcda5a51d7c5625e49802bbd1bc75c0cf07ef) that allows Acto to load operators from pre-packaged archives when the original images are no longer availble for download. + # 2. Prerequisites ## Setting up local environment From ebfefc1b29576c87e7e9f48e6f978f4279080cbf Mon Sep 17 00:00:00 2001 From: Bogdan-Alexandru Stoica Date: Sun, 28 Dec 2025 01:33:27 -0600 Subject: [PATCH 4/4] fix: update reference results for running Acto on a smaller subset of operators --- .../_agent_eval/refs/table5.ref.json | 42 +++---------------- .../_agent_eval/refs/table6.ref.json | 10 ++--- .../_agent_eval/refs/table7.ref.json | 16 +++---- 3 files changed, 18 insertions(+), 50 deletions(-) diff --git a/benchmarks/arteval_bench/data/benchmark/sosp23_acto/_agent_eval/refs/table5.ref.json b/benchmarks/arteval_bench/data/benchmark/sosp23_acto/_agent_eval/refs/table5.ref.json index 9d5d445..9ac0fb1 100644 --- a/benchmarks/arteval_bench/data/benchmark/sosp23_acto/_agent_eval/refs/table5.ref.json +++ b/benchmarks/arteval_bench/data/benchmark/sosp23_acto/_agent_eval/refs/table5.ref.json @@ -1,13 +1,5 @@ { "operators": [ - { - "operator": "CassOp", - "undesired_state": 2, - "system_error": 0, - "operator_error": 0, - "recovery_failure": 2, - "total": 4 - }, { "operator": "CockroachOp", "undesired_state": 3, @@ -16,22 +8,6 @@ "recovery_failure": 0, "total": 5 }, - { - "operator": "KnativeOp", - "undesired_state": 1, - "system_error": 0, - "operator_error": 2, - "recovery_failure": 0, - "total": 3 - }, - { - "operator": "OCK-RedisOp", - "undesired_state": 4, - "system_error": 1, - "operator_error": 3, - "recovery_failure": 1, - "total": 9 - }, { "operator": "OFC-MongoDBOp", "undesired_state": 3, @@ -56,14 +32,6 @@ "recovery_failure": 0, "total": 3 }, - { - "operator": "SAH-RedisOp", - "undesired_state": 2, - "system_error": 0, - "operator_error": 0, - "recovery_failure": 1, - "total": 3 - }, { "operator": "TiDBOp", "undesired_state": 2, @@ -90,10 +58,10 @@ } ], "totals": { - "undesired_state": 32, - "system_error": 4, - "operator_error": 10, - "recovery_failure": 10, - "total_all": 56 + "undesired_state": 23, + "system_error": 3, + "operator_error": 5, + "recovery_failure": 6, + "total_all": 37 } } \ No newline at end of file diff 
--git a/benchmarks/arteval_bench/data/benchmark/sosp23_acto/_agent_eval/refs/table6.ref.json b/benchmarks/arteval_bench/data/benchmark/sosp23_acto/_agent_eval/refs/table6.ref.json index 2eb20d9..29721ec 100644 --- a/benchmarks/arteval_bench/data/benchmark/sosp23_acto/_agent_eval/refs/table6.ref.json +++ b/benchmarks/arteval_bench/data/benchmark/sosp23_acto/_agent_eval/refs/table6.ref.json @@ -2,11 +2,11 @@ "symptoms": [ { "symptom": "System failure", - "bugs": 5 + "bugs": 2 }, { "symptom": "Reliability issue", - "bugs": 15 + "bugs": 6 }, { "symptom": "Security issue", @@ -14,15 +14,15 @@ }, { "symptom": "Resource issue", - "bugs": 9 + "bugs": 5 }, { "symptom": "Operation outage", - "bugs": 18 + "bugs": 10 }, { "symptom": "Misconfiguration", - "bugs": 15 + "bugs": 11 } ] } \ No newline at end of file diff --git a/benchmarks/arteval_bench/data/benchmark/sosp23_acto/_agent_eval/refs/table7.ref.json b/benchmarks/arteval_bench/data/benchmark/sosp23_acto/_agent_eval/refs/table7.ref.json index c03be58..a8491b6 100644 --- a/benchmarks/arteval_bench/data/benchmark/sosp23_acto/_agent_eval/refs/table7.ref.json +++ b/benchmarks/arteval_bench/data/benchmark/sosp23_acto/_agent_eval/refs/table7.ref.json @@ -2,23 +2,23 @@ "test_oracles": [ { "test_oracle": "Consistency oracle", - "bugs": 23, - "percentage": 41.07 + "bugs": 14, + "percentage": 42.42 }, { "test_oracle": "Differential oracle for normal state transition", - "bugs": 25, - "percentage": 44.64 + "bugs": 19, + "percentage": 57.58 }, { "test_oracle": "Differential oracle for rollback state transition", - "bugs": 10, - "percentage": 17.86 + "bugs": 5, + "percentage": 15.15 }, { "test_oracle": "Regular error check (e.g., exceptions, error codes)", - "bugs": 14, - "percentage": 25.00 + "bugs": 7, + "percentage": 21.21 } ] } \ No newline at end of file