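# NOTE: The following header lines appear to be residue of the GitLab web page this file was copied from.
# They are not part of util.py and are kept as comments only.
#   Select Git revision
#   Forked from
#   Video AG Infrastruktur / website
#   Source project has a limited visibility.
#   Code owners
#   Assign users and groups as approvers for specific file changes. Learn more.
#   util.py 5.50 KiB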
import json
import math
import datetime
from datetime import datetime as Datetime
from pathlib import Path
from types import ModuleType
from typing import Callable, TypeVar, Iterable
from hashlib import sha256
import re
# Recursive type alias for values this module treats as JSON: dicts with str keys, lists, str, int, bool and None.
# Note that float is not part of the alias.
JsonTypes = (dict[str, "JsonTypes"] | list["JsonTypes"] | str | int | bool | None)
def check_json_type(val):
    """
    Recursively verify that ``val`` only consists of the supported json values.

    Supported are ``str``, ``int``, ``bool`` and ``None``, lists of supported values and dicts with ``str`` keys
    and supported values. Anything else (including ``float``) is rejected.

    :param val: The value to check.
    :return: ``True`` if the value is valid. Invalid values never return, they raise.
    :raises TypeError: If the value or any nested value is not supported, or if a dict key is not a ``str``.
    """
    # bool is a subclass of int; it is only listed to make the accepted set explicit
    if val is None or isinstance(val, (str, int, bool)):
        return True
    if isinstance(val, list):
        for v in val:
            check_json_type(v)
    elif isinstance(val, dict):
        for key, v in val.items():
            if not isinstance(key, str):
                raise TypeError("Key of json dict must be str")
            check_json_type(v)
    else:
        raise TypeError(f"Value is not a json value: {val}")
    # Valid containers must report success exactly like valid scalars do (instead of implicitly returning None)
    return True
# Regex for an id string without any length restriction: an arbitrary number of characters out of a-z, 0-9, '_'
# and '-'. The lookahead additionally requires a lowercase letter, '_' or '-' somewhere in the remaining input.
# NOTE(review): the regex is not anchored; callers presumably need fullmatch() to validate a whole string - confirm.
ID_STRING_REGEX_NO_LENGTH = "(?=.*[a-z_-])[a-z0-9_-]*"
# Pre-compiled version of the regex above
ID_STRING_PATTERN_NO_LENGTH = re.compile(ID_STRING_REGEX_NO_LENGTH)
def pad_string(val: str, padding: str, length: int):
    """
    Prepend ``padding`` to ``val`` until the result has at least ``length`` characters.

    The padding is only added as a whole, so with a padding of more than one character the result may be longer
    than ``length``. A value which is already long enough is returned unchanged.
    """
    missing_chars = length - len(val)
    # Round up, so the padding always covers all missing characters. A negative count yields an empty prefix
    repetitions = math.ceil(missing_chars / len(padding))
    prefix = padding * repetitions
    return prefix + val
def format_standard_datetime(dt: Datetime):
    """
    Format ``dt`` as ``YYYY-MM-DDTHH:MM:SS.mmmZ``.

    The datetime is converted to UTC first. Only milliseconds are kept, the remaining microseconds are cut off.
    """
    # Python's strftime can't handle milliseconds and is unreliable regarding the length (e.g. sometimes
    # microseconds or years are not padded to full length) https://stackoverflow.com/a/35643540
    # So we just format it ourselves
    def padded(number: int, width: int):
        return pad_string(str(number), "0", width)

    utc = dt.astimezone(datetime.UTC)
    date_part = "-".join((padded(utc.year, 4), padded(utc.month, 2), padded(utc.day, 2)))
    time_part = ":".join((padded(utc.hour, 2), padded(utc.minute, 2), padded(utc.second, 2)))
    millis_part = padded(utc.microsecond // 1000, 3)
    return f"{date_part}T{time_part}.{millis_part}Z"
def parse_standard_datetime(val: str):
if not val.endswith("Z"):
raise ValueError("datetime must have Z (UTC) designator")
val = val[:-1]
return Datetime.strptime(val + "000", "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=datetime.UTC)
# ALWAYS use this method. Everything needs to be in UTC
def get_standard_datetime_now() -> Datetime:
return Datetime.now(datetime.UTC)
class ArgAttributeObject:
    """Object which stores every keyword argument given to the constructor as an attribute of the same name."""

    def __init__(self, **kwargs):
        super().__init__()
        for attr_name in kwargs:
            setattr(self, attr_name, kwargs[attr_name])
def _raise(exception: Exception):
    """
    Raise the given exception.

    NOTE(review): presumably exists so an exception can be raised from within an expression (e.g. a lambda),
    where a raise statement is not allowed - confirm with the callers.
    """
    raise exception
def truncate_string(value: str, limit: int = 50) -> str:
    """
    Shorten ``value`` to exactly ``limit`` characters by replacing its middle part with ``...``.

    Values which are not longer than ``limit`` are returned unchanged.

    :raises ValueError: If the value needs to be shortened but ``limit`` is smaller than 5.
    """
    total = len(value)
    if total <= limit:
        return value
    if limit < 5:
        raise ValueError("Truncate limit too small")  # pragma: no cover
    # Three characters are reserved for the dots. The tail gets the extra character if the rest is odd
    kept = limit - 3
    head_len = kept // 2
    tail_len = kept - head_len
    return f"{value[:head_len]}...{value[total - tail_len:]}"
# Generic type variables used in the signatures of the flat map helpers below.
# _T is the element type of the iterated input
_T = TypeVar("_T")
# _R is the element type of the result
_R = TypeVar("_R")
def flat_map(func: Callable[[_T], Iterable[_R]], it: Iterable[_T]) -> Iterable[_R]:
    """
    Lazily apply ``func`` to every element of ``it`` and yield the items of all returned iterables in order.
    """
    for element in it:
        yield from func(element)
def recursive_flat_map(extractor: Callable[[_T], Iterable[_T]], it: Iterable[_T]) -> Iterable[_T]:
    """
    Lazily yield every element of ``it``, each directly followed by everything reachable from it via ``extractor``.

    ``extractor`` returns the sub-elements of an element. The traversal is depth-first and yields an element
    before its sub-elements.
    """
    for element in it:
        yield element
        yield from recursive_flat_map(extractor, extractor(element))
def recursive_flat_map_single(extractor: Callable[[_T], Iterable[_T]], e: _T) -> Iterable[_T]:
    """
    Lazily yield ``e`` followed by everything reachable from it via ``extractor`` (depth-first).

    This is recursive_flat_map() for a single root element.
    """
    # Delegating with a one-element iterable yields the root first and then recurses into its sub-elements
    yield from recursive_flat_map(extractor, (e,))
def dict_get_check_type(dict: dict, key, type: type, default=None):
    """
    Return ``dict[key]`` after checking that the value is an instance of ``type``.

    :param dict: The dict to read from.
    :param key: The key to look up.
    :param type: The expected type (anything accepted by ``isinstance``).
    :param default: Returned if the key is missing. ``None`` means that there is no default. The default is
        returned as is, its type is not checked.
    :return: The value for the key or the default.
    :raises ValueError: If the key is missing and no default is given.
    :raises TypeError: If the value is not an instance of ``type``.
    """
    if key not in dict:
        if default is not None:
            return default
        raise ValueError(f"Missing value of type '{type}' for key '{key}'")
    val = dict[key]
    if not isinstance(val, type):
        # The parameter 'type' shadows the builtin. Calling 'type(val)' here would call the EXPECTED class with the
        # value (e.g. int("x")) instead of looking up the actual class, so the class is read from the value itself.
        # The parameter names are part of the interface (keyword arguments) and therefore stay as they are.
        raise TypeError(f"Value for key '{key}' is '{val.__class__}' but expected '{type}'")
    return val
def alphanum_camel_case_to_snake_case(val: str):
    """
    Convert an alphanumeric camel case (or pascal case) string to snake case.

    An underscore is inserted before an uppercase letter which starts a new word. That is the case if the letter
    follows a lowercase letter or a digit, or if it follows an uppercase letter and is itself followed by a
    lowercase letter (the end of an acronym, e.g. ``HTTPServer`` -> ``http_server``). A run of uppercase letters
    at the end of the string or in front of a digit stays together (``userID`` -> ``user_id``).

    :param val: The string to convert. Only ascii letters and digits are allowed.
    :return: The value in snake case.
    :raises ValueError: If the value contains a non ascii character or a character which is neither a letter
        nor a digit.
    """
    if not val.isascii():
        raise ValueError(f"Value '{val}' contains non ascii character")
    parts = []
    prev_upper = False
    for i, char in enumerate(val):
        if char.islower() or char.isnumeric():
            parts.append(char)
            prev_upper = False
            continue
        if not char.isupper():
            raise ValueError(f"Value '{val}' contains invalid character '{char}'")
        # Within a run of uppercase letters only a following LOWERCASE letter marks the start of a new word.
        # Checking for "next is not uppercase" instead would also split off the last letter of a run at the end
        # of the string or in front of a digit (e.g. "userID" -> "user_i_d").
        next_lower = i + 1 < len(val) and val[i + 1].islower()
        if i > 0 and (not prev_upper or next_lower):
            parts.append("_")
        parts.append(char.lower())
        prev_upper = True
    return "".join(parts)
def hash_sha256(data: str) -> str:
    """Return the SHA-256 digest of the encoded string as a hex string."""
    hasher = sha256()
    hasher.update(data.encode())
    return hasher.hexdigest()
def hash_json_sha256(data: JsonTypes) -> str:
    """
    Return the SHA-256 digest (as a hex string) of the json serialization of ``data``.

    Dict keys are sorted for the serialization, so the order of the keys does not affect the hash.
    """
    serialized = json.dumps(data, sort_keys=True)
    hasher = sha256(serialized.encode())
    return hasher.hexdigest()
def load_config_file(path: Path):
    """
    Execute the python file at ``path`` and return all of its upper case module level names as a dict.

    NOTE: The file content is executed with exec(), so this must only be used for trusted files.

    :param path: Path of the config file.
    :return: Dict of every name for which ``str.isupper()`` is true to its value.
    :raises Exception: If reading, compiling or executing the file fails. The original exception is chained.
    """
    module = ModuleType("config")
    module.__file__ = str(path.absolute())
    try:
        source = path.read_text()
        code = compile(source, path.name, "exec")
        exec(code, module.__dict__)
    except Exception as e:
        raise Exception(f"Exception while loading config file '{path}'") from e
    return {name: getattr(module, name) for name in dir(module) if name.isupper()}
def merge_config_into(config: dict, to_merge: dict, overwrite_non_mergeable: bool = False):
    """
    Recursively merge ``to_merge`` into ``config`` (``config`` is modified in place).

    Keys missing in ``config`` are added. If both values of a key are dicts, they are merged recursively. Any
    other conflict is resolved by taking the value of ``to_merge`` if ``overwrite_non_mergeable`` is set.

    :raises ValueError: If a key has conflicting values and ``overwrite_non_mergeable`` is not set.
    """
    for key, item in to_merge.items():
        if key not in config:
            config[key] = item
            continue
        existing = config[key]
        if isinstance(item, dict) and isinstance(existing, dict):
            merge_config_into(existing, item, overwrite_non_mergeable=overwrite_non_mergeable)
            continue
        if not overwrite_non_mergeable:
            # Don't leak config values, might contain passwords, etc.
            raise ValueError(f"Can't merge {type(item)} into {type(existing)} for key {key}")
        config[key] = item