Source code for varats.experiment.experiment_util

"""Utility module for BenchBuild experiments."""

import os
import random
import traceback
import typing as tp
from abc import abstractmethod
from pathlib import Path

import benchbuild.source as source
from benchbuild.experiment import Experiment
from benchbuild.project import Project
from benchbuild.utils.actions import Step
from benchbuild.utils.cmd import prlimit
from plumbum.commands import ProcessExecutionError

from varats.project.project_util import ProjectBinaryWrapper
from varats.report.report import BaseReport, FileStatusExtension
from varats.revision.revisions import get_tagged_revisions
from varats.utils.settings import vara_cfg, bb_cfg


[docs]class PEErrorHandler(): """Error handler for process execution errors.""" def __init__( self, result_folder: str, error_file_name: str, timeout_duration: tp.Optional[str] = None, delete_files: tp.Optional[tp.List[Path]] = None ): self.__result_folder = result_folder self.__error_file_name = error_file_name self.__timeout_duration = timeout_duration self.__delete_files = delete_files def __call__(self, ex: Exception, func: tp.Callable[..., tp.Any]) -> None: if self.__delete_files is not None: for delete_file in self.__delete_files: try: delete_file.unlink() except FileNotFoundError: pass error_file = Path( "{res_folder}/{res_file}".format( res_folder=self.__result_folder, res_file=self.__error_file_name ) ) if not os.path.exists(self.__result_folder): os.makedirs(self.__result_folder, exist_ok=True) with open(error_file, 'w') as outfile: if isinstance(ex, ProcessExecutionError): if ex.retcode == 124: timeout_duration = str(self.__timeout_duration) extra_error = f"""Command: {str(func)} Timeout after: {timeout_duration} """ outfile.write(extra_error) outfile.flush() outfile.write("-----\nTraceback:\n") traceback.print_exc(file=outfile) raise ex
[docs]class FunctionPEErrorWrapper(): """ Wrap a function call with an exception handler. Args: func: function to be executed handler: function to handle exception """ def __init__( self, func: tp.Callable[..., tp.Any], handler: PEErrorHandler ) -> None: self.__func = func self.__handler = handler def __call__(self, *args: tp.Any, **kwargs: tp.Any) -> tp.Any: try: return self.__func(*args, **kwargs) except Exception as ex: # pylint: disable=broad-except self.__handler(ex, self.__func)
[docs]def exec_func_with_pe_error_handler( func: tp.Callable[..., tp.Any], handler: PEErrorHandler ) -> None: """ Execute a function call with an exception handler. Args: func: function to be executed handler: function to handle exception """ FunctionPEErrorWrapper(func, handler)()
[docs]def get_default_compile_error_wrapped( project: Project, report_type: tp.Type[BaseReport], result_folder_template: str ) -> FunctionPEErrorWrapper: """ Setup the default project compile function with an error handler. Args: project: that will be compiled report_type: that should be generated result_folder_template: where the results will be placed Returns: project compilation function, wrapped with automatic error handling """ result_dir = str(bb_cfg()["varats"]["outfile"]) result_folder = Path( result_folder_template.format( result_dir=result_dir, project_dir=str(project.name) ) ) return FunctionPEErrorWrapper( project.compile, create_default_compiler_error_handler( project, report_type, result_folder ) )
[docs]def create_default_compiler_error_handler( project: Project, report_type: tp.Type[BaseReport], output_folder: tp.Optional[Path] = None, binary: tp.Optional[ProjectBinaryWrapper] = None ) -> PEErrorHandler: """ Create a default PEErrorHandler for compile errors, based on the `project`, `report_type`. Args: project: currently under analysis report_type: that should be generated output_folder: where the errors will be placed binary: if only a specific binary is handled Retruns: a initialized PEErrorHandler """ return create_default_error_handler( project, report_type, FileStatusExtension.CompileError, output_folder, binary )
[docs]def create_default_analysis_failure_handler( project: Project, report_type: tp.Type[BaseReport], output_folder: tp.Optional[Path] = None, binary: tp.Optional[ProjectBinaryWrapper] = None, timeout_duration: tp.Optional[str] = None, ) -> PEErrorHandler: """ Create a default PEErrorHandler for analysis failures, based on the `project`, `report_type`. Args: project: currently under analysis report_type: that should be generated output_folder: where the errors will be placed binary: if only a specific binary is handled timeout_duration: set timeout Retruns: a initialized PEErrorHandler """ return create_default_error_handler( project, report_type, FileStatusExtension.Failed, output_folder, binary, timeout_duration )
[docs]def create_default_error_handler( project: Project, report_type: tp.Type[BaseReport], error_type: FileStatusExtension, output_folder: tp.Optional[Path] = None, binary: tp.Optional[ProjectBinaryWrapper] = None, timeout_duration: tp.Optional[str] = None, ) -> PEErrorHandler: """ Create a default PEErrorHandler based on the `project`, `report_type`. Args: project: currently under analysis report_type: that should be generated error_type: a FSE describing the problem type output_folder: where the errors will be placed timeout_duration: set timeout binary: if only a specific binary is handled Retruns: a initialized PEErrorHandler """ error_output_folder = output_folder if output_folder else Path( f"{bb_cfg()['varats']['outfile']}/{project.name}" ) return PEErrorHandler( str(error_output_folder), report_type.get_file_name( project_name=str(project.name), binary_name=binary.name if binary else "all", project_version=project.version_of_primary, project_uuid=str(project.run_uuid), extension_type=error_type, file_ext=".txt" ), timeout_duration=timeout_duration )
[docs]def wrap_unlimit_stack_size(cmd: tp.Callable[..., tp.Any]) -> tp.Any: """ Wraps a command with prlimit to be executed with max stack size, i.e., setting the soft limit to the hard limit. Args: cmd: command that should be executed with max stack size Returns: wrapped command """ max_stacksize_16gb = 17179869184 return prlimit[f"--stack={max_stacksize_16gb}:", cmd]
VersionType = tp.TypeVar('VersionType')
[docs]class VersionExperiment(Experiment): # type: ignore """Base class for experiments that want to analyze different project revisions."""
[docs] @abstractmethod def actions_for_project(self, project: Project) -> tp.List[Step]: """Get the actions a project wants to run."""
@staticmethod def _sample_num_versions( versions: tp.List[VersionType] ) -> tp.List[VersionType]: if vara_cfg()["experiment"]["sample_limit"].value is None: return versions sample_size = int(vara_cfg()["experiment"]["sample_limit"]) versions = [ versions[i] for i in sorted( random. sample(range(len(versions)), min(sample_size, len(versions))) ) ] return versions
[docs] def sample(self, prj_cls: tp.Type[Project]) -> tp.List[source.VariantContext]: """ Adapt version sampling process if needed, otherwise fallback to default implementation. Args: prj_cls: project class Returns: list of sampled versions """ variants = list(source.product(*prj_cls.SOURCE)) if bool(vara_cfg()["experiment"]["random_order"]): random.shuffle(variants) fs_blacklist = vara_cfg()["experiment"]["file_status_blacklist"].value fs_whitelist = vara_cfg()["experiment"]["file_status_whitelist"].value if fs_blacklist or fs_whitelist: fs_good = set(FileStatusExtension) if not fs_whitelist else set() fs_good -= { FileStatusExtension.get_file_status_from_str(x) for x in fs_blacklist } fs_good |= { FileStatusExtension.get_file_status_from_str(x) for x in fs_whitelist } if not hasattr(self, 'REPORT_TYPE'): raise TypeError( "Experiment sub class does not implement REPORT_TYPE." ) bad_revisions = [ revision for revision, file_status in get_tagged_revisions(prj_cls, getattr(self, 'REPORT_TYPE')) if file_status not in fs_good ] variants = list( filter(lambda var: str(var[0]) not in bad_revisions, variants) ) if not variants: print("Could not find any unprocessed variants.") return [] variants = self._sample_num_versions(variants) if bool(bb_cfg()["versions"]["full"]): return [source.context(*var) for var in variants] return [source.context(*variants[0])]