Skip to content
Snippets Groups Projects
Commit 36de7152 authored by Simon Künzel's avatar Simon Künzel
Browse files

Add error notification mails

parent 6ba7ee60
Branches
Tags
No related merge requests found
......@@ -64,7 +64,7 @@ DB_CONNECTIONS = {
DATABASE = {
"engine": "postgres",
"postgres": {
"host": "host.docker.internal",
"host": "localhost",
"port": 5432,
"user": "videoag",
"password": "videoag",
......@@ -128,6 +128,15 @@ API_DIAGNOSTICS_INTERVAL_SIZE_MINUTES = 30
LIVE_CONFIG_UPDATE_INTERVAL_SECONDS = 60
# Commented out to disable
# MAIL_SMTP_SERVER = "mail.fsmpi.rwth-aachen.de"
MAIL_SMTP_PORT = 587
MAIL_SENDER_ADDR = "videoag-it@lists.fsmpi.rwth-aachen.de"
ERROR_MAIL_RECEIVERS = ["videoag-it@lists.fsmpi.rwth-aachen.de"]
ERROR_MAIL_WINDOW_SIZE_SECONDS = 5 * 60
ERROR_MAIL_MAX_MAILS_IN_WINDOW = 3
# LDAP_HOST = "auth.fsmpi.rwth-aachen.de"
LDAP_PORT = 636
LDAP_GROUPS = ["fachschaft"]
......
......@@ -72,7 +72,7 @@ class LiveConfig:
config: LiveConfig = LiveConfig()
@scheduled_function(api.config["LIVE_CONFIG_UPDATE_INTERVAL_SECONDS"], api.config["LIVE_CONFIG_UPDATE_INTERVAL_SECONDS"])
@SCHEDULER.scheduled_function(api.config["LIVE_CONFIG_UPDATE_INTERVAL_SECONDS"], api.config["LIVE_CONFIG_UPDATE_INTERVAL_SECONDS"])
def _update_live_config():
if _LIVE_CONFIG_PATH is None:
return
......
from api.miscellaneous.util import (DEBUG_ENABLED)
from api.miscellaneous.util import (DEBUG_ENABLED, SCHEDULER, ERROR_NOTIFIER)
from api.miscellaneous.rate_limiter import IntervalRateLimiter, HostBasedCounterRateLimiter, create_configured_host_rate_limiters
from api.miscellaneous.diagnostics import DIAGNOSTICS_TRACKER, DiagnosticsCounter, DiagnosticsDataProvider
from api.miscellaneous.scheduler import scheduled_function
from videoag_common.miscellaneous import *
from abc import ABC, ABCMeta, abstractmethod
from threading import Lock
from api.miscellaneous import SCHEDULER
import api
from api.miscellaneous.scheduler import scheduled_function
class DiagnosticsCounter:
......@@ -91,6 +91,6 @@ if _history_size_min % _interval_size_min != 0: # pragma: no cover
DIAGNOSTICS_TRACKER = DiagnosticsTracker(_history_size_min / _interval_size_min)
@scheduled_function(_interval_size_min * 60, _interval_size_min * 60)
@SCHEDULER.scheduled_function(_interval_size_min * 60, _interval_size_min * 60)
def _diagnostics_tracker_end_interval():
DIAGNOSTICS_TRACKER.end_interval()
import random
import sched
import threading
import time
import traceback
from time import sleep
import api
__SCHEDULER = sched.scheduler()
_DISABLE_SCHEDULER = api.config.get("DISABLE_SCHEDULER", False)
def scheduled_function(delay_sec: int, initial_delay_sec: int or None = None, priority: int = 0):
if initial_delay_sec is None:
initial_delay_sec = random.randint(1, 120)
def decorator(func):
def execute_scheduled():
print(f"Scheduler: Executing {func.__name__}")
start_time = time.time_ns()
try:
func()
except Exception as e:
print(f"Scheduler: An exception occurred during execution of {func.__name__}:")
traceback.print_exception(e)
# TODO notify
total_time = time.time_ns() - start_time
print(f"Scheduler: Execution of {func.__name__} took {(total_time//1000)/1000}ms")
__SCHEDULER.enter(delay_sec, priority, execute_scheduled)
__SCHEDULER.enter(initial_delay_sec, priority, execute_scheduled)
print(f"Scheduler: Registered '{func.__name__}' to be executed every {delay_sec}s (first execution in {initial_delay_sec}s) {'(Scheduler is disabled)' if _DISABLE_SCHEDULER else ''}")
return func
return decorator
def __run_scheduler_thread():
while True:
__SCHEDULER.run()
sleep(5)
if not _DISABLE_SCHEDULER:
threading.Thread(target=__run_scheduler_thread, daemon=True).start()
import api
from videoag_common.miscellaneous import *
DEBUG_ENABLED: bool = api.config.get("DEBUG", False)
print(f"Debug is enabled: {DEBUG_ENABLED}")
def _on_scheduler_error(msg: str, exception: Exception):
ERROR_NOTIFIER.notify(msg, exception)
SCHEDULER = SchedulerCoordinator(
api.config.get("DISABLE_SCHEDULER", False),
_on_scheduler_error
)
MAIL_SENDER = MailSender(
api.config.get("MAIL_SMTP_SERVER", None), # Empty to disable
api.config["MAIL_SMTP_PORT"],
api.config["MAIL_SENDER_ADDR"],
(
(api.config["MAIL_LOGIN_USERNAME"], api.config["MAIL_LOGIN_PASSWORD"])
if "MAIL_LOGIN_USERNAME" in api.config or "MAIL_LOGIN_PASSWORD" in api.config
else None
)
)
ERROR_NOTIFIER = ErrorMailNotifier(
SCHEDULER,
MAIL_SENDER,
api.config["ERROR_MAIL_RECEIVERS"],
api.config["ERROR_MAIL_WINDOW_SIZE_SECONDS"],
api.config["ERROR_MAIL_MAX_MAILS_IN_WINDOW"]
)
import sys
from functools import wraps
import traceback
from typing import Callable
......@@ -90,7 +91,7 @@ def api_request_get_query_boolean(id: str, default: bool or None) -> int or None
@api.app.errorhandler(400)
def _handle_bad_request(e=None):
if _LOG_CLIENT_ERRORS and e is not None:
print(f"A client exception occurred while handling api request '{request.path}':")
print(f"A client exception occurred while handling api request '{request.path}':", file=sys.stderr)
traceback.print_exception(e)
return api_on_error(ERROR_BAD_REQUEST("Bad request"))
......@@ -98,7 +99,7 @@ def _handle_bad_request(e=None):
@api.app.errorhandler(404)
def _handle_not_found(e=None):
if _LOG_CLIENT_ERRORS and e is not None:
print(f"A client exception occurred while handling api request '{request.path}':")
print(f"A client exception occurred while handling api request '{request.path}':", file=sys.stderr)
traceback.print_exception(e)
return api_on_error(ERROR_UNKNOWN_REQUEST_PATH)
......@@ -106,7 +107,7 @@ def _handle_not_found(e=None):
@api.app.errorhandler(405)
def _handle_method_not_allowed(e=None):
if _LOG_CLIENT_ERRORS and e is not None:
print(f"A client exception occurred while handling api request '{request.path}':")
print(f"A client exception occurred while handling api request '{request.path}':", file=sys.stderr)
traceback.print_exception(e)
return api_on_error(ERROR_METHOD_NOT_ALLOWED)
......@@ -115,8 +116,9 @@ def _handle_method_not_allowed(e=None):
@api.app.errorhandler(Exception)
def _handle_internal_server_error(e=None):
if e is not None:
print(f"An exception occurred while handling api request '{truncate_string(request.path, 200)}':")
print(f"An exception occurred while handling api request '{truncate_string(request.path, 200)}':", file=sys.stderr)
traceback.print_exception(e)
ERROR_NOTIFIER.notify(f"An exception occurred while handling api request '{truncate_string(request.path, 200)}':", e)
return api_on_error(ERROR_INTERNAL_SERVER_ERROR)
......@@ -406,8 +408,9 @@ def api_function(track_in_diagnostics: bool = True,
traceback.print_exception(e)
return api_on_error(ERROR_SITE_IS_OVERLOADED)
except Exception as e:
print(f"An exception occurred while handling api request '{truncate_string(request.path, 200)}':")
print(f"An exception occurred while handling api request '{truncate_string(request.path, 200)}':", file=sys.stderr)
traceback.print_exception(e)
ERROR_NOTIFIER.notify(f"An exception occurred while handling api request '{truncate_string(request.path, 200)}':", e)
return api_on_error(ERROR_INTERNAL_SERVER_ERROR)
api_function = ApiFunction(
......
......@@ -13,7 +13,7 @@ from .util import (
alphanum_camel_case_to_snake_case,
hash_sha256,
hash_json_sha256,
load_config_file
load_config_file,
)
from .constants import *
from .errors import (
......@@ -50,5 +50,13 @@ from .json import (
from .json_condition import (
JsonCondition
)
from .scheduler import (
SchedulerCoordinator
)
from .mail import (
MailSender,
ErrorMailNotifier,
)
import json # Override videoag_common.miscellaneous.json which seems to be implicitly imported !?
import email.message
import smtplib
import socket
import sys
import threading
import traceback
from datetime import datetime
from email.mime.text import MIMEText
from .scheduler import SchedulerCoordinator
class MailSender:
def __init__(
self,
smtp_host: str or None,
smtp_port: int,
sender_addr: str,
login_data: tuple[str, str] or None,
):
"""
:param smtp_host: May be none to disable sending mails
"""
super().__init__()
self._smtp_host = smtp_host
self._smtp_port = smtp_port
self._sender_addr = sender_addr
self._login_data = login_data
def is_enabled(self) -> bool:
"""
Returns whether mails can be sent. If this returns False, all attempts to send a mail will throw an exception.
"""
return self._smtp_host is not None
def send_plain_mail(self, subject: str, message: str, receiver_addrs: list[str], sender_name: str or None = None):
mime_msg = MIMEText(message)
mime_msg["Subject"] = subject
mime_msg["From"] = sender_name
mime_msg["To"] = ", ".join(receiver_addrs)
self.send_formatted_mail(mime_msg, receiver_addrs)
def send_formatted_mail(self, message: email.message.Message, receiver_addrs: list[str] or None = None):
"""
:param message: 'From' field must NOT contain an email! May contain a name for the sender, but the actual
sender's mail address will always be appended.
:param receiver_addrs: List of receivers for the mail. Is inferred from message["To"] if missing.
Throws an exception if the mail cannot be sent
"""
if not self.is_enabled():
raise Exception("Mails are disabled")
if not message["From"]:
message.replace_header("From", self._sender_addr)
else:
message.replace_header("From", f"{message["From"]} <{self._sender_addr}>")
mail_server = smtplib.SMTP(self._smtp_host, self._smtp_port)
mail_server.starttls()
if self._login_data is not None:
mail_server.login(self._login_data[0], self._login_data[1])
mail_server.send_message(message, from_addr=self._sender_addr, to_addrs=receiver_addrs)
mail_server.quit()
class ErrorMailNotifier:
def __init__(self,
scheduler: SchedulerCoordinator,
sender: MailSender,
receiver_addr: list[str],
bulk_mail_window_size_seconds: int,
max_mails_in_window: int):
super().__init__()
self._sender = sender
self._receiver_addrs = receiver_addr
self._max_mails_in_window = max_mails_in_window
self._lock = threading.Lock()
self._used_last_bulk_mail = False
self._mails_in_window = 0
self._bulk_queue: list[tuple[datetime, str]] = []
scheduler.scheduled_function(
delay_sec=bulk_mail_window_size_seconds,
on_error=lambda msg, e: None, # Prevent notification loop if sending the mail fails
)(self._update_bulk_mail_window)
def _update_bulk_mail_window(self):
with self._lock:
print(self._bulk_queue)
if len(self._bulk_queue) != 0:
self._send_bulk_mail(self._bulk_queue)
self._bulk_queue = []
self._used_last_bulk_mail = True
else:
self._used_last_bulk_mail = False
self._mails_in_window = 0
def _send_bulk_mail(self, messages: list[tuple[datetime, str]]):
bulk_msg = f"{len(messages)} errors occurred between {messages[0][0]} and {messages[-1][0]}:\n\n"
seen_messages: dict[str, int] = {}
i = -1
for timestamp, msg in messages:
i += 1
bulk_msg += f"[{timestamp}][#{i}] "
if msg in seen_messages:
bulk_msg += f"See #{seen_messages[msg]}\n"
else:
seen_messages[msg] = i
bulk_msg += msg + "\n\n"
self._send_mail(bulk_msg)
def _append_to_bulk_queue(self, msg: str):
self._bulk_queue.append((datetime.now(), msg))
def notify(self, message: str, exception: Exception or None = None):
if exception is not None:
message += "\n" + "".join(traceback.format_exception(exception))
with self._lock:
if self._used_last_bulk_mail or self._mails_in_window >= self._max_mails_in_window:
self._append_to_bulk_queue(message)
return
self._mails_in_window += 1
try:
self._send_mail(message)
except Exception as e:
print(f"An exception occurred while sending error email: ", file=sys.stderr)
traceback.print_exception(e)
def _send_mail(self, message: str):
if not self._sender.is_enabled():
return
message = f"""\
Hostname: {socket.gethostname()}
{message}
"""
self._sender.send_plain_mail(
"Video AG Backend Error",
message,
self._receiver_addrs,
str(socket.gethostname())
)
import random
import sched
import sys
import threading
import time
import traceback
from time import sleep
from typing import Callable
class SchedulerCoordinator:
def __init__(self, disable: bool, on_error: Callable[[str, Exception], None]):
super().__init__()
self._on_error = on_error
if disable:
self._scheduler = None
else:
self._scheduler = sched.scheduler()
threading.Thread(target=self._run_scheduler_thread, daemon=True).start()
def _run_scheduler_thread(self):
while True:
self._scheduler.run()
sleep(5)
def scheduled_function(self,
delay_sec: int,
initial_delay_sec: int or None = None,
priority: int = 0,
on_error: Callable[[str, Exception], None] or None = None,
):
if initial_delay_sec is None:
initial_delay_sec = random.randint(1, delay_sec)
def decorator(func):
if self._scheduler is None:
# Scheduler is disabled
return func
def execute_scheduled():
print(f"Scheduler: Executing {func.__name__}")
start_time = time.time_ns()
try:
func()
except Exception as e:
msg = f"Scheduler: An exception occurred during execution of {func.__name__}:"
print(msg, file=sys.stderr)
traceback.print_exception(e)
(on_error or self._on_error)(msg, e)
total_time = time.time_ns() - start_time
print(f"Scheduler: Execution of {func.__name__} took {(total_time//1000)/1000}ms")
self._scheduler.enter(delay_sec, priority, execute_scheduled)
self._scheduler.enter(initial_delay_sec, priority, execute_scheduled)
print(f"Scheduler: Registered '{func.__name__}' to be executed every {delay_sec}s (first execution in"
f" {initial_delay_sec}s) {'(Scheduler is disabled)' if self._scheduler is None else ''}")
return func
return decorator
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment