# Source code for varats.data.data_manager

"""
The DataManager module handles the loading, creation, and caching of data
classes.

With the DataManager in the background, we can load files from multiple
locations within the tool suite, without loading the same file twice. In
addition, this speeds up reloading of files, for example, in interactive
plots, like in Jupyter notebooks, where re-executing code sometimes triggers
another file load.
"""

import hashlib
import os
import typing as tp
from functools import partial
from multiprocessing import Pool
from pathlib import Path
from threading import Lock

from PyQt5.QtCore import QObject, QRunnable, QThreadPool, pyqtSignal, pyqtSlot

from varats.report.report import BaseReport, ReportFilepath

# Type variable for loadable data classes; bound to ``BaseReport``.
LoadableTy = tp.TypeVar('LoadableTy', bound=BaseReport)
# Type variable for accepted path arguments; constrained to ``Path`` or
# ``ReportFilepath``.
PathLikeTy = tp.TypeVar('PathLikeTy', Path, ReportFilepath)


def sha256_checksum(
    file_path: tp.Union[str, "os.PathLike[str]"], block_size: int = 65536
) -> str:
    """
    Compute sha256 checksum of file.

    The digest covers the file content followed by the file's name (the last
    path component, UTF-8 encoded), so two files with identical content but
    different names produce different checksums.

    Args:
        file_path: path to the file; a ``Path``, a ``str``, or any other
                   ``os.PathLike`` object
        block_size: amount of bytes read per cycle

    Returns:
        sha256 hash of the file

    Raises:
        OSError: if the file cannot be opened or read
    """
    # Normalize once so that plain strings work as well; ``.name`` below is
    # only available on ``Path`` objects.
    path = Path(file_path)

    digest = hashlib.sha256()
    with path.open("rb") as file_handle:
        # Read in fixed-size chunks to keep memory usage independent of the
        # file size; ``read`` returns ``b''`` at end of file.
        for chunk in iter(lambda: file_handle.read(block_size), b''):
            digest.update(chunk)

    # Mix in the file name after the content.
    digest.update(path.name.encode('utf-8'))
    return digest.hexdigest()
class FileBlob(tp.Generic[LoadableTy]):
    """
    Keyed, in-memory container for data that was loaded from a file and can be
    converted to a VaRA DataClass.

    All three values are fixed at construction time and exposed through
    read-only properties.

    Args:
        key: identifier for the file
        file_path: path to the file
        data: a blob of data in memory
    """

    def __init__(self, key: str, file_path: Path, data: LoadableTy) -> None:
        self.__file_path = file_path
        self.__data = data
        self.__key = key

    @property
    def data(self) -> LoadableTy:
        """The loaded DataClass from the file."""
        return self.__data

    @property
    def file_path(self) -> Path:
        """File path to the loaded file."""
        return self.__file_path

    @property
    def key(self) -> str:
        """The key used as an index to the blob."""
        return self.__key
class FileSignal(QObject):
    """Signals that are emitted after a file was loaded."""

    # Carries a single object payload to the connected slot.
    finished = pyqtSignal(object)

    # Argument-less signal.
    clean = pyqtSignal()
class FileLoader(QRunnable):
    """
    Runnable that loads one file in the background of the application.

    Args:
        func: loading function, called with ``file_path`` and ``class_type``
        file_path: path to the file that should be loaded
        class_type: type that is handed to ``func`` together with the path
    """

    def __init__(
        self, func: tp.Callable[[Path, tp.Type[LoadableTy]], LoadableTy],
        file_path: Path, class_type: tp.Type[LoadableTy]
    ) -> None:
        super().__init__()
        # Signal object used to report back once ``run`` is done.
        self.signal = FileSignal()
        self.class_type = class_type
        self.file_path = file_path
        self.func = func

    @pyqtSlot()
    def run(self) -> None:
        """Call the loading function and announce its result via signals."""
        result = self.func(self.file_path, self.class_type)
        signals = self.signal
        # First hand the loaded object to the listeners, then emit ``clean``.
        signals.finished.emit(result)
        signals.clean.emit()
class DataManager():
    """
    Manages data over the lifetime of the tool suite.

    The DataManager handles the concurrent file loading, creation of
    DataClasses and caching of loaded files.
    """

    def __init__(self) -> None:
        # Cache of loaded files, indexed by the checksum key of each file.
        self.file_map: tp.Dict[str, FileBlob[tp.Any]] = {}
        # Qt thread pool that executes the ``FileLoader`` workers.
        self.thread_pool = QThreadPool()
        # Guards every access to ``file_map``.
        self.loader_lock = Lock()

    @staticmethod
    def __resolve_file_path(
        file_path: tp.Union[Path, ReportFilepath]
    ) -> Path:
        """
        Convert the given path into a plain ``Path`` and verify that it points
        to an existing file.

        Raises:
            FileNotFoundError: if the resolved path is not an existing file
        """
        if isinstance(file_path, ReportFilepath):
            py_file_path: Path = file_path.full_path()
        else:
            py_file_path = file_path

        if not os.path.isfile(py_file_path):
            raise FileNotFoundError(f"No such file: '{py_file_path}'")
        return py_file_path

    def __load_data_class(
        self, file_path: Path, DataClassTy: tp.Type[LoadableTy]
    ) -> LoadableTy:
        # pylint: disable=invalid-name
        """
        Load a DataClass of type <DataClassTy> from a file.

        The loader lock is only held while the cache is accessed and is always
        released before this method returns, so callers never have to unlock
        it themselves.
        """
        key = sha256_checksum(file_path)

        with self.loader_lock:
            cached_blob = self.file_map.get(key)
        if cached_blob is not None:
            return tp.cast(LoadableTy, cached_blob.data)

        # Construct the data class without holding the lock, so one slow load
        # does not block other loads or cache lookups. Errors raised by the
        # data class constructor propagate unchanged.
        new_blob = FileBlob(key, file_path, DataClassTy(file_path))

        with self.loader_lock:
            # If the same file was loaded concurrently in the meantime, keep
            # the blob that was stored first so all callers share one object.
            stored_blob = self.file_map.setdefault(key, new_blob)
        return tp.cast(LoadableTy, stored_blob.data)

    def load_data_class(
        self, file_path: PathLikeTy, DataClassTy: tp.Type[LoadableTy],
        loaded_callback: tp.Callable[[LoadableTy], None]
    ) -> None:
        # pylint: disable=invalid-name
        """
        Load a DataClass of type <DataClassTy> from a file asynchronously.

        Args:
            file_path: to the file
            DataClassTy: type of the report class to be loaded
            loaded_callback: that gets called after loading has finished

        Raises:
            FileNotFoundError: if ``file_path`` is not an existing file
        """
        py_file_path = self.__resolve_file_path(file_path)

        worker = FileLoader(self.__load_data_class, py_file_path, DataClassTy)
        worker.signal.finished.connect(loaded_callback)
        # No unlock hook is connected: the loading function releases the
        # loader lock itself, so the lock is not held while the callback runs.
        self.thread_pool.start(worker)

    def load_data_class_sync(
        self, file_path: PathLikeTy, DataClassTy: tp.Type[LoadableTy]
    ) -> LoadableTy:
        # pylint: disable=invalid-name
        """
        Load a DataClass of type <DataClassTy> from a file synchronously.

        Args:
            file_path: to the file
            DataClassTy: type of the report class to be loaded

        Returns:
            the loaded report file

        Raises:
            FileNotFoundError: if ``file_path`` is not an existing file
        """
        py_file_path = self.__resolve_file_path(file_path)
        return self.__load_data_class(py_file_path, DataClassTy)

    def clean_cache(self) -> None:
        """Remove all loaded files from the cache."""
        with self.loader_lock:
            self.file_map.clear()

    def _release_lock(self) -> None:
        """
        Release the loader lock.

        Retained for backward compatibility only; the loading methods of this
        class release the lock themselves and do not call this method.
        """
        self.loader_lock.release()
def _load_data_class_pool(
    file_path: Path, report_type: tp.Type[LoadableTy]
) -> LoadableTy:
    """
    Load a single report synchronously through the module-level ``VDM``.

    Args:
        file_path: path to the report file
        report_type: type of the report class to be loaded

    Returns:
        the loaded report
    """
    report = VDM.load_data_class_sync(file_path, report_type)
    return report
def load_multiple_reports(
    file_paths: tp.List[Path], report_type: tp.Type[BaseReport]
) -> tp.List[tp.Any]:
    """
    Load multiple report files in parallel with a pool of worker processes.

    Args:
        file_paths: list of files to load
        report_type: type of the report class to be loaded

    Returns:
        a list of loaded reports, in the same order as ``file_paths``
    """
    if not file_paths:
        # Nothing to load; avoid spawning worker processes for no work.
        return []

    load_report = partial(_load_data_class_pool, report_type=report_type)
    with Pool() as process_pool:
        # ``map`` blocks until all files are loaded and preserves input order.
        return process_pool.map(load_report, file_paths)
# Module-level DataManager instance; ``_load_data_class_pool`` loads reports
# through it.
VDM = DataManager()