Source code for varats.utils.filesystem_util
"""Utility functions for handling filesystem related tasks."""
import fcntl
import os.path
import typing as tp
from contextlib import contextmanager
from pathlib import Path
[docs]
class FolderAlreadyPresentError(Exception):
    """
    Signals that a folder which was supposed to be newly created already exists.

    Typically raised by scripts that set up directory structures and must
    not reuse or overwrite an existing folder.

    Args:
        folder: the folder that was unexpectedly found to be present
    """

    def __init__(self, folder: tp.Union[Path, str]) -> None:
        # Normalize to a string first so `Path` and `str` inputs render alike.
        folder_name = str(folder)
        message = (
            f"Folder: '{folder_name}' should be created "
            "but was already present."
        )
        super().__init__(message)
[docs]
@contextmanager
def lock_file(lock_path: Path,
              lock_mode: int = fcntl.LOCK_EX) -> tp.Generator[None, None, None]:
    """
    Context manager that holds an ``flock`` lock on a file for its duration.

    Missing parent directories of the lock file are created. The lock file
    itself is created if it does not exist and is truncated when opened. The
    lock is released and the file descriptor is closed when the context is
    left, regardless of whether the body raised.

    Args:
        lock_path: path to the file that should be used as lock
        lock_mode: operation passed to ``fcntl.flock`` to acquire the lock;
                   defaults to an exclusive lock (``fcntl.LOCK_EX``)

    Raises:
        OSError: if the lock file cannot be opened or the lock cannot be
                 acquired, e.g., when ``fcntl.LOCK_NB`` is part of
                 ``lock_mode`` and the lock is held elsewhere
    """
    # Create directories until lock file if required. A bare file name has an
    # empty dirname, which `os.makedirs` rejects, so skip creation in that
    # case; the lock file then lives in the current working directory.
    lock_dir = os.path.dirname(lock_path)
    if lock_dir:
        os.makedirs(lock_dir, exist_ok=True)

    open_mode = os.O_RDWR | os.O_CREAT | os.O_TRUNC
    lock_fd = os.open(lock_path, open_mode)
    try:
        fcntl.flock(lock_fd, lock_mode)
        try:
            yield
        finally:
            # Only unlock if the lock was actually acquired.
            fcntl.flock(lock_fd, fcntl.LOCK_UN)
    finally:
        # Always close the descriptor, even if (un)locking failed, so that it
        # never leaks.
        os.close(lock_fd)