Source code for varats.report.report

"""The Report module implements basic report functionalities and provides a
minimal interface ``BaseReport`` to implement own reports."""
import re
import shutil
import typing as tp
import weakref
from collections import defaultdict
from enum import Enum
from os import stat_result
from pathlib import Path
from tempfile import TemporaryDirectory

from plumbum import colors
from plumbum.colorlib.styles import Color

from varats.utils.git_util import ShortCommitHash


class FileStatusExtension(Enum):
    """
    Abstract status of a result file.

    Every member is a pair of the status text that is embedded into report
    file names and the color that is used to print the status. Specific
    report files can map these to their own specific representation.
    """
    value: tp.Tuple[str, Color]  # pylint: disable=invalid-name

    SUCCESS = ("success", colors.green)
    PARTIAL = ("partial", colors.darkturquoise)
    INCOMPLETE = ("incomplete", colors.orangered1)
    FAILED = ("failed", colors.lightred)
    COMPILE_ERROR = ("cerror", colors.red)
    MISSING = ("###", colors.yellow3a)
    BLOCKED = ("blocked", colors.blue)

    def get_status_extension(self) -> str:
        """Returns the status text that is used as file ending."""
        extension = self.value[0]
        return str(extension)

    def nice_name(self) -> str:
        """Returns the member name in a human-friendly spelling."""
        # The compile error status is the only one with a hand-picked name,
        # all others are derived from the member name.
        if self is FileStatusExtension.COMPILE_ERROR:
            return "CompileError"

        return self.name.capitalize()

    @property
    def status_color(self) -> Color:
        """Color that is associated with this status."""
        return self.value[1]

    def get_colored_status(self) -> str:
        """Returns the nice name of the status, wrapped in the status
        color."""
        colored_name = self.status_color[self.nice_name()]
        return tp.cast(str, colored_name)

    def num_color_characters(self) -> int:
        """Returns the number of non printable color characters."""
        # Coloring an empty string leaves only the color control characters.
        only_color_codes = self.status_color['']
        return len(only_color_codes)

    @staticmethod
    def get_physical_file_statuses() -> tp.Set['FileStatusExtension']:
        """Returns the set of file status extensions that are associated with
        real result files."""
        physical_statuses = {
            FileStatusExtension.SUCCESS,
            FileStatusExtension.FAILED,
            FileStatusExtension.COMPILE_ERROR,
        }
        return physical_statuses

    @staticmethod
    def get_virtual_file_statuses() -> tp.Set['FileStatusExtension']:
        """Returns the set of file status extensions that are not associated
        with real result files."""
        virtual_statuses = {
            FileStatusExtension.MISSING,
            FileStatusExtension.BLOCKED,
        }
        return virtual_statuses

    @staticmethod
    def get_regex_grp() -> str:
        """Returns a named regex group (``status_ext``) that matches the
        status text of every member."""
        alternatives = '|'.join(
            status.get_status_extension() for status in FileStatusExtension
        )
        return "(?P<status_ext>(" + alternatives + "))"

    @staticmethod
    def get_file_status_from_str(status_name: str) -> 'FileStatusExtension':
        """
        Converts the name of a status extensions to the specific enum value.

        A member is selected if ``status_name`` is its member name (compared
        case-insensitively), its status text, or its nice name.

        Args:
            status_name: name of the status extension

        Returns:
            FileStatusExtension enum with the specified name

        Raises:
            ValueError: if no member corresponds to ``status_name``

        Test:
        >>> FileStatusExtension.get_file_status_from_str('success')
        <FileStatusExtension.SUCCESS: ('success', <ANSIStyle: Green>)>

        >>> FileStatusExtension.get_file_status_from_str('SUCCESS')
        <FileStatusExtension.SUCCESS: ('success', <ANSIStyle: Green>)>

        >>> FileStatusExtension.get_file_status_from_str('###')
        <FileStatusExtension.MISSING: ('###', <ANSIStyle: Full: Yellow3A>)>

        >>> FileStatusExtension.get_file_status_from_str('CompileError')
        <FileStatusExtension.COMPILE_ERROR: ('cerror', <ANSIStyle: Red>)>
        """
        upper_name = status_name.upper()
        for candidate in FileStatusExtension:
            exact_spellings = (candidate.value[0], candidate.nice_name())
            if upper_name == candidate.name or status_name in exact_spellings:
                return candidate

        raise ValueError(f"Unknown file status extension name: {status_name}")

    @staticmethod
    def combine(
        lhs: 'FileStatusExtension', rhs: 'FileStatusExtension'
    ) -> 'FileStatusExtension':
        """
        Combines two FileStatusExtension into one.

        If exactly one of the two is ``SUCCESS``, the result is ``PARTIAL``
        when the other one is ``PARTIAL`` and ``INCOMPLETE`` otherwise.
        Should no specific combination rule apply, the lhs is used as a
        default.
        """
        success = FileStatusExtension.SUCCESS
        exactly_one_success = (lhs == success) != (rhs == success)
        if not exactly_one_success:
            return lhs

        if FileStatusExtension.PARTIAL in (lhs, rhs):
            return FileStatusExtension.PARTIAL

        return FileStatusExtension.INCOMPLETE
class ReportFilename():
    """
    ReportFilename wraps special semantics about our report filenames around
    strings and paths.

    The individual parts of a report file name (experiment shorthand, report
    shorthand, project name, binary name, commit hash, UUID, optional
    configuration ID, status and file extension) are extracted on demand with
    a regular expression.
    """

    __RESULT_FILE_REGEX = re.compile(
        r"(?P<experiment_shorthand>.*)-" + r"(?P<report_shorthand>.*)-" +
        r"(?P<project_name>.*)-(?P<binary_name>.*)-" +
        r"(?P<file_commit_hash>.*)[_\/](?P<UUID>[0-9a-fA-F\-]*)"
        r"(_config-(?P<config_id>\d+))?" + "_" +
        FileStatusExtension.get_regex_grp() + r"?" + r"(?P<file_ext>\..*)?" +
        "$"
    )

    __RESULT_FILE_TEMPLATE = (
        "{experiment_shorthand}-" + "{report_shorthand}-" + "{project_name}-" +
        "{binary_name}-" + "{project_revision}_" + "{project_uuid}_" +
        "{status_ext}" + "{file_ext}"
    )

    __CONFIG_SPECIFIC_RESULT_FILE_TEMPLATE = (
        "{experiment_shorthand}-" + "{report_shorthand}-" + "{project_name}-" +
        "{binary_name}-" + "{project_revision}/" + "{project_uuid}" +
        "_config-{config_id}_" + "{status_ext}" + "{file_ext}"
    )

    def __init__(self, file_name: tp.Union[str, Path]) -> None:
        self.__filename = str(file_name)

    @staticmethod
    def construct(
        filepath: Path, base_folder: tp.Optional[Path]
    ) -> 'ReportFilename':
        """
        Constructs a `ReportFilename` from a given path and a base folder.

        The base folder can be omitted should the filepath only contain the
        file.

        Args:
            filepath: path to the report file
            base_folder: folder that ``filepath`` is made relative to

        Returns:
            the report filename, relative to ``base_folder`` if one was given
        """
        if base_folder:
            return ReportFilename(filepath.relative_to(base_folder))

        return ReportFilename(filepath)

    def __match_or_raise(self) -> 're.Match[str]':
        """
        Matches the file name against the result file pattern.

        Returns:
            the regex match for this file name

        Raises:
            ValueError: if the file name is not formatted like a result file
        """
        match = ReportFilename.__RESULT_FILE_REGEX.search(self.filename)
        if match is None:
            raise ValueError(
                f'File {self.filename} name was wrongly formatted.'
            )
        return match

    @property
    def filename(self) -> str:
        """Literal file name."""
        return self.__filename

    @property
    def project_name(self) -> str:
        """
        Name of the analyzed project.

        Raises:
            ValueError: if the file name is not formatted like a result file
        """
        return str(self.__match_or_raise().group("project_name"))

    @property
    def binary_name(self) -> str:
        """
        Name of the analyzed binary.

        Raises:
            ValueError: if the file name is not formatted like a result file
        """
        return str(self.__match_or_raise().group("binary_name"))

    def has_status_success(self) -> bool:
        """
        Checks if the file name is a (Success) result file.

        Returns:
            True, if the file name is for a success file
        """
        return ReportFilename.result_file_has_status(
            self.filename, FileStatusExtension.SUCCESS
        )

    def has_status_failed(self) -> bool:
        """
        Check if the file name is a (Failed) result file.

        Returns:
            True, if the file name is for a failed file
        """
        return ReportFilename.result_file_has_status(
            self.filename, FileStatusExtension.FAILED
        )

    def has_status_compileerror(self) -> bool:
        """
        Check if the filename is a (CompileError) result file.

        Returns:
            True, if the file name is for a compile error file
        """
        return ReportFilename.result_file_has_status(
            self.filename, FileStatusExtension.COMPILE_ERROR
        )

    def has_status_missing(self) -> bool:
        """
        Check if the filename is a (Missing) result file.

        Returns:
            True, if the file name is for a missing file
        """
        return ReportFilename.result_file_has_status(
            self.filename, FileStatusExtension.MISSING
        )

    def has_status_blocked(self) -> bool:
        """
        Check if the filename is a (Blocked) result file.

        Returns:
            True, if the file name is for a blocked file
        """
        return ReportFilename.result_file_has_status(
            self.filename, FileStatusExtension.BLOCKED
        )

    @staticmethod
    def result_file_has_status(
        file_name: str, extension_type: FileStatusExtension
    ) -> bool:
        """
        Check if the passed file name is of the expected file status.

        Args:
            file_name: name of the file to check
            extension_type: expected file status extension

        Returns:
            True, if the file name is for a file with the the specified
            ``extension_type``; False if the status differs or the name is
            not formatted like a result file
        """
        match = ReportFilename.__RESULT_FILE_REGEX.search(file_name)
        if match is None:
            return False

        expected_status = extension_type.get_status_extension()
        return match.group("status_ext") == expected_status

    def is_result_file(self) -> bool:
        """
        Check if the file name is formatted like a result file.

        Returns:
            True, if the file name is correctly formatted
        """
        match = ReportFilename.__RESULT_FILE_REGEX.search(self.filename)
        return match is not None

    @property
    def commit_hash(self) -> ShortCommitHash:
        """
        Commit hash of the result file.

        Returns:
            the commit hash from a result file name

        Raises:
            ValueError: if the file name is not formatted like a result file
        """
        return ShortCommitHash(
            self.__match_or_raise().group("file_commit_hash")
        )

    @property
    def experiment_shorthand(self) -> str:
        """
        Experiment shorthand of the result file.

        Returns:
            the experiment shorthand from a result file

        Raises:
            ValueError: if the file name is not formatted like a result file
        """
        matched_shorthand = self.__match_or_raise().group(
            "experiment_shorthand"
        )
        # The file name may carry leading folders; only the part after the
        # last '/' is the shorthand.
        return matched_shorthand.split('/')[-1]

    @property
    def report_shorthand(self) -> str:
        """
        Report shorthand of the result file.

        Returns:
            the report shorthand from a result file

        Raises:
            ValueError: if the file name is not formatted like a result file
        """
        return self.__match_or_raise().group("report_shorthand")

    @property
    def file_status(self) -> FileStatusExtension:
        """
        Get the FileStatusExtension from a result file.

        Returns:
            the FileStatusExtension of the result file

        Raises:
            ValueError: if the file name is not formatted like a result file
        """
        return FileStatusExtension.get_file_status_from_str(
            self.__match_or_raise().group("status_ext")
        )

    @property
    def config_id(self) -> tp.Optional[int]:
        """
        Configuration ID of the result file. A configuration ID is only
        present in configuration specific reports, for others, no ID exists.

        In contrast to the other properties, a wrongly formatted file name
        does not raise but also yields ``None``.

        Returns:
            the configuration ID from a result file
        """
        match = ReportFilename.__RESULT_FILE_REGEX.search(self.filename)
        if match is None:
            return None

        config_id_group = match.group("config_id")
        if config_id_group:
            return int(config_id_group)

        return None

    def is_configuration_specific_file(self) -> bool:
        """
        Check if the file name contains configuration specific information.

        Returns:
            True, if the file name is configuration specific
        """
        return self.config_id is not None

    @property
    def uuid(self) -> str:
        """
        Report UUID of the result file, generated by BenchBuild during the
        experiment.

        Raises:
            ValueError: if the file name is not formatted like a result file
        """
        return self.__match_or_raise().group("UUID")

    @property
    def file_suffix(self) -> str:
        """
        File suffix, commonly known as file ending/type (in the codebase
        referred to as file_ext).

        Returns:
            the suffix including the leading '.', or an empty string if the
            file name has no suffix

        Raises:
            ValueError: if the file name is not formatted like a result file
        """
        # The suffix group is optional in the pattern; map a missing suffix
        # to "" so that callers always get a string (e.g., ``with_status``
        # would otherwise render the text "None" into the new file name).
        return self.__match_or_raise().group("file_ext") or ""

    @staticmethod
    def get_file_name(
        experiment_shorthand: str,
        report_shorthand: str,
        project_name: str,
        binary_name: str,
        project_revision: ShortCommitHash,
        project_uuid: str,
        extension_type: FileStatusExtension,
        file_ext: str = ".txt",
        config_id: tp.Optional[int] = None
    ) -> 'ReportFilename':
        """
        Generates a filename for a report file out the different parts.

        Args:
            experiment_shorthand: unique shorthand of the experiment
            report_shorthand: unique shorthand of the report
            project_name: name of the project for which the
                          report was generated
            binary_name: name of the binary for which the report was generated
            project_revision: revision of the project, i.e., commit hash
            project_uuid: benchbuild uuid for the experiment run
            extension_type: to specify the status of the generated report
            file_ext: file extension of the report file
            config_id: optional configuration ID; if given, the configuration
                       specific file name layout is used, which separates
                       revision and uuid with '/' and adds a
                       ``_config-<id>`` part

        Returns:
            name for the report file that can later be uniquely identified
        """
        status_ext = extension_type.get_status_extension()

        # Add the missing '.' if none was given by the report
        if file_ext and not file_ext.startswith("."):
            file_ext = "." + file_ext

        if config_id is None:
            template = ReportFilename.__RESULT_FILE_TEMPLATE
        else:
            template = ReportFilename.__CONFIG_SPECIFIC_RESULT_FILE_TEMPLATE

        # ``str.format`` ignores keyword arguments that the template does not
        # reference, so ``config_id`` can be passed for both layouts.
        return ReportFilename(
            template.format(
                experiment_shorthand=experiment_shorthand,
                report_shorthand=report_shorthand,
                project_name=project_name,
                binary_name=binary_name,
                project_revision=project_revision,
                project_uuid=project_uuid,
                status_ext=status_ext,
                config_id=config_id,
                file_ext=file_ext
            )
        )

    def with_status(self, new_status: FileStatusExtension) -> 'ReportFilename':
        """
        Returns a new report filename, adapted with the new file extension
        `new_status`.

        Raises:
            ValueError: if the file name is not formatted like a result file
        """
        return self.get_file_name(
            self.experiment_shorthand, self.report_shorthand,
            self.project_name, self.binary_name, self.commit_hash, self.uuid,
            new_status, self.file_suffix, self.config_id
        )

    def __eq__(self, other: object) -> bool:
        if isinstance(other, ReportFilename):
            return self.filename == other.filename
        return NotImplemented

    def __hash__(self) -> int:
        # Defining ``__eq__`` alone would make instances unhashable; hash the
        # same value that equality compares.
        return hash(self.filename)

    def __str__(self) -> str:
        return self.filename

    def __repr__(self) -> str:
        return self.filename
class ReportFilepath():
    """ReportFilepath combines report filenames with path semantics and
    presents the file as a full path."""

    def __init__(
        self, base_path: Path, report_filename: ReportFilename
    ) -> None:
        self.__base_path = base_path
        self.__report_filename = report_filename

    @staticmethod
    def construct(full_filepath: Path, base_folder: Path) -> 'ReportFilepath':
        """
        Constructs a `ReportFilepath` from a given full path, ideally a fully
        qualified path but this is not strictly required, and a base folder.

        Args:
            full_filepath: path to the report file
            base_folder: folder that the report filename is made relative to

        Returns:
            the report filepath, split into base folder and report filename
        """
        return ReportFilepath(
            base_folder, ReportFilename.construct(full_filepath, base_folder)
        )

    @property
    def base_path(self) -> Path:
        """Base folder in front of the report filename."""
        return self.__base_path

    @property
    def report_filename(self) -> ReportFilename:
        """Report filename behind the base folder."""
        return self.__report_filename

    def full_path(self) -> Path:
        """Returns the base path joined with the report filename."""
        return self.base_path / str(self.report_filename)

    def with_status(self, new_status: FileStatusExtension) -> 'ReportFilepath':
        """Returns a new report filepath with the same base path, whose
        report filename is adapted to `new_status`."""
        return ReportFilepath(
            self.base_path, self.report_filename.with_status(new_status)
        )

    def stat(self) -> stat_result:
        """Returns the result of calling ``stat`` on the full path."""
        return self.full_path().stat()

    def __eq__(self, other: object) -> bool:
        if isinstance(other, ReportFilepath):
            return (
                self.base_path == other.base_path and
                self.report_filename == other.report_filename
            )
        return NotImplemented

    def __hash__(self) -> int:
        # Defining ``__eq__`` alone would make instances unhashable. The
        # report filename is hashed through its string form, which is the
        # value that ``ReportFilename`` compares for equality.
        return hash((self.base_path, str(self.report_filename)))

    def __str__(self) -> str:
        return str(self.full_path())

    def __repr__(self) -> str:
        return str(self)
class BaseReport():
    """
    Report base class to add general report properties and helper functions.

    Every subclass has to pass a ``shorthand`` and a ``file_type`` as class
    keyword arguments; both are stored on the subclass and the subclass is
    registered in ``REPORT_TYPES`` under its class name.
    """

    # Registry of all report subclasses, keyed by class name.
    REPORT_TYPES: tp.Dict[str, tp.Type['BaseReport']] = {}

    SHORTHAND: str
    FILE_TYPE: str

    def __init__(self, path: Path) -> None:
        self.__path = path
        self.__filename = ReportFilename(path)

    @classmethod
    def __init_subclass__(
        cls, shorthand: str, file_type: str, *args: tp.Any, **kwargs: tp.Any
    ) -> None:
        super().__init_subclass__(*args, **kwargs)

        cls.SHORTHAND = shorthand
        cls.FILE_TYPE = file_type

        # Only the first class with a given name is registered; later
        # classes with the same name leave the registry untouched.
        name = cls.__name__
        if name not in cls.REPORT_TYPES:
            cls.REPORT_TYPES[name] = cls

    @staticmethod
    def lookup_report_type_from_file_name(
        file_name: str
    ) -> tp.Optional[tp.Type['BaseReport']]:
        """
        Looks-up the correct report class from a given `file_name`.

        Args:
            file_name: of the report file

        Returns:
            corresponding report class, or ``None`` if the file name is not
            formatted like a report file or no report type matches
        """
        try:
            shorthand = ReportFilename(file_name).report_shorthand
        except ValueError:
            # Return nothing if we cannot correctly identify a shorthand for
            # the specified file name
            return None

        return BaseReport.lookup_report_type_by_shorthand(shorthand)

    @staticmethod
    def lookup_report_type_by_shorthand(
        shorthand: str
    ) -> tp.Optional[tp.Type['BaseReport']]:
        """
        Looks-up the correct report class from a given report `shorthand`.

        If several registered report types share the shorthand, the one that
        was registered first is returned.

        Args:
            shorthand: of the report file

        Returns:
            corresponding report class, or ``None`` if no registered report
            type uses the shorthand
        """
        for report_type in BaseReport.REPORT_TYPES.values():
            if report_type.SHORTHAND == shorthand:
                return report_type

        return None

    @classmethod
    def get_file_name(
        cls,
        experiment_shorthand: str,
        project_name: str,
        binary_name: str,
        project_revision: ShortCommitHash,
        project_uuid: str,
        extension_type: FileStatusExtension,
        config_id: tp.Optional[int] = None
    ) -> ReportFilename:
        """
        Generates a filename for a report file.

        The report shorthand and the file type are taken from the report
        class this method is called on.

        Args:
            experiment_shorthand: unique shorthand of the experiment
            project_name: name of the project for which the
                          report was generated
            binary_name: name of the binary for which the report was generated
            project_revision: version of the analyzed project, i.e., commit
                              hash
            project_uuid: benchbuild uuid for the experiment run
            extension_type: to specify the status of the generated report
            config_id: optional configuration ID that is forwarded to
                       ``ReportFilename.get_file_name``

        Returns:
            name for the report file that can later be uniquely identified
        """
        return ReportFilename.get_file_name(
            experiment_shorthand, cls.SHORTHAND, project_name, binary_name,
            project_revision, project_uuid, extension_type, cls.FILE_TYPE,
            config_id
        )

    @property
    def path(self) -> Path:
        """Path to the report file."""
        return self.__path

    @property
    def filename(self) -> ReportFilename:
        """Filename of the report."""
        return self.__filename

    @classmethod
    def shorthand(cls) -> str:
        """Shorthand for this report."""
        return cls.SHORTHAND

    @classmethod
    def file_type(cls) -> str:
        """File type of this report."""
        return cls.FILE_TYPE

    @classmethod
    def is_correct_report_type(cls, file_name: str) -> bool:
        """
        Check if the passed file belongs to this report type.

        Args:
            file_name: name of the file to check

        Returns:
            True, if the file belongs to this report type; False if the
            shorthand differs or the file name is not formatted like a
            report file
        """
        try:
            short_hand = ReportFilename(file_name).report_shorthand
        except ValueError:
            return False

        return short_hand == cls.shorthand()
class ReportSpecification():
    """
    Bundle of report types that belong together.

    A specification can be used, e.g., by experiments, to request multiple
    reports at once. The report type that was passed first acts as the main
    report.
    """

    def __init__(self, *report_types: tp.Type[BaseReport]) -> None:
        self.__reports_types = [*report_types]

    @property
    def report_types(self) -> tp.List[tp.Type[BaseReport]]:
        """Report types in this report specification, as a fresh list."""
        return self.__reports_types.copy()

    @property
    def main_report(self) -> tp.Type[BaseReport]:
        """Main report of this specification, i.e., the first report type."""
        first_report, *_ = self.__reports_types[:1] or [
            self.__reports_types[0]
        ]
        return first_report

    def in_spec(self, report_type: tp.Type[BaseReport]) -> bool:
        """Checks if a report type is specified in this spec."""
        specified_types = self.report_types
        return report_type in specified_types

    def get_report_type(self, shorthand: str) -> tp.Type[BaseReport]:
        """
        Look up a report type by its shorthand.

        Args:
            shorthand: notation for the report

        Returns:
            the report if it is part of this spec

        Raises:
            LookupError: if no report type with this shorthand is known or
                         the report type is not part of this spec
        """
        candidate = BaseReport.lookup_report_type_by_shorthand(shorthand)
        if not (candidate and self.in_spec(candidate)):
            raise LookupError(
                f"Report corresponding to {shorthand} was not specified."
            )

        return candidate

    def __contains__(self, report_type: tp.Type[BaseReport]) -> bool:
        return self.in_spec(report_type)

    def __iter__(self) -> tp.Iterator[tp.Type[BaseReport]]:
        specified_types = self.report_types
        return iter(specified_types)
# Type variable for the report type stored in a report aggregate; bound to
# ``BaseReport``.
ReportTy = tp.TypeVar('ReportTy', bound=BaseReport)
# Type variable for the keys that a report aggregate uses to group reports.
KeyTy = tp.TypeVar('KeyTy')
class KeyedReportAggregate(
    BaseReport,
    tp.Generic[KeyTy, ReportTy],
    shorthand="Agg",
    file_type="zip"
):
    """
    Parses and categories multiple reports of the same type stored inside a zip
    file.

    The `key_func` is used to divide the parsed reports into different
    categories/buckets.
    """

    def __init__(
        self,
        path: Path,
        report_type: tp.Type[ReportTy],
        key_func: tp.Callable[[Path], KeyTy],
        default_key: tp.Optional[KeyTy] = None
    ) -> None:
        """
        Unpacks the archive and parses every contained file as a report.

        Args:
            path: path to the archive; if it does not exist, the aggregate
                  contains no reports
            report_type: report class that is instantiated for every
                         extracted file
            key_func: maps the path of an extracted file to the key of the
                      bucket the parsed report is stored in
            default_key: key that ``reports`` uses when it is called without
                         a key
        """
        super().__init__(path)

        # Create a temporary directory for extraction and register finalizer,
        # which will clean it up.
        self.__tmpdir = TemporaryDirectory()  # pylint: disable=R1732
        self.__finalizer = weakref.finalize(self, self.__tmpdir.cleanup)

        # Extract archive and parse reports.
        if self.path.exists():
            shutil.unpack_archive(self.path, self.__tmpdir.name)

        self.__default_key = default_key
        self.__reports: tp.Dict[KeyTy, tp.List[ReportTy]] = defaultdict(list)
        for file in Path(self.__tmpdir.name).iterdir():
            self.__reports[key_func(file)].append(report_type(file))

    def remove(self) -> None:
        """Runs the finalizer, which cleans up the temporary directory the
        archive was extracted to."""
        self.__finalizer()

    @property
    def removed(self) -> bool:
        """True, if the finalizer has already been run."""
        return not self.__finalizer.alive

    def keys(self) -> tp.Collection[KeyTy]:
        """Returns the keys of the report buckets."""
        return self.__reports.keys()

    def reports(self, key: tp.Optional[KeyTy] = None) -> tp.List[ReportTy]:
        """
        Returns the list of parsed reports.

        Args:
            key: bucket to return; if omitted, the default key of this
                 aggregate is used

        Returns:
            the reports stored under ``key`` or the default key

        Raises:
            AssertionError: if neither a key nor a default key was provided
        """
        # Compare against None instead of testing truthiness, otherwise falsy
        # but valid keys (e.g., 0 or "") would be treated as "no key given".
        if key is not None:
            return self.__reports[key]

        if self.__default_key is None:
            raise AssertionError("No key or default key was provided.")

        return self.__reports[self.__default_key]
def _key_id(_: Path) -> int:
    """Key function that ignores the given path and maps every file to the
    same key ``0``."""
    constant_key = 0
    return constant_key
class ReportAggregate(
    KeyedReportAggregate[int, ReportTy],
    tp.Generic[ReportTy],
    shorthand="Agg",
    file_type="zip"
):
    """
    Report aggregate without categories.

    All parsed reports are stored under the single key ``0``, which is also
    the default key, so ``reports()`` can be called without a key.
    """

    def __init__(self, path: Path, report_type: tp.Type[ReportTy]) -> None:
        super().__init__(
            path, report_type, key_func=_key_id, default_key=0
        )