# Source code for varats.data.cache_helper

"""Utility functions and class to allow easier caching of pandas dataframes and
other data."""
import logging
import pickle
import typing as tp
from pathlib import Path

import networkx as nx
import pandas as pd

from varats.utils.settings import vara_cfg

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Name of the bookkeeping column that stores the id of the data item a cached
# row was created from.
CACHE_ID_COL = 'cache_revision'
# Name of the bookkeeping column that stores the timestamp information of the
# data item a cached row was created from.
CACHE_TIMESTAMP_COL = 'cache_timestamp'
# Column types of the two bookkeeping columns; merged into the dtype mapping
# that is used when loading or creating a cached dataframe.
CACHE_COL_TYPES = {CACHE_ID_COL: 'str', CACHE_TIMESTAMP_COL: 'str'}


def get_data_file_path(data_id: str, project_name: str) -> Path:
    """
    Build the location of the cache file that belongs to the given identifier
    and project inside the configured cache directory.

    Args:
        data_id: identifier or identifier_name of the dataframe
        project_name: name of the project

    Returns:
        path to the ``<data_id>-<project_name>.csv.gz`` file in the directory
        configured as ``data_cache``
    """
    cache_dir = Path(str(vara_cfg()["data_cache"]))
    file_name = f"{data_id}-{project_name}.csv.gz"
    return cache_dir / file_name
def load_cached_df_or_none(
    data_id: str, project_name: str, data_types: tp.Dict[str, str]
) -> tp.Optional[pd.DataFrame]:
    """
    Read a previously cached dataframe from disk, if there is one.

    Args:
        data_id: identifier or identifier_name of the dataframe
        project_name: name of the project
        data_types: dict of columns and types to pass to the dataframe loading

    Returns:
        the cached dataframe, or ``None`` if neither the compressed nor the
        uncompressed cache file exists
    """
    compressed_path = get_data_file_path(data_id, project_name)

    if compressed_path.exists():
        cache_file = compressed_path
    else:
        # Caches written before compression was introduced have the same name
        # without the trailing ".gz"; use such a file if it is present so that
        # the migration to compressed cache files is seamless.
        uncompressed_path = Path(str(compressed_path)[:-3])
        if not uncompressed_path.exists():
            return None
        cache_file = uncompressed_path

    return pd.read_csv(
        str(cache_file),
        index_col=0,
        compression='infer',
        dtype=data_types,
    )
def cache_dataframe(
    data_id: str, project_name: str, dataframe: pd.DataFrame
) -> None:
    """
    Persist a dataframe to its cache file on disk.

    The target file is determined by ``get_data_file_path``; the compression
    is inferred by pandas from the file name.

    Args:
        data_id: identifier or identifier_name of the dataframe
        project_name: name of the project
        dataframe: pandas dataframe to store
    """
    # TODO: we should add quoting = csv.QUOTE_NONNUMERIC here,
    # so that int/float casted to str are written as strings in csv
    target = str(get_data_file_path(data_id, project_name))
    dataframe.to_csv(target, compression='infer')
# Type of the data items (e.g., report files) that cache entries are built from.
InDataTy = tp.TypeVar("InDataTy")


def __create_cache_entry(
    create_df_from_report: tp.Callable[[InDataTy],
                                       tp.Tuple[pd.DataFrame, str, str]],
    data: InDataTy
) -> pd.DataFrame:
    """
    Create the dataframe for one data item and tag it with the cache
    bookkeeping columns.

    Args:
        create_df_from_report: callback that turns a data item into a tuple of
                               (dataframe, entry id, entry timestamp)
        data: the data item to create the cache entry for

    Returns:
        the dataframe produced by the callback, extended in place by the id
        and timestamp columns
    """
    entry_df, revision_id, timestamp = create_df_from_report(data)

    bookkeeping = (
        (CACHE_ID_COL, revision_id),
        (CACHE_TIMESTAMP_COL, timestamp),
    )
    for column, value in bookkeeping:
        entry_df[column] = value

    return entry_df
def build_cached_report_table(
    data_id: str, project_name: str, data_to_load: tp.List[InDataTy],
    data_to_drop: tp.List[InDataTy],
    create_empty_df: tp.Callable[[], pd.DataFrame],
    create_cache_entry_data: tp.Callable[[InDataTy],
                                         tp.Tuple[pd.DataFrame, str, str]],
    get_entry_id: tp.Callable[[InDataTy], str],
    get_entry_timestamp: tp.Callable[[InDataTy], str],
    is_newer_timestamp: tp.Callable[[str, str], bool]
) -> pd.DataFrame:
    """
    Build up an automatically cached dataframe.

    The existing cache (if any) is loaded, entries for data items that are not
    cached yet are created, entries whose data item is newer than the cached
    version are re-created, and entries belonging to newer items from
    ``data_to_drop`` are removed. The result is written back to the cache file
    on every call.

    Args:
        data_id: graph cache identifier
        project_name: name of the project to work with
        data_to_load: list of data items to be loaded
        data_to_drop: list of data items to be discarded
        create_empty_df: creates an empty layout of the dataframe
        create_cache_entry_data: creates a dataframe from a data item
        get_entry_id: returns a unique identifier for one data item
        get_entry_timestamp: returns a string with information that can be used
                             to determine which of two data items is newer
        is_newer_timestamp: checks whether one data item is newer than another
                            based on their timestamps

    Returns:
        the up-to-date dataframe without the cache bookkeeping columns
    """
    layout_df = create_empty_df()
    column_types = layout_df.dtypes.to_dict()
    column_types.update(CACHE_COL_TYPES)

    # The result is kept in a separate Optional-typed variable; the original
    # code notes that mypy needs this.
    loaded_df = load_cached_df_or_none(data_id, project_name, column_types)
    if loaded_df is not None:
        cached_df = loaded_df
    else:
        # No cache on disk yet: start from the empty layout, extended by the
        # (empty) bookkeeping columns.
        cached_df = layout_df
        cached_df[CACHE_ID_COL] = ""
        cached_df[CACHE_TIMESTAMP_COL] = ""
        cached_df = cached_df.astype(column_types)

    def is_missing_file(report_file: InDataTy) -> bool:
        """Check that no cached row carries the id of the data item."""
        id_matches = cached_df[CACHE_ID_COL] == get_entry_id(report_file)
        return not id_matches.any()

    def is_newer_file(report_file: InDataTy) -> bool:
        """Check whether the data item is newer than its cached entry."""
        id_matches = cached_df[CACHE_ID_COL] == get_entry_id(report_file)
        cached_timestamps = cached_df[id_matches][CACHE_TIMESTAMP_COL]
        if cached_timestamps.empty:
            # We found no existing entry, so it will never be considered for
            # updating and does not need to be deleted.
            return False
        return is_newer_timestamp(
            get_entry_timestamp(report_file), cached_timestamps.iloc[0]
        )

    missing_entries = list(filter(is_missing_file, data_to_load))
    updated_entries = list(filter(is_newer_file, data_to_load))
    failed_entries = [
        get_entry_id(entry) for entry in data_to_drop if is_newer_file(entry)
    ]

    # Step 1: append freshly created entries for everything not cached yet.
    new_data_frames = []
    for num, data_entry in enumerate(missing_entries):
        LOG.info(
            f"Creating missing entry ({(num + 1)}/"
            f"{len(missing_entries)}): {data_entry}"
        )
        created_entry = __create_cache_entry(
            create_cache_entry_data, data_entry
        )
        new_data_frames.append(created_entry)

    new_df = pd.concat([cached_df, *new_data_frames],
                       ignore_index=True,
                       sort=False)

    # Step 2: overwrite outdated entries; rows are matched via the entry id,
    # which therefore temporarily serves as the index.
    new_df = new_df.set_index(CACHE_ID_COL)
    for num, data_entry in enumerate(updated_entries):
        LOG.info(
            f"Updating outdated entry "
            f"({(num + 1)}/{len(updated_entries)}): {data_entry}"
        )
        refreshed_entry = __create_cache_entry(
            create_cache_entry_data, data_entry
        )
        refreshed_entry = refreshed_entry.set_index(CACHE_ID_COL)
        new_df.update(refreshed_entry)
    new_df = new_df.reset_index()

    # Step 3: remove the entries that belong to dropped data items.
    if failed_entries:
        LOG.info(f"Dropping {len(failed_entries)} entries")
        rows_to_drop = new_df[new_df[CACHE_ID_COL].isin(failed_entries)].index
        new_df = new_df.drop(rows_to_drop)

    cache_dataframe(data_id, project_name, new_df)

    # Callers only get the payload columns, not the cache bookkeeping.
    bookkeeping_columns = [CACHE_ID_COL, CACHE_TIMESTAMP_COL]
    payload_columns = [
        col for col in new_df.columns if col not in bookkeeping_columns
    ]
    return tp.cast(pd.DataFrame, new_df.loc[:, payload_columns])
# Type of the cached graphs; any networkx graph (sub)class.
GraphTy = tp.TypeVar("GraphTy", bound=nx.Graph)


def build_cached_graph(
    graph_id: str, create_graph: tp.Callable[[], GraphTy]
) -> GraphTy:
    """
    Create an automatically cached networkx graph.

    If a cache file for ``graph_id`` exists, the graph is unpickled from it.
    Otherwise, ``create_graph`` is called and its result is pickled to the
    cache file before it is returned.

    Args:
        graph_id: graph cache identifier
        create_graph: function that creates the graph

    Returns:
        the cached or created graph
    """
    # Despite the ".gz" suffix, the file holds a plain pickle stream.
    path = Path(str(vara_cfg()["data_cache"])) / f"graph-{graph_id}.gz"

    if not path.exists():
        # Build the graph first, so that no cache file is opened for writing
        # when the creation fails.
        graph = create_graph()
        with path.open("wb") as graph_file:
            pickle.dump(graph, graph_file, pickle.HIGHEST_PROTOCOL)
        return graph

    # NOTE(review): unpickling can execute arbitrary code, so the cache
    # directory must only contain trusted files.
    with path.open("rb") as graph_file:
        return tp.cast(GraphTy, pickle.load(graph_file))