Select Git revision
Forked from
protokollsystem / proto3
Source project has a limited visibility.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
utils.py 8.32 KiB
from flask import request
import random
import string
import math
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from datetime import datetime
import requests
from io import BytesIO
import ipaddress
from socket import getfqdn
from uuid import uuid4
import subprocess
import contextlib
from shared import config
def random_string(length):
return "".join((random.choice(string.ascii_letters)
for i in range(length)))
def is_past(some_date):
return (datetime.now() - some_date).total_seconds() > 0
def encode_kwargs(kwargs):
encoded_kwargs = {}
for key in kwargs:
value = kwargs[key]
if hasattr(value, "id"):
encoded_kwargs[key] = (type(value), value.id, True)
else:
encoded_kwargs[key] = (type(value), value, False)
return encoded_kwargs
def decode_kwargs(encoded_kwargs):
kwargs = {}
for name in encoded_kwargs:
kind, id, from_db = encoded_kwargs[name]
if from_db:
kwargs[name] = kind.query.filter_by(id=id).first()
else:
kwargs[name] = id
return kwargs
class MailManager:
def __init__(self, config):
self.active = getattr(config, "MAIL_ACTIVE", False)
self.from_addr = getattr(config, "MAIL_FROM", "")
self.hostname = getattr(config, "MAIL_HOST", "")
self.username = getattr(config, "MAIL_USER", "")
self.password = getattr(config, "MAIL_PASSWORD", "")
self.use_tls = getattr(config, "MAIL_USE_TLS", True)
self.use_starttls = getattr(config, "MAIL_USE_STARTTLS", False)
def _get_smtp(self):
if self.use_tls:
return smtplib.SMTP_SSL
return smtplib.SMTP
def connect(self):
server = self._get_smtp()(self.hostname)
if self.use_starttls:
server.starttls()
if self.username not in [None, ""] and self.password not in [None, ""]:
server.login(self.username, self.password)
return server
@contextlib.contextmanager
def session(self):
server = self.connect()
yield server
server.quit()
def send(self, to_addr, subject, content, appendix=None, reply_to=None):
if (not self.active
or not self.hostname
or not self.from_addr):
return
msg = MIMEMultipart("mixed")
msg["From"] = self.from_addr
msg["To"] = to_addr
msg["Subject"] = subject
msg["Message-ID"] = "<{}@{}>".format(uuid4(), getfqdn())
if reply_to is not None:
msg["Reply-To"] = reply_to
msg.attach(MIMEText(content, _charset="utf-8"))
if appendix is not None:
for name, file_like in appendix:
part = MIMEApplication(file_like.read(), "octet-stream")
part["Content-Disposition"] = (
'attachment; filename="{}"'.format(name))
msg.attach(part)
with self.session() as server:
server.sendmail(
self.from_addr,
to_addr.split(","),
msg.as_string())
def check(self):
if not self.active:
return True
if not self.hostname or not self.from_addr:
return False
with self.session():
pass
return True
mail_manager = MailManager(config)
def get_first_unused_int(numbers):
positive_numbers = [number for number in numbers if number >= 0]
if len(positive_numbers) == 0:
return 0
highest = max(positive_numbers)
for given, linear in zip(positive_numbers, range(highest + 1)):
if linear < given:
return linear
return highest + 1
def normalize_pad(pad):
return pad.replace(" ", "_")
def get_etherpad_url(pad):
return "{}/p/{}".format(config.ETHERPAD_URL, normalize_pad(pad))
def get_etherpad_export_url(pad):
return "{}/p/{}/export/txt".format(config.ETHERPAD_URL, normalize_pad(pad))
def get_etherpad_import_url(pad):
return "{}/p/{}/import".format(config.ETHERPAD_URL, normalize_pad(pad))
def get_etherpad_text(pad):
req = requests.get(get_etherpad_export_url(pad))
return req.text
def set_etherpad_text(pad, text, only_if_default=True):
if only_if_default:
current_text = get_etherpad_text(pad).strip()
if (current_text != config.EMPTY_ETHERPAD.strip()
and len(current_text) > 0):
return False
file_like = BytesIO(text.encode("utf-8"))
files = {"file": file_like}
url = get_etherpad_import_url(pad)
req = requests.post(url, files=files)
return req.status_code == 200
def split_terms(text, quote_chars="\"'", separators=" \t\n"):
terms = []
in_quote = False
last_quote_char = ""
current_term = ""
for char in text:
if in_quote:
if char != last_quote_char:
current_term += char
else:
in_quote = False
last_quote_char = ""
terms.append(current_term)
current_term = ""
else:
if char in separators:
if len(current_term) > 0:
terms.append(current_term)
current_term = ""
else:
if char in quote_chars and len(current_term) == 0:
in_quote = True
last_quote_char = char
else:
current_term += char
if len(current_term) > 0:
terms.append(current_term)
return terms
def optional_int_arg(name):
try:
return int(request.args.get(name))
except (ValueError, TypeError):
return None
def add_line_numbers(text):
raw_lines = text.splitlines()
linenumber_length = math.ceil(math.log10(len(raw_lines)) + 1)
lines = []
for linenumber, line in enumerate(raw_lines):
lines.append("{} {}".format(
str(linenumber + 1).rjust(linenumber_length),
line
))
return "\n".join(lines)
def get_current_ip():
address = ipaddress.ip_address(request.remote_addr)
if (address == ipaddress.ip_address("127.0.0.1")
and "X-Real-Ip" in request.headers):
address = ipaddress.ip_address(request.headers["X-Real-Ip"])
return address
def check_ip_in_networks(networks_string):
address = get_current_ip()
try:
for network_string in networks_string.split(","):
network = ipaddress.ip_network(network_string.strip())
if address in network:
return True
return False
except ValueError:
return False
def fancy_join(values, sep1=" und ", sep2=", "):
values = list(values)
if len(values) <= 1:
return "".join(values)
last = values[-1]
start = values[:-1]
return "{}{}{}".format(sep2.join(start), sep1, last)
def footnote_hash(text, length=5):
return str(sum(ord(c) * i for i, c in enumerate(text)) % 10**length)
def parse_datetime_from_string(text):
text = text.strip()
for format in ("%d.%m.%Y", "%d.%m.%y", "%Y-%m-%d",
"%d. %B %Y", "%d. %b %Y", "%d. %B %y", "%d. %b %y"):
try:
return datetime.strptime(text, format)
except ValueError:
pass
for format in ("%d.%m.", "%d. %m.", "%d.%m", "%d.%m"):
try:
return datetime.strptime(text, format).replace(
year=datetime.now().year)
except ValueError:
pass
raise ValueError("Date '{}' does not match any known format!".format(text))
def get_git_revision():
try:
gitlab_url = "https://git.fsmpi.rwth-aachen.de/protokollsystem/proto3"
commit_hash = subprocess.check_output(
["git", "log", "-g", "-1", "--pretty=%H"]).decode("UTF-8").strip()
timestamp = int(subprocess.check_output(
["git", "log", "-g", "-1", "--pretty=%at"]).strip())
commit_date = datetime.fromtimestamp(timestamp)
return {"url": gitlab_url, "hash": commit_hash, "date": commit_date}
except subprocess.SubprocessError:
pass
def get_max_page_length_exp(objects):
length = len(objects)
if length > 0:
return math.ceil(math.log10(length))
return 1
def get_internal_filename(protocol, document, filename):
return "{}-{}-{}".format(protocol.id, document.id, filename)