Skip to content
Snippets Groups Projects
Select Git revision
  • 2b9def6bcaa7462039d59f6e621bb1be397decb8
  • master default protected
2 results

server.py

Blame
  • Forked from Video AG Infrastruktur / website
    Source project has a limited visibility.
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    util.py 5.50 KiB
    import json
    import math
    import datetime
    from datetime import datetime as Datetime
    from pathlib import Path
    from types import ModuleType
    from typing import Callable, TypeVar, Iterable
    from hashlib import sha256
    import re
    
    
    JsonTypes = (dict[str, "JsonTypes"] | list["JsonTypes"] | str | int | bool | None)
    
    
    def check_json_type(val):
        if isinstance(val, str) or isinstance(val, int) or isinstance(val, bool) or val is None:
            return True
        elif isinstance(val, list):
            for v in val:
                check_json_type(v)
        elif isinstance(val, dict):
            for key, v in val.items():
                if not isinstance(key, str):
                    raise TypeError("Key of json dict must be str")
                check_json_type(v)
        else:
            raise TypeError(f"Value is not a json value: {val}")
    
    
    ID_STRING_REGEX_NO_LENGTH = "(?=.*[a-z_-])[a-z0-9_-]*"
    ID_STRING_PATTERN_NO_LENGTH = re.compile(ID_STRING_REGEX_NO_LENGTH)
    
    
    def pad_string(val: str, padding: str, length: int):
        return (padding * math.ceil( (length - len(val)) / len(padding) )) + val
    
    
    def format_standard_datetime(dt: Datetime):
        def zero_pad(val: int, length: int):
            return pad_string(str(val), "0", length)
        
        # Python's strftime can't handle milliseconds and is unreliably regarding the length (e.g. sometimes microseconds or
        # years are not padded to full length) https://stackoverflow.com/a/35643540
        # So we just format it ourselves
        dt = dt.astimezone(datetime.UTC)
        return (f"{zero_pad(dt.year, 4)}-{zero_pad(dt.month, 2)}-{zero_pad(dt.day, 2)}"
                f"T{zero_pad(dt.hour, 2)}:{zero_pad(dt.minute, 2)}:{zero_pad(dt.second, 2)}"
                f".{zero_pad(dt.microsecond // 1000, 3)}Z")
    
    
    def parse_standard_datetime(val: str):
        if not val.endswith("Z"):
            raise ValueError("datetime must have Z (UTC) designator")
        val = val[:-1]
        return Datetime.strptime(val + "000", "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=datetime.UTC)
    
    
    # ALWAYS use this method. Everything needs to be in UTC
    def get_standard_datetime_now() -> Datetime:
        return Datetime.now(datetime.UTC)
    
    
    class ArgAttributeObject:
        def __init__(self, **kwargs):
            super().__init__()
            for name, value in kwargs.items():
                setattr(self, name, value)
    
    
    def _raise(exception: Exception):
        raise exception
    
    
    def truncate_string(value: str, limit: int = 50) -> str:
        if len(value) <= limit:
            return value
        if limit < 5:
            raise ValueError("Truncate limit too small")  # pragma: no cover
        first = (limit-3)//2
        second = (limit-3) - first
        return value[:first] + "..." + value[(len(value)-second):len(value)]
    
    
    _T = TypeVar("_T")
    _R = TypeVar("_R")
    
    
    def flat_map(func: Callable[[_T], Iterable[_R]], it: Iterable[_T]) -> Iterable[_R]:
        for e in it:
            for sub_e in func(e):
                yield sub_e
    
    
    def recursive_flat_map(extractor: Callable[[_T], Iterable[_T]], it: Iterable[_T]) -> Iterable[_T]:
        for e in it:
            yield e
            for sub_e in recursive_flat_map(extractor, extractor(e)):
                yield sub_e
    
    
    def recursive_flat_map_single(extractor: Callable[[_T], Iterable[_T]], e: _T) -> Iterable[_T]:
        yield e
        for sub_e in recursive_flat_map(extractor, extractor(e)):
            yield sub_e
    
    
    def dict_get_check_type(dict: dict, key, type: type, default=None):
        if key not in dict:
            if default is not None:
                return default
            raise ValueError(f"Missing value of type '{type}' for key '{key}'")
        val = dict[key]
        if not isinstance(val, type):
            raise TypeError(f"Value for key '{key}' is '{type(val)}' but expected '{type}'")
        return val
    
    
    def alphanum_camel_case_to_snake_case(val: str):
        if not val.isascii():
            raise ValueError(f"Value '{val}' contains non ascii character")
        res = ""
        prev_upper = False
        for i in range(0, len(val)):
            char = val[i]
            if char.islower() or char.isnumeric():
                res += char
                prev_upper = False
                continue
            if not char.isupper():
                raise ValueError(f"Value '{val}' contains invalid character '{char}'")
            next_upper = i+1 < len(val) and val[i+1].isupper()
            if i > 0 and (not prev_upper or not next_upper):
                res += "_"
            res += char.lower()
            prev_upper = True
        return res
    
    
    def hash_sha256(data: str) -> str:
        return sha256(data.encode()).hexdigest()
    
    
    def hash_json_sha256(data: JsonTypes) -> str:
        return sha256(json.dumps(data, sort_keys=True).encode()).hexdigest()
    
    
    def load_config_file(path: Path):
        config_module = ModuleType("config")
        config_module.__file__ = str(path.absolute())
        try:
            exec(compile(path.read_text(), path.name, "exec"), config_module.__dict__)
        except Exception as e:
            raise Exception(f"Exception while loading config file '{path}'") from e
        config = {}
        for key in dir(config_module):
            if key.isupper():
                config[key] = getattr(config_module, key)
        return config
    
    
    def merge_config_into(config: dict, to_merge: dict, overwrite_non_mergeable: bool = False):
        for key, item in to_merge.items():
            if key not in config:
                config[key] = item
            elif isinstance(item, dict) and isinstance(config[key], dict):
                merge_config_into(config[key], item, overwrite_non_mergeable=overwrite_non_mergeable)
            elif overwrite_non_mergeable:
                config[key] = item
            else:
                # Don't leak config values, might contain passwords, etc.
                raise ValueError(f"Can't merge {type(item)} into {type(config[key])} for key {key}")