Source code for varats.report.gnu_time_report
"""
Simple report module to create and handle the standard timing output of GNU
time.
Examples to produce a ``TimeReport``:
Commandline usage:
.. code-block:: bash
export REPORT_FILE="Path/To/MyFile"
/usr/bin/time -v -o $REPORT_FILE sleep 2
Experiment code:
.. code-block:: python
from benchbuild.utils.cmd import time, sleep
report_file = "Path/To/MyFile"
command_to_measure = sleep["2"]
time("-v", "-o", f"{report_file}", command_to_measure)
"""
import re
import typing as tp
from datetime import timedelta
from pathlib import Path
from varats.experiment.workload_util import WorkloadSpecificReportAggregate
from varats.report.report import BaseReport, ReportAggregate
[docs]
class TimeReport(BaseReport, shorthand="TR", file_type="txt"):
"""Report class to access GNU time output."""
def __init__(self, path: Path) -> None:
super().__init__(path)
self.__filesystem_io = (-1, -1)
with open(self.path, 'r') as stream:
for line in stream:
line = line.strip()
if line.startswith("Command being timed"):
self.__command_name: str = TimeReport._parse_command(line)
continue
if line.startswith("Maximum resident set size"):
self.__max_resident_size: int = \
TimeReport._parse_max_resident_size(line)
continue
if line.startswith("User time"):
self.__user_time = TimeReport._parse_user_time(line)
continue
if line.startswith("System time"):
self.__system_time = TimeReport._parse_system_time(line)
continue
if line.startswith("Elapsed (wall clock) time"):
self.__wall_clock_time: timedelta = \
TimeReport._parse_wall_clock_time(line)
continue
if line.startswith("Major (requiring I/O) page faults"):
self.__major_page_faults: int = \
TimeReport._parse_major_page_faults(line)
continue
if line.startswith("Minor (reclaiming a frame) page faults"):
self.__minor_page_faults: int = \
TimeReport._parse_minor_page_faults(line)
continue
if line.startswith("Voluntary context switches"):
self.__voluntary_ctx_switches: int = \
TimeReport._parse_voluntary_ctx_switches(line)
continue
if line.startswith("Involuntary context switches"):
self.__involuntary_ctx_switches: int = \
TimeReport._parse_involuntary_ctx_switches(line)
continue
if line.startswith("File system inputs"):
self.__filesystem_io = (
TimeReport._parse_filesystem_io(line),
self.__filesystem_io[1]
)
continue
if line.startswith("File system outputs"):
self.__filesystem_io = (
self.__filesystem_io[0],
TimeReport._parse_filesystem_io(line)
)
continue
@property
def command_name(self) -> str:
"""Name of the command that was executed."""
return self.__command_name
@property
def user_time(self) -> timedelta:
"""Measured user time in seconds."""
return self.__user_time
@property
def system_time(self) -> timedelta:
"""Measured system time in seconds."""
return self.__system_time
@property
def wall_clock_time(self) -> timedelta:
"""Elapsed wall clock time."""
return self.__wall_clock_time
@property
def max_res_size(self) -> int:
"""Maximum resident size."""
return self.__max_resident_size
@property
def major_page_faults(self) -> int:
"""Major page faults (require I/O)."""
return self.__major_page_faults
@property
def minor_page_faults(self) -> int:
"""Minor page faults (reclaim a frame)."""
return self.__minor_page_faults
@property
def filesystem_io(self) -> tp.Tuple[int, int]:
"""
Filesystem inputs/outputs.
Returns: a tuple of (#inputs, #outputs)
"""
return self.__filesystem_io
@property
def voluntary_ctx_switches(self) -> int:
"""Number of voluntary context switches."""
return self.__voluntary_ctx_switches
@property
def involuntary_ctx_switches(self) -> int:
"""Number of involuntary context switches."""
return self.__involuntary_ctx_switches
def __str__(self) -> str:
return repr(self)
def __repr__(self) -> str:
str_repr = f"Command: {self.command_name}\n"
str_repr += f"User time: {self.user_time}\n"
str_repr += f"System time: {self.system_time}\n"
str_repr += f"Elapsed wall clock time: {self.wall_clock_time}\n"
str_repr += f"Max Resident Size (kbytes): {self.max_res_size}\n"
str_repr += \
f"Voluntary context switches: {self.voluntary_ctx_switches}\n"
str_repr += \
f"Involuntary context switches: {self.involuntary_ctx_switches}"
return str_repr
__COMMAND_REGEX = re.compile(r'Command being timed: "(?P<command>.*)"')
__WALL_CLOCK_REGEX = re.compile(r".*\):(?P<time>.*)")
__MAXRES_REGEX = re.compile(
r"Maximum resident set size \((?P<size_type>.*)\): (?P<amount>\d*)"
)
@staticmethod
def _parse_command(line: str) -> str:
"""
>>> TimeReport._parse_command('Command being timed: "echo"')
'echo'
"""
if (match := TimeReport.__COMMAND_REGEX.search(line)):
return str(match.group("command"))
raise WrongTimeReportFormat("Could not parse command: ", line)
@staticmethod
def _parse_user_time(line: str) -> timedelta:
"""
>>> import datetime
>>> TimeReport._parse_user_time("User time (seconds): 42.12")
datetime.timedelta(seconds=42, microseconds=120000)
"""
if line.startswith("User time"):
return timedelta(seconds=float(line.split(":")[1]))
raise WrongTimeReportFormat("Could not parse user time: ", line)
@staticmethod
def _parse_system_time(line: str) -> timedelta:
"""
>>> import datetime
>>> TimeReport._parse_system_time("System time (seconds): 42.12")
datetime.timedelta(seconds=42, microseconds=120000)
"""
if line.startswith("System time"):
return timedelta(seconds=float(line.split(":")[1]))
raise WrongTimeReportFormat("Could not parse system time: ", line)
@staticmethod
def _parse_wall_clock_time(line: str) -> timedelta:
"""
>>> import datetime
>>> TimeReport._parse_wall_clock_time(\
"Elapsed (wall clock) time (h:mm:ss or m:ss): 1:42:1.12")
datetime.timedelta(seconds=6121, microseconds=120000)
>>> import datetime
>>> TimeReport._parse_wall_clock_time(\
"Elapsed (wall clock) time (h:mm:ss or m:ss): 42:1.12")
datetime.timedelta(seconds=2521, microseconds=120000)
"""
if (match := TimeReport.__WALL_CLOCK_REGEX.search(line)):
time_str = str(match.group("time"))
if time_str.count(":") > 1:
time_split = time_str.split(":")
return timedelta(
hours=int(time_split[0]),
minutes=int(time_split[1]),
seconds=float(time_split[2])
)
time_split = time_str.split(":")
return timedelta(
minutes=int(time_split[0]), seconds=float(time_split[1])
)
raise WrongTimeReportFormat("Could not prase wall clock time: ", line)
@staticmethod
def _parse_max_resident_size(line: str) -> int:
"""
>>> TimeReport._parse_max_resident_size(\
"Maximum resident set size (kbytes): 1804")
1804
"""
if (match := TimeReport.__MAXRES_REGEX.search(line)):
if match.group("size_type") != "kbytes":
raise AssertionError(
"Type confusion when parsing GNU time file"
)
return int(match.group("amount"))
raise WrongTimeReportFormat(
"Could not parse max resident set size: ", line
)
@staticmethod
def _parse_major_page_faults(line: str) -> int:
if line.startswith("Major (requiring I/O) page faults"):
return int(line.split(":")[1])
raise WrongTimeReportFormat("Could not parse major page faults: ", line)
@staticmethod
def _parse_minor_page_faults(line: str) -> int:
if line.startswith("Minor (reclaiming a frame) page faults"):
return int(line.split(":")[1])
raise WrongTimeReportFormat("Could not parse minor page faults: ", line)
@staticmethod
def _parse_voluntary_ctx_switches(line: str) -> int:
if line.startswith("Voluntary context switches"):
return int(line.split(":")[1])
raise WrongTimeReportFormat(
"Could not parse voluntary context switches: ", line
)
@staticmethod
def _parse_involuntary_ctx_switches(line: str) -> int:
if line.startswith("Involuntary context switches"):
return int(line.split(":")[1])
raise WrongTimeReportFormat(
"Could not parse involuntary context switches: ", line
)
@staticmethod
def _parse_filesystem_io(line: str) -> int:
if line.startswith("File system "):
return int(line.split(":")[1])
raise WrongTimeReportFormat(
"Could not parse filesystem inputs/outputs: ", line
)
[docs]
class TimeReportAggregate(
ReportAggregate[TimeReport],
shorthand=TimeReport.SHORTHAND + ReportAggregate.SHORTHAND,
file_type=ReportAggregate.FILE_TYPE
):
"""Context Manager for parsing multiple time reports stored inside a zip
file."""
def __init__(self, path: Path) -> None:
super().__init__(path, TimeReport)
self._measurements_wall_clock_time = [
report.wall_clock_time.total_seconds() for report in self.reports()
]
self._measurements_ctx_switches = [
report.voluntary_ctx_switches + report.involuntary_ctx_switches
for report in self.reports()
]
@property
def measurements_wall_clock_time(self) -> tp.List[float]:
"""Wall clock time measurements of all aggregated reports."""
return self._measurements_wall_clock_time
@property
def measurements_ctx_switches(self) -> tp.List[int]:
"""Context switches measurements of all aggregated reports."""
return self._measurements_ctx_switches
@property
def max_resident_sizes(self) -> tp.List[int]:
return [report.max_res_size for report in self.reports()]
@property
def major_page_faults(self) -> tp.List[int]:
return [report.major_page_faults for report in self.reports()]
@property
def minor_page_faults(self) -> tp.List[int]:
return [report.minor_page_faults for report in self.reports()]
@property
def filesystem_io(self) -> tp.List[tp.Tuple[int, int]]:
return [report.filesystem_io for report in self.reports()]
@property
def summary(self) -> str:
import numpy as np # pylint: disable=import-outside-toplevel
return (
f"num_reports = {len(self.reports())}\n"
"mean (std) of wall clock time = "
f"{np.mean(self.measurements_wall_clock_time):.2f}"
f" ({np.std(self.measurements_wall_clock_time):.2f})\n"
"mean (std) of context switches = "
f"{np.mean(self.measurements_ctx_switches):.2f}"
f" ({np.std(self.measurements_ctx_switches):.2f})\n"
)
[docs]
class WLTimeReportAggregate(
WorkloadSpecificReportAggregate[TimeReport],
shorthand="WL" + TimeReport.SHORTHAND + ReportAggregate.SHORTHAND,
file_type=ReportAggregate.FILE_TYPE
):
"""Context Manager for parsing multiple time reports stored inside a zip
file and grouping them based on the workload they belong to."""
def __init__(self, path: Path) -> None:
super().__init__(path, TimeReport)
[docs]
def measurements_wall_clock_time(self,
workload_name: str) -> tp.List[float]:
"""Wall clock time measurements of all aggregated reports."""
return [
report.wall_clock_time.total_seconds()
for report in self.reports(workload_name)
]
[docs]
def measurements_ctx_switches(self, workload_name: str) -> tp.List[int]:
"""Context switches measurements of all aggregated reports."""
return [
report.voluntary_ctx_switches + report.involuntary_ctx_switches
for report in self.reports(workload_name)
]
[docs]
def max_resident_sizes(self, workload_name: str) -> tp.List[int]:
return [report.max_res_size for report in self.reports(workload_name)]
[docs]
def summary(self) -> str:
return (
f"num_reports = {len(self.reports())}\n"
f"num_workloads = {len(self.workload_names())}\n"
)