Source code for varats.paper_mgmt.paper_config_manager

"""Module for interacting with and managing paper configs and case studies,
e.g., this module provides functionality to visualize the status of case
studies or to package a whole paper config into a zip archive."""

import re
import typing as tp
from collections import defaultdict
from pathlib import Path
from zipfile import ZIP_DEFLATED, ZipFile

from plumbum import colors

import varats.paper_mgmt.paper_config as PC
from varats.mapping.commit_map import create_lazy_commit_map_loader
from varats.paper.case_study import CaseStudy
from varats.paper_mgmt.case_study import (
    get_revisions_status_for_case_study,
    get_newest_result_files_for_case_study,
)
from varats.report.report import FileStatusExtension, MetaReport
from varats.revision.revisions import get_all_revisions_files
from varats.utils.settings import vara_cfg


def show_status_of_case_studies(
    report_name: str, filter_regex: str, short_status: bool, sort: bool,
    print_rev_list: bool, sep_stages: bool, print_legend: bool
) -> None:
    """
    Prints the status of all matching case studies to the console.

    Case studies of the current paper config are visited ordered by
    ``(project_name, version)``; only those whose ``name_version`` string
    matches ``filter_regex`` (via ``re.match``) are printed. A total over all
    printed case studies is always emitted last.

    Args:
        report_name: name of the report whose files will be considered
        filter_regex: applied to a ``name_version`` string for filtering the
                      amount of case studies to be shown
        short_status: print only a short version of the status information
        sort: sort the revisions of each case study (full status only)
        print_rev_list: print a list of revisions for every case study;
                        takes precedence over ``short_status``
        sep_stages: print each stage separated
        print_legend: print a legend for the different types
    """
    current_config = PC.get_paper_config()

    ordered_case_studies = sorted(
        current_config.get_all_case_studies(),
        key=lambda cs: (cs.project_name, cs.version)
    )
    output_case_studies = [
        cs for cs in ordered_case_studies
        if re.match(filter_regex, f"{cs.project_name}_{cs.version}"
                   ) is not None
    ]

    # Width of the longest "name" + "version" pair; used to align the
    # status columns of all printed case studies.
    longest_cs_name = max((
        len(cs.project_name) + len(str(cs.version))
        for cs in output_case_studies
    ),
                          default=0)

    if print_legend:
        print(get_legend(True))

    # Looked up only after the legend was printed, so an unknown report name
    # still raises ``KeyError`` at the same point as before.
    report_type = MetaReport.REPORT_TYPES[report_name]

    total_status_occurrences: tp.DefaultDict[FileStatusExtension,
                                             tp.Set[str]] = defaultdict(set)

    for case_study in output_case_studies:
        if print_rev_list:
            text = get_revision_list(case_study)
        elif short_status:
            text = get_short_status(
                case_study, report_type, longest_cs_name, True,
                total_status_occurrences
            )
        else:
            text = get_status(
                case_study, report_type, longest_cs_name, sep_stages, sort,
                True, total_status_occurrences
            )
        print(text)

    print(get_total_status(total_status_occurrences, longest_cs_name, True))
def get_revision_list(case_study: CaseStudy) -> str:
    """
    Returns a string listing the revisions of the case study, grouped by
    case-study stage.

    Args:
        case_study: to print revisions for

    Returns:
        formatted string that lists all revisions
    """
    lines = [f"CS: {case_study.project_name}_{case_study.version}:\n"]

    for stage_idx, stage in enumerate(case_study.stages):
        # One header line per stage, followed by one line per revision.
        lines.append(f" Stage {stage_idx}\n")
        lines.extend(f" {rev}\n" for rev in stage.revisions)

    return "".join(lines)
def get_result_files(
    result_file_type: MetaReport, project_name: str, commit_hash: str,
    only_newest: bool
) -> tp.List[Path]:
    """
    Returns a list of result files that (partially) match the given commit
    hash.

    Args:
        result_file_type: the type of the result file
        project_name: target project
        commit_hash: the commit hash (or a prefix of it) to search result
                     files for
        only_newest: whether to include all result files, or only the newest;
                     if ``False``, result files for the same revision are
                     sorted descending by the file's mtime

    Returns:
        a list of matching result file paths; result files for the same
        revision are sorted descending by the file's mtime
    """

    def belongs_to_other_revision(file_name: str) -> bool:
        # NOTE(review): presumably ``get_all_revisions_files`` drops every
        # file for which this predicate is ``True`` -- confirm against its
        # documentation.
        file_commit_hash = MetaReport.get_commit_hash_from_result_file(
            file_name
        )
        is_requested_revision = file_commit_hash.startswith(commit_hash)
        return not is_requested_revision

    return get_all_revisions_files(
        project_name, result_file_type, belongs_to_other_revision, only_newest
    )
def get_occurrences(
    status_occurrences: tp.DefaultDict[FileStatusExtension, tp.Set[str]],
    use_color: bool = False
) -> str:
    """
    Returns a string with all status occurrences of a case study.

    The result has the form ``(<success>/<total>) processed [a/b/...]`` with
    one count per ``FileStatusExtension`` member, in enum iteration order.

    Args:
        status_occurrences: mapping from all occurred status to a set of
                            revisions
        use_color: add color escape sequences for highlighting

    Returns:
        a string with all status occurrences of a case study
    """
    num_succ_rev = len(status_occurrences[FileStatusExtension.Success])
    num_rev = sum(map(len, status_occurrences.values()))

    ratio = "{processed:3}/{total}".format(
        processed=num_succ_rev, total=num_rev
    )
    if use_color:
        # green: everything succeeded, red: nothing did, orange: in between
        if num_succ_rev == num_rev:
            ratio_color = colors.green
        elif num_succ_rev == 0:
            ratio_color = colors.red
        else:
            ratio_color = colors.orange3
        ratio = ratio_color[ratio]

    counts = []
    for file_status in FileStatusExtension:
        count = str(len(status_occurrences[file_status]))
        counts.append(file_status.status_color[count] if use_color else count)

    return "(" + ratio + ") processed " + "[" + "/".join(counts) + "]"
def get_total_status(
    total_status_occurrences: tp.DefaultDict[FileStatusExtension, tp.Set[str]],
    longest_cs_name: int,
    use_color: bool = False
) -> str:
    """
    Returns a status string showing the total amount of occurrences.

    The string consists of an 80 character wide separator line followed by a
    ``Total:`` line with the accumulated occurrences.

    Args:
        total_status_occurrences: mapping from all occurred status to a set
                                  of all revisions (total amount of revisions)
        longest_cs_name: width to which the ``Total:`` label is left-justified
        use_color: add color escape sequences for highlighting

    Returns:
        a string with all status occurrences of all case studies
    """
    separator = "-" * 80
    label = "Total: ".ljust(longest_cs_name, ' ')
    occurrences = get_occurrences(total_status_occurrences, use_color)
    return separator + "\n" + label + occurrences
def get_short_status(
    case_study: CaseStudy,
    result_file_type: MetaReport,
    longest_cs_name: int,
    use_color: bool = False,
    total_status_occurrences: tp.Optional[tp.DefaultDict[FileStatusExtension,
                                                         tp.Set[str]]] = None
) -> str:
    """
    Return a short string representation that describes the current status of
    the case study.

    Args:
        case_study: to print
        result_file_type: report type to print
        longest_cs_name: amount of chars that should be considered for
                         offsetting to allow case study name alignment
        use_color: add color escape sequences for highlighting
        total_status_occurrences: mapping from all occurred status to a set
                                  of all revisions (total amount of
                                  revisions); if given, it is updated in
                                  place with the revisions of this case study

    Returns:
        a short string representation of a case study
    """
    name_width = len(case_study.project_name) + len(str(case_study.version))
    # Pad shorter names so the occurrence columns of all case studies align;
    # a non-positive difference yields no padding.
    padding = " " * (longest_cs_name - name_width)
    status = f"CS: {case_study.project_name}_{case_study.version}: " + padding

    status_occurrences: tp.DefaultDict[FileStatusExtension,
                                       tp.Set[str]] = defaultdict(set)
    for rev, file_status in get_revisions_status_for_case_study(
        case_study, result_file_type
    ):
        status_occurrences[file_status].add(rev)

    if total_status_occurrences is not None:
        for file_status, rev_set in status_occurrences.items():
            total_status_occurrences[file_status].update(rev_set)

    return status + get_occurrences(status_occurrences, use_color)
def _format_tagged_revisions(
    tagged_revs: tp.Iterable[tp.Tuple[str, FileStatusExtension]],
    use_color: bool
) -> str:
    """Renders one ``<rev> [<status>]`` line per tagged revision."""
    lines = []
    for rev, file_status in tagged_revs:
        # Only emit color escape sequences when requested; otherwise fall
        # back to the plain enum member name.
        shown_status = (
            file_status.get_colored_status() if use_color else file_status.name
        )
        lines.append(" {rev} [{status}]\n".format(rev=rev, status=shown_status))
    return "".join(lines)


def get_status(
    case_study: CaseStudy,
    result_file_type: MetaReport,
    longest_cs_name: int,
    sep_stages: bool,
    sort: bool,
    use_color: bool = False,
    total_status_occurrences: tp.Optional[tp.DefaultDict[FileStatusExtension,
                                                         tp.Set[str]]] = None
) -> str:
    """
    Return a string representation that describes the current status of the
    case study.

    The first line is the short status of the case study; it is followed by
    one line per revision, optionally grouped by stage.

    Args:
        case_study: to print the status for
        result_file_type: report type to print
        longest_cs_name: amount of chars that should be considered for
                         offsetting to allow case study name alignment
        sep_stages: print each stage separated
        sort: sort the revisions by their commit-map time id, newest first
        use_color: add color escape sequences for highlighting; this applies
                   to the short status line as well as to the per-revision
                   status labels
        total_status_occurrences: mapping from all occurred status to a set
                                  of all revisions (total amount of revisions)

    Returns:
        a full string representation of the case study
    """
    status = get_short_status(
        case_study, result_file_type, longest_cs_name, use_color,
        total_status_occurrences
    ) + "\n"

    if sort:
        # The commit map is only loaded when sorting is actually requested.
        cmap = create_lazy_commit_map_loader(case_study.project_name)()

        def rev_time(rev: tp.Tuple[str, FileStatusExtension]) -> int:
            return cmap.short_time_id(rev[0])

    def in_output_order(
        tagged_revs: tp.Iterable[tp.Tuple[str, FileStatusExtension]]
    ) -> tp.Iterable[tp.Tuple[str, FileStatusExtension]]:
        if sort:
            return sorted(tagged_revs, key=rev_time, reverse=True)
        return tagged_revs

    if sep_stages:
        stages = case_study.stages
        for stage_num in range(case_study.num_stages):
            header = " Stage {idx}".format(idx=stage_num)
            stage_name = stages[stage_num].name
            if stage_name:
                header += " ({})".format(stage_name)
            status += header + "\n"

            tagged_revs = get_revisions_status_for_case_study(
                case_study, result_file_type, stage_num
            )
            status += _format_tagged_revisions(
                in_output_order(tagged_revs), use_color
            )
    else:
        # ``dict.fromkeys`` removes duplicate (revision, status) pairs while
        # keeping their first-seen order.
        tagged_revs = list(
            dict.fromkeys(
                get_revisions_status_for_case_study(
                    case_study, result_file_type
                )
            )
        )
        status += _format_tagged_revisions(
            in_output_order(tagged_revs), use_color
        )

    return status
def get_legend(use_color: bool = False) -> str:
    """
    Builds up a complete legend that explains all status numbers and their
    colors.

    Args:
        use_color: add color escape sequences for highlighting

    Returns:
        a legend to explain the different status types
    """
    if use_color:
        status_names = [
            file_status.status_color[file_status.name]
            for file_status in FileStatusExtension
        ]
    else:
        status_names = [
            file_status.name for file_status in FileStatusExtension
        ]

    prefix = "CS: project_42: (Success / Total) processed ["
    return prefix + "/".join(status_names) + "]\n"
def package_paper_config(
    output_file: Path, cs_filter_regex: tp.Pattern[str],
    report_names: tp.List[str]
) -> None:
    """
    Package all files from a paper config into a zip archive.

    The archive contains the newest result files of every matching case study
    as well as the matching files from the paper-config directory. All
    entries are stored under their path relative to the directory that
    contains the varats config file.

    Args:
        output_file: file to write to
        cs_filter_regex: applied to a ``name_version`` string for filtering
                         the case studies to be included in the zip archive
        report_names: list of report names that should be added; if empty,
                      all registered report types are included
    """
    current_config = PC.get_paper_config()
    result_dir = Path(str(vara_cfg()['result_dir']))

    if report_names:
        report_types = [
            MetaReport.REPORT_TYPES[report_name]
            for report_name in report_names
        ]
    else:
        report_types = list(MetaReport.REPORT_TYPES.values())

    files_to_store: tp.Set[Path] = set()
    for case_study in current_config.get_all_case_studies():
        match = re.match(
            cs_filter_regex,
            "{name}_{version}".format(
                name=case_study.project_name, version=case_study.version
            )
        )
        if match is None:
            continue
        for report_type in report_types:
            files_to_store.update(
                get_newest_result_files_for_case_study(
                    case_study, result_dir, report_type
                )
            )

    case_study_files_to_include: tp.List[Path] = [
        cs_file for cs_file in current_config.path.iterdir()
        if re.match(cs_filter_regex, cs_file.name) is not None
    ]

    vara_root = Path(str(vara_cfg()['config_file'])).parent
    # TODO(python3.7): add ZipFile(compresslevel=9)
    with ZipFile(output_file, "w", compression=ZIP_DEFLATED) as pc_zip:
        # Pass the real on-disk path as the file to read and the
        # root-relative path only as the archive name. Handing just the
        # relative path to ``write`` would resolve it against the current
        # working directory and fail unless that happens to be ``vara_root``.
        for file_path in files_to_store:
            pc_zip.write(file_path, arcname=file_path.relative_to(vara_root))

        for case_study_file in case_study_files_to_include:
            pc_zip.write(
                case_study_file,
                arcname=case_study_file.relative_to(vara_root)
            )