"""Module for BlameReport, a collection of blame interactions."""
import logging
import typing as tp
from collections import defaultdict
from copy import deepcopy
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
import numpy as np
import pygit2
import yaml
from varats.base.version_header import VersionHeader
from varats.report.report import BaseReport
from varats.utils.git_util import (
map_commits,
CommitRepoPair,
CommitLookupTy,
FullCommitHash,
ShortCommitHash,
UNCOMMITTED_COMMIT_HASH,
)
LOG = logging.getLogger(__name__)
[docs]
class BlameInstInteractions():
"""
An interaction between a base region/taint, attached to an instruction, and
other regions/taints.
For the blame analysis, these taints stem from data flows into the
instruction.
"""
def __init__(
self, base_taint: BlameTaintData,
interacting_taints: tp.List[BlameTaintData], amount: int
) -> None:
self.__base_taint = base_taint
self.__interacting_taints = sorted(interacting_taints)
self.__amount = amount
[docs]
@staticmethod
def create_blame_inst_interactions(
raw_inst_entry: tp.Dict[str, tp.Any]
) -> 'BlameInstInteractions':
"""Creates a :class:`BlameInstInteractions` entry from the corresponding
yaml document section."""
def create_taint_data(
raw_data: tp.Union[str, tp.Dict[str, tp.Any]]
) -> BlameTaintData:
# be backwards compatible with blame report version 4
if isinstance(raw_data, str):
commit_hash, *repo = raw_data.split('-', maxsplit=1)
crp = CommitRepoPair(
FullCommitHash(commit_hash), repo[0] if repo else "Unknown"
)
return BlameTaintData(crp)
return BlameTaintData.create_taint_data(raw_data)
base_taint = create_taint_data(raw_inst_entry['base-hash'])
interacting_taints: tp.List[BlameTaintData] = []
for raw_inst_taint in raw_inst_entry['interacting-hashes']:
interacting_taints.append(create_taint_data(raw_inst_taint))
amount = int(raw_inst_entry['amount'])
return BlameInstInteractions(base_taint, interacting_taints, amount)
@property
def base_taint(self) -> BlameTaintData:
"""Base taint of the analyzed instruction."""
return self.__base_taint
@property
def interacting_taints(self) -> tp.List[BlameTaintData]:
"""List of taints that interact with the base."""
return self.__interacting_taints
@property
def amount(self) -> int:
"""Number of same interactions found in this function."""
return self.__amount
def __str__(self) -> str:
str_representation = f"{self.base_taint} <-(# {self.amount:4})- ["
sep = ""
for interacting_taint in self.interacting_taints:
str_representation += sep + str(interacting_taint)
sep = ", "
str_representation += "]\n"
return str_representation
def __eq__(self, other: tp.Any) -> bool:
if isinstance(other, BlameInstInteractions):
if self.base_taint == other.base_taint:
return self.interacting_taints == other.interacting_taints
return False
def __lt__(self, other: tp.Any) -> bool:
if isinstance(other, BlameInstInteractions):
if self.base_taint != other.base_taint:
return self.base_taint < other.base_taint
return self.interacting_taints < other.interacting_taints
return False
[docs]
class BlameResultFunctionEntry():
"""Collection of all interactions for a specific function."""
def __init__(
self, name: str, demangled_name: str, file_name: tp.Optional[str],
blame_insts: tp.List[BlameInstInteractions], num_instructions: int,
callees: tp.List[str], commits: tp.List[CommitRepoPair]
) -> None:
self.__name = name
self.__demangled_name = demangled_name
self.__file_name = file_name
self.__inst_list = blame_insts
self.__num_instructions = num_instructions
self.__callees = callees
self.__commits = commits
[docs]
@staticmethod
def create_blame_result_function_entry(
name: str, raw_function_entry: tp.Dict[str, tp.Any]
) -> 'BlameResultFunctionEntry':
"""Creates a :class:`BlameResultFunctionEntry` from the corresponding
yaml document section."""
demangled_name = str(raw_function_entry['demangled-name'])
num_instructions = int(raw_function_entry['num-instructions'])
file_name = tp.cast(tp.Optional[str], raw_function_entry.get('file'))
inst_list: tp.List[BlameInstInteractions] = []
for raw_inst_entry in raw_function_entry['insts']:
inst_list.append(
BlameInstInteractions.
create_blame_inst_interactions(raw_inst_entry)
)
callees = [
str(callee) for callee in raw_function_entry.get("callees", [])
]
commits = [
CommitRepoPair(
FullCommitHash(raw_commit["commit"]), raw_commit["repository"]
) for raw_commit in raw_function_entry.get("commits", [])
]
return BlameResultFunctionEntry(
name, demangled_name, file_name, inst_list, num_instructions,
callees, commits
)
@property
def name(self) -> str:
"""
Name of the function.
The name is mangled for C++ code, either with the itanium or windows
mangling schema.
"""
return self.__name
@property
def demangled_name(self) -> str:
"""Demangled name of the function."""
return self.__demangled_name
@property
def file_name(self) -> tp.Optional[str]:
"""Name of file containing the function if available."""
return self.__file_name
@property
def num_instructions(self) -> int:
"""Number of instructions in this function."""
return self.__num_instructions
@property
def interactions(self) -> tp.List[BlameInstInteractions]:
"""List of found instruction blame-interactions."""
return self.__inst_list
@property
def callees(self) -> tp.List[str]:
"""List of functions called by this function."""
return self.__callees
@property
def commits(self) -> tp.List[CommitRepoPair]:
"""List of commits that modified this function."""
return self.__commits
def __str__(self) -> str:
str_representation = f"{self.name} ({self.demangled_name})\n"
for inst in self.__inst_list:
str_representation += f" - {inst}"
return str_representation
def _calc_diff_between_func_entries(
base_func_entry: BlameResultFunctionEntry,
prev_func_entry: BlameResultFunctionEntry
) -> BlameResultFunctionEntry:
diff_interactions: tp.List[BlameInstInteractions] = []
# copy lists to avoid side effects
base_interactions = list(base_func_entry.interactions)
prev_interactions = list(prev_func_entry.interactions)
# num instructions diff
diff_num_instructions = abs(
base_func_entry.num_instructions - prev_func_entry.num_instructions
)
for base_inter in base_interactions:
if base_inter in prev_interactions:
prev_inter_idx = prev_interactions.index(base_inter)
prev_inter = prev_interactions.pop(prev_inter_idx)
# create new blame inst interaction with the absolute difference
# between base and prev
difference = base_inter.amount - prev_inter.amount
if difference != 0:
diff_interactions.append(
BlameInstInteractions(
base_inter.base_taint,
deepcopy(base_inter.interacting_taints), difference
)
)
else:
# append new interaction from base report
diff_interactions.append(deepcopy(base_inter))
# append left over interactions from previous blame report
diff_interactions += prev_interactions
# TODO (se-sic/VaRA#959): consider callgraph info in blame report diff
return BlameResultFunctionEntry(
base_func_entry.name, base_func_entry.demangled_name,
base_func_entry.file_name, diff_interactions, diff_num_instructions, [],
[]
)
[docs]
class BlameReport(BaseReport, shorthand="BR", file_type="yaml"):
"""Full blame report containing all blame interactions."""
def __init__(self, path: Path) -> None:
super().__init__(path)
with open(path, 'r') as stream:
documents = yaml.load_all(stream, Loader=yaml.CLoader)
version_header = VersionHeader(next(documents))
version_header.raise_if_not_type("BlameReport")
version_header.raise_if_version_is_less_than(4)
if version_header.version < 5:
LOG.warning(
"You are using an outdated blame report format "
"that might not be supported in the future."
)
self.__meta_data = BlameReportMetaData \
.create_blame_report_meta_data(next(documents))
self.__function_entries: tp.Dict[str, BlameResultFunctionEntry] = {}
raw_blame_report = next(documents)
self.__blame_taint_scope = BlameTaintScope.from_string(
# be backwards compatible with blame report version 4
raw_blame_report.get('scope', "COMMIT")
)
for raw_func_entry in raw_blame_report['result-map']:
new_function_entry = (
BlameResultFunctionEntry.create_blame_result_function_entry(
raw_func_entry,
raw_blame_report['result-map'][raw_func_entry]
)
)
self.__function_entries[new_function_entry.name
] = new_function_entry
[docs]
def get_blame_result_function_entry(
self, mangled_function_name: str
) -> tp.Optional[BlameResultFunctionEntry]:
"""
Get the result entry for a specific function.
Args:
mangled_function_name: mangled name of the function to look up
"""
return self.__function_entries.get(mangled_function_name, None)
@property
def blame_taint_scope(self) -> BlameTaintScope:
return self.__blame_taint_scope
@property
def function_entries(self) -> tp.ValuesView[BlameResultFunctionEntry]:
"""Iterate over all function entries."""
return self.__function_entries.values()
@property
def head_commit(self) -> ShortCommitHash:
"""The current HEAD commit under which this CommitReport was created."""
return self.filename.commit_hash
@property
def meta_data(self) -> BlameReportMetaData:
"""Access the meta data that was gathered with the ``BlameReport``."""
return self.__meta_data
def __str__(self) -> str:
str_representation = ""
for function in self.__function_entries.values():
str_representation += str(function) + "\n"
return str_representation
[docs]
class BlameReportDiff():
"""Diff class that contains all interactions that changed between two report
revisions."""
def __init__(
self, base_report: BlameReport, prev_report: BlameReport
) -> None:
self.__function_entries: tp.Dict[str, BlameResultFunctionEntry] = {}
self.__base_head = base_report.head_commit
self.__prev_head = prev_report.head_commit
self.__calc_diff_br(base_report, prev_report)
if base_report.blame_taint_scope != prev_report.blame_taint_scope:
raise AssertionError(
"Cannot diff blame reports with different scopes."
)
self.__blame_taint_scope = base_report.blame_taint_scope
@property
def blame_taint_scope(self) -> BlameTaintScope:
return self.__blame_taint_scope
@property
def base_head_commit(self) -> ShortCommitHash:
return self.__base_head
@property
def prev_head_commit(self) -> ShortCommitHash:
return self.__prev_head
@property
def function_entries(self) -> tp.ValuesView[BlameResultFunctionEntry]:
"""Iterate over all function entries in the diff."""
return self.__function_entries.values()
[docs]
def get_blame_result_function_entry(
self, mangled_function_name: str
) -> BlameResultFunctionEntry:
"""
Get the result entry for a specific function in the diff.
Args:
mangled_function_name: mangled name of the function to look up
"""
return self.__function_entries[mangled_function_name]
[docs]
def has_function(self, mangled_function_name: str) -> bool:
return mangled_function_name in self.__function_entries
def __calc_diff_br(
self, base_report: BlameReport, prev_report: BlameReport
) -> None:
function_names = {
base_func_entry.name
for base_func_entry in base_report.function_entries
} | {
prev_func_entry.name
for prev_func_entry in prev_report.function_entries
}
for func_name in function_names:
base_func_entry = None
prev_func_entry = None
try:
base_func_entry = base_report.get_blame_result_function_entry(
func_name
)
except LookupError:
pass
try:
prev_func_entry = prev_report.get_blame_result_function_entry(
func_name
)
except LookupError:
pass
# Only base report has the function
if prev_func_entry is None and base_func_entry is not None:
if base_func_entry.interactions:
self.__function_entries[func_name] = deepcopy(
base_func_entry
)
# Only prev report has the function
elif base_func_entry is None and prev_func_entry is not None:
if prev_func_entry.interactions:
self.__function_entries[func_name] = deepcopy(
prev_func_entry
)
# Both reports have the same function
elif base_func_entry is not None and prev_func_entry is not None:
diff_entry = _calc_diff_between_func_entries(
base_func_entry, prev_func_entry
)
if diff_entry.interactions:
self.__function_entries[func_name] = diff_entry
else:
raise AssertionError(
"The function name should be at least in one of the reports"
)
def __str__(self) -> str:
str_representation = ""
for function in self.__function_entries.values():
str_representation += str(function) + "\n"
return str_representation
ElementTy = tp.TypeVar('ElementTy')
def __count_elements(
report: tp.Union[BlameReport, BlameReportDiff],
get_elements_from_interaction: tp.Callable[[BlameInstInteractions],
tp.Iterable[ElementTy]]
) -> int:
elements: tp.Set[ElementTy] = set()
for func_entry in report.function_entries:
for interaction in func_entry.interactions:
elements.update(get_elements_from_interaction(interaction))
return len(elements)
[docs]
def count_interactions(report: tp.Union[BlameReport, BlameReportDiff]) -> int:
"""
Counts the number of interactions.
Args:
report: the blame report or diff
Returns:
the number of interactions in this report or diff
"""
amount = 0
for func_entry in report.function_entries:
for interaction in func_entry.interactions:
amount += abs(interaction.amount)
return amount
[docs]
def count_interacting_commits(
report: tp.Union[BlameReport, BlameReportDiff],
) -> int:
"""
Counts the number of unique interacting commits.
Args:
report: the blame report or diff
Returns:
the number unique interacting commits in this report or diff
"""
return __count_elements(
report, lambda interaction: interaction.interacting_taints
)
[docs]
def count_interacting_authors(
report: tp.Union[BlameReport, BlameReportDiff],
commit_lookup: CommitLookupTy
) -> int:
"""
Counts the number of unique interacting authors.
Args:
report: the blame report or diff
commit_lookup: function to look up commits
Returns:
the number unique interacting authors in this report or diff
"""
def extract_interacting_authors(
interaction: BlameInstInteractions
) -> tp.Iterable[str]:
return map_commits(
# Issue (se-sic/VaRA#647): improve author uniquifying
lambda c: tp.cast(str, c.author.name),
[btd.commit for btd in interaction.interacting_taints],
commit_lookup
)
return __count_elements(report, extract_interacting_authors)
[docs]
def generate_degree_tuples(
report: tp.Union[BlameReport, BlameReportDiff]
) -> tp.List[tp.Tuple[int, int]]:
"""
Generates a list of tuples (degree, amount) where degree is the interaction
degree of a blame interaction, e.g., the number of incoming interactions,
and amount is the number of times an interaction with this degree was found
in the report.
Args:
report: the blame report
Returns:
list of tuples (degree, amount)
"""
degree_dict: DegreeAmountMappingTy = defaultdict(int)
for func_entry in report.function_entries:
for interaction in func_entry.interactions:
degree = len(interaction.interacting_taints)
degree_dict[degree] += interaction.amount
return list(degree_dict.items())
[docs]
def gen_base_to_inter_commit_repo_pair_mapping(
report: tp.Union[BlameReport, BlameReportDiff]
) -> tp.Dict[BlameTaintData, tp.Dict[BlameTaintData, int]]:
"""
Maps the base CommitRepoPair of a blame interaction to each distinct
interacting CommitRepoPair, which maps to the amount of the interaction.
Args:
report: blame report
Returns:
A mapping from base CommitRepoPairs to a mapping of the corresponding
interacting CommitRepoPairs to their amount.
"""
grouped_interactions: tp.Dict[BlameTaintData, tp.Dict[
BlameTaintData, int]] = defaultdict(lambda: defaultdict(lambda: 0))
for func_entry in report.function_entries:
for interaction in func_entry.interactions:
base_taint = interaction.base_taint
for interacting_taint in interaction.interacting_taints:
grouped_interactions[base_taint][interacting_taint
] += interaction.amount
return grouped_interactions
DegreeAmountMappingTy = tp.Dict[int, int]
[docs]
def generate_lib_dependent_degrees(
report: tp.Union[BlameReport, BlameReportDiff]
) -> tp.Dict[str, tp.Dict[str, tp.List[tp.Tuple[int, int]]]]:
"""
Args:
report: blame report
Returns:
Map of tuples (degree, amount) categorised by their corresponding
library name to their corresponding base library name.
"""
base_inter_lib_degree_amount_mapping: tp.Dict[str, tp.Dict[
str, DegreeAmountMappingTy]] = {}
for func_entry in report.function_entries:
for interaction in func_entry.interactions:
base_repo_name = interaction.base_taint.commit.repository_name
tmp_degree_of_libs: tp.Dict[str, int] = {}
if base_repo_name not in base_inter_lib_degree_amount_mapping:
base_inter_lib_degree_amount_mapping[base_repo_name] = {}
for inter_taint in interaction.interacting_taints:
inter_hash = inter_taint.commit
inter_hash_repo_name = inter_hash.repository_name
if (
inter_hash_repo_name
not in base_inter_lib_degree_amount_mapping[base_repo_name]
):
base_inter_lib_degree_amount_mapping[base_repo_name][
inter_hash_repo_name] = {}
if inter_hash_repo_name not in tmp_degree_of_libs:
tmp_degree_of_libs[inter_hash_repo_name] = 1
else:
tmp_degree_of_libs[inter_hash_repo_name] += 1
for repo_name, degree in tmp_degree_of_libs.items():
if (
degree
not in base_inter_lib_degree_amount_mapping[base_repo_name]
[repo_name]
):
base_inter_lib_degree_amount_mapping[base_repo_name][
repo_name][degree] = 0
base_inter_lib_degree_amount_mapping[base_repo_name][repo_name][
degree] += interaction.amount
# Transform to tuples (degree, amount)
result_dict: tp.Dict[str, tp.Dict[str, tp.List[tp.Tuple[int, int]]]] = {}
for base_name, inter_lib_dict in base_inter_lib_degree_amount_mapping.items(
):
result_dict[base_name] = {}
for inter_lib_name, degree_amount_dict in inter_lib_dict.items():
result_dict[base_name][inter_lib_name] = list(
degree_amount_dict.items()
)
return result_dict
[docs]
def generate_author_degree_tuples(
report: tp.Union[BlameReport, BlameReportDiff],
commit_lookup: CommitLookupTy
) -> tp.List[tp.Tuple[int, int]]:
"""
Generates a list of tuples (author_degree, amount) where author_degree is
the number of unique authors for all blame interaction, e.g., the number of
unique authors of incoming interactions, and amount is the number of times
an interaction with this degree was found in the report.
Args:
report: the blame report
commit_lookup: function to look up commits
Returns:
list of tuples (author_degree, amount)
"""
degree_dict: tp.DefaultDict[int, int] = defaultdict(int)
for func_entry in report.function_entries:
for interaction in func_entry.interactions:
author_list = map_commits(
# Issue (se-sic/VaRA#647): improve author uniquifying
lambda c: tp.cast(str, c.author.name),
[btd.commit for btd in interaction.interacting_taints],
commit_lookup
)
degree = len(set(author_list))
degree_dict[degree] += interaction.amount
return list(degree_dict.items())
[docs]
def generate_time_delta_distribution_tuples(
report: tp.Union[BlameReport, BlameReportDiff],
commit_lookup: CommitLookupTy, bucket_size: int,
aggregate_function: tp.Callable[[tp.Sequence[tp.Union[int, float]]],
tp.Union[int, float]]
) -> tp.List[tp.Tuple[int, int]]:
"""
Generates a list of tuples that represent the distribution of time delta
interactions. The first value in the tuple represents the degree of the time
delta, bucketed according to ``bucket_size``. The second value is the time
delta, aggregated over all interacting commits by the passed
``aggregate_function``.
Args:
report: to analyze
commit_lookup: function to look up commits
bucket_size: size of a time bucket in days
aggregate_function: to aggregate the delta values of all
interacting commits
Returns:
list of (degree, amount) tuples
"""
degree_dict: tp.DefaultDict[int, int] = defaultdict(int)
for func_entry in report.function_entries:
for interaction in func_entry.interactions:
base_crp: CommitRepoPair = interaction.base_taint.commit
if base_crp.commit_hash == UNCOMMITTED_COMMIT_HASH:
continue
base_commit = commit_lookup(interaction.base_taint.commit)
base_c_time = datetime.fromtimestamp(
base_commit.commit_time, timezone.utc
)
def translate_to_time_deltas2(
commit: pygit2.Commit,
base_time: datetime = base_c_time
) -> int:
other_c_time = datetime.fromtimestamp(
commit.commit_time, timezone.utc
)
return abs((base_time - other_c_time).days)
author_list = map_commits(
translate_to_time_deltas2,
[btd.commit for btd in interaction.interacting_taints],
commit_lookup
)
degree = aggregate_function(author_list) if author_list else 0
bucket = round(degree / bucket_size)
degree_dict[bucket] += interaction.amount
return list(degree_dict.items())
[docs]
def generate_avg_time_distribution_tuples(
report: tp.Union[BlameReport, BlameReportDiff],
commit_lookup: CommitLookupTy, bucket_size: int
) -> tp.List[tp.Tuple[int, int]]:
"""
Generates a list of tuples that represent the distribution of average time
delta interactions. The first value in the tuple represents the degree of
the time delta, bucketed according to ``bucket_size``. The second value is
the time delta, averaged over all interacting commits.
Args:
report: to analyze
commit_lookup: function to look up commits
bucket_size: size of a time bucket in days
Returns:
list of (degree, avg_time) tuples
"""
return generate_time_delta_distribution_tuples(
report, commit_lookup, bucket_size, np.average
)
[docs]
def generate_max_time_distribution_tuples(
report: tp.Union[BlameReport, BlameReportDiff],
commit_lookup: CommitLookupTy, bucket_size: int
) -> tp.List[tp.Tuple[int, int]]:
"""
Generates a list of tuples that represent the distribution of maximal time
delta interactions. The first value in the tuple represents the degree of
the time delta, bucketed according to ``bucket_size``. The second value is
the max time delta, i.e., the maximal time distance between the base commit
and one of the all interacting commits.
Args:
report: to analyze
commit_lookup: function to look up commits
bucket_size: size of a time bucket in days
Returns:
list of (degree, max_time) tuples
"""
return generate_time_delta_distribution_tuples(
report, commit_lookup, bucket_size, max
)
[docs]
def generate_in_head_interactions(
report: BlameReport
) -> tp.List[BlameInstInteractions]:
"""
Generate a list of interactions where the base_hash of the interaction is
the same as the HEAD of the report.
Args:
report: BlameReport to get the interactions from
"""
head_interactions = []
for func_entry in report.function_entries:
for interaction in func_entry.interactions:
if interaction.base_taint.commit.commit_hash.startswith(
report.head_commit
):
head_interactions.append(interaction)
continue
return head_interactions
[docs]
def generate_out_head_interactions(
report: BlameReport
) -> tp.List[BlameInstInteractions]:
"""
Generate a list of interactions where one of the interacting hashes is the
same as the HEAD of the report.
Args:
report: BlameReport to get the interactions from
"""
head_interactions = []
for func_entry in report.function_entries:
for interaction in func_entry.interactions:
for interacting_taint in interaction.interacting_taints:
if interacting_taint.commit.commit_hash.startswith(
report.head_commit
):
head_interactions.append(interaction)
break
return head_interactions
[docs]
def get_interacting_commits_for_commit(
report: BlameReport, commit: CommitRepoPair
) -> tp.Tuple[tp.Set[CommitRepoPair], tp.Set[CommitRepoPair]]:
"""
Get all commits a given commits interacts with separated by incoming and
outgoing interactions.
Args:
report: BlameReport to get the interactions from
commit: commit to get the interacting commits for
Returns:
two sets for the interacting commits seperated by incoming and outgoing
interactions
"""
in_commits: tp.Set[CommitRepoPair] = set()
out_commits: tp.Set[CommitRepoPair] = set()
for func_entry in report.function_entries:
for interaction in func_entry.interactions:
interacting_commits = [
taint.commit for taint in interaction.interacting_taints
]
if commit == interaction.base_taint.commit:
out_commits.update(interacting_commits)
if commit in interacting_commits:
in_commits.add(interaction.base_taint.commit)
return in_commits, out_commits