utils.py 8.32 KB
Newer Older
1
from flask import request
2 3 4

import random
import string
5
import math
6 7 8
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
9
from email.mime.application import MIMEApplication
10
from datetime import datetime
Robin Sonnabend's avatar
Robin Sonnabend committed
11 12
import requests
from io import BytesIO
13
import ipaddress
14 15
from socket import getfqdn
from uuid import uuid4
16
import subprocess
17
import contextlib
18

19
from shared import config
20

21

22
def random_string(length):
    """Return a string of ``length`` randomly chosen ASCII letters."""
    alphabet = string.ascii_letters
    letters = []
    for _ in range(length):
        letters.append(random.choice(alphabet))
    return "".join(letters)

26 27 28 29

def is_past(some_date):
    """Return True if ``some_date`` lies strictly before the current local time."""
    elapsed = datetime.now() - some_date
    return elapsed.total_seconds() > 0

30

31 32 33 34 35 36 37 38 39 40
def encode_kwargs(kwargs):
    """Encode keyword arguments as ``(type, payload, from_db)`` triples.

    Values that expose an ``id`` attribute are replaced by that id and
    flagged with ``True``; every other value is stored as-is and flagged
    with ``False``.
    """
    def _encode(value):
        if hasattr(value, "id"):
            return (type(value), value.id, True)
        return (type(value), value, False)

    return {key: _encode(value) for key, value in kwargs.items()}

41

42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
def decode_kwargs(encoded_kwargs):
    """Turn ``(kind, payload, from_db)`` triples back into keyword arguments.

    When ``from_db`` is set, the payload is treated as an id and the value
    is looked up through ``kind.query.filter_by(id=...).first()``;
    otherwise the payload itself is the value.
    """
    decoded = {}
    for name, (kind, payload, from_db) in encoded_kwargs.items():
        if not from_db:
            decoded[name] = payload
            continue
        decoded[name] = kind.query.filter_by(id=payload).first()
    return decoded


class MailManager:
    """Send e-mails over SMTP using settings read from a config object."""

    def __init__(self, config):
        """Read the mail settings from ``config``.

        Every setting is optional; missing attributes fall back to the
        defaults given below (mail disabled, implicit TLS enabled,
        STARTTLS disabled, empty host/sender/credentials).
        """
        self.active = getattr(config, "MAIL_ACTIVE", False)
        self.from_addr = getattr(config, "MAIL_FROM", "")
        self.hostname = getattr(config, "MAIL_HOST", "")
        self.username = getattr(config, "MAIL_USER", "")
        self.password = getattr(config, "MAIL_PASSWORD", "")
        self.use_tls = getattr(config, "MAIL_USE_TLS", True)
        self.use_starttls = getattr(config, "MAIL_USE_STARTTLS", False)

    def _get_smtp(self):
        """Return the smtplib client class matching the ``use_tls`` setting."""
        if self.use_tls:
            return smtplib.SMTP_SSL
        return smtplib.SMTP

    def connect(self):
        """Open a connection to the configured host and return the client.

        STARTTLS is issued when enabled, and a login is performed only if
        both username and password are set. If either step fails, the
        connection is closed before the exception is re-raised so the
        socket does not leak.
        """
        server = self._get_smtp()(self.hostname)
        try:
            if self.use_starttls:
                server.starttls()
            has_username = self.username not in [None, ""]
            has_password = self.password not in [None, ""]
            if has_username and has_password:
                server.login(self.username, self.password)
        except Exception:
            server.close()
            raise
        return server

    @contextlib.contextmanager
    def session(self):
        """Context manager yielding a connected client that is always shut down.

        The connection is terminated even when the ``with`` body raises.
        """
        server = self.connect()
        try:
            yield server
        finally:
            try:
                server.quit()
            except smtplib.SMTPServerDisconnected:
                # The peer is already gone; just release the local socket
                # instead of masking whatever error caused the disconnect.
                server.close()

    def send(self, to_addr, subject, content, appendix=None, reply_to=None):
        """Build a multipart message and send it.

        Does nothing (returns ``None``) when mail is inactive or when no
        host or sender address is configured.

        :param to_addr: recipient header value; split on ``","`` to obtain
            the envelope recipients.
        :param subject: value of the Subject header.
        :param content: body text, attached as UTF-8.
        :param appendix: optional iterable of ``(name, file_like)`` pairs;
            each file is read completely and attached as
            ``application/octet-stream``.
        :param reply_to: optional value of the Reply-To header.
        """
        if (not self.active
                or not self.hostname
                or not self.from_addr):
            return
        msg = MIMEMultipart("mixed")
        msg["From"] = self.from_addr
        msg["To"] = to_addr
        msg["Subject"] = subject
        msg["Message-ID"] = "<{}@{}>".format(uuid4(), getfqdn())
        if reply_to is not None:
            msg["Reply-To"] = reply_to
        msg.attach(MIMEText(content, _charset="utf-8"))
        if appendix is not None:
            for name, file_like in appendix:
                part = MIMEApplication(file_like.read(), "octet-stream")
                part["Content-Disposition"] = (
                    'attachment; filename="{}"'.format(name))
                msg.attach(part)
        with self.session() as server:
            server.sendmail(
                self.from_addr,
                to_addr.split(","),
                msg.as_string())

    def check(self):
        """Report whether the mail configuration is usable.

        Returns ``True`` when mail is inactive, ``False`` when the host or
        sender address is missing, and otherwise opens and closes a
        session (raising whatever the connection attempt raises) before
        returning ``True``.
        """
        if not self.active:
            return True
        if not self.hostname or not self.from_addr:
            return False
        with self.session():
            pass
        return True
115

116

117
# Module-level MailManager instance built from the shared configuration.
mail_manager = MailManager(config)
Robin Sonnabend's avatar
Robin Sonnabend committed
118

119

Robin Sonnabend's avatar
Robin Sonnabend committed
120 121 122 123 124
def get_first_unused_int(numbers):
    """Return the smallest non-negative integer not contained in ``numbers``.

    Negative entries are ignored. The input may be unsorted and may
    contain duplicates; an empty input yields ``0``.
    """
    # A set makes the result independent of input order and duplicates.
    used = {number for number in numbers if number >= 0}
    candidate = 0
    while candidate in used:
        candidate += 1
    return candidate
Robin Sonnabend's avatar
Robin Sonnabend committed
129

130

131 132
def normalize_pad(pad):
    """Return ``pad`` with every space replaced by an underscore."""
    return "_".join(pad.split(" "))
133 134


Robin Sonnabend's avatar
Robin Sonnabend committed
135
def get_etherpad_url(pad):
    """Return the URL of the normalized pad below ``config.ETHERPAD_URL``."""
    base_url = config.ETHERPAD_URL
    pad_name = normalize_pad(pad)
    return "{}/p/{}".format(base_url, pad_name)
137 138


Robin Sonnabend's avatar
Robin Sonnabend committed
139
def get_etherpad_export_url(pad):
    """Return the plain-text export URL of the normalized pad."""
    base_url = config.ETHERPAD_URL
    pad_name = normalize_pad(pad)
    return "{}/p/{}/export/txt".format(base_url, pad_name)
141 142


Robin Sonnabend's avatar
Robin Sonnabend committed
143
def get_etherpad_import_url(pad):
    """Return the import URL of the normalized pad."""
    base_url = config.ETHERPAD_URL
    pad_name = normalize_pad(pad)
    return "{}/p/{}/import".format(base_url, pad_name)
Robin Sonnabend's avatar
Robin Sonnabend committed
145

146

Robin Sonnabend's avatar
Robin Sonnabend committed
147
def get_etherpad_text(pad):
    """Fetch the pad's export URL via HTTP GET and return the response text."""
    export_url = get_etherpad_export_url(pad)
    response = requests.get(export_url)
    return response.text

151

Robin Sonnabend's avatar
Robin Sonnabend committed
152 153
def set_etherpad_text(pad, text, only_if_default=True):
    """Upload ``text`` to the pad through its import URL.

    With ``only_if_default`` set, nothing is uploaded and ``False`` is
    returned if the pad currently holds text that is neither empty nor
    equal to ``config.EMPTY_ETHERPAD`` (both compared after stripping).
    Otherwise the text is posted as a UTF-8 encoded file and the result
    is ``True`` exactly when the response status code is 200.
    """
    if only_if_default:
        existing_text = get_etherpad_text(pad).strip()
        is_default = existing_text == config.EMPTY_ETHERPAD.strip()
        if existing_text and not is_default:
            return False
    payload = {"file": BytesIO(text.encode("utf-8"))}
    import_url = get_etherpad_import_url(pad)
    response = requests.post(import_url, files=payload)
    return response.status_code == 200
163 164


Robin Sonnabend's avatar
Robin Sonnabend committed
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
def split_terms(text, quote_chars="\"'", separators=" \t\n"):
    """Split ``text`` into terms, keeping quoted passages together.

    Outside of quotes, any character from ``separators`` ends the current
    term. A character from ``quote_chars`` opens a quote only at the start
    of a term; the quote runs until the same character appears again and
    then always yields a term, even an empty one. An unterminated quote
    yields its content if that is non-empty.
    """
    terms = []
    pending = []
    open_quote = None
    for char in text:
        if open_quote is not None:
            if char == open_quote:
                # Closing quote: emit the quoted term unconditionally.
                terms.append("".join(pending))
                pending = []
                open_quote = None
            else:
                pending.append(char)
            continue
        if char in separators:
            if pending:
                terms.append("".join(pending))
                pending = []
        elif char in quote_chars and not pending:
            open_quote = char
        else:
            pending.append(char)
    if pending:
        terms.append("".join(pending))
    return terms
193

194

195 196 197 198 199
def optional_int_arg(name):
    """Return the request argument ``name`` as int, or None if absent/invalid."""
    try:
        raw_value = request.args.get(name)
        parsed = int(raw_value)
    except (ValueError, TypeError):
        return None
    return parsed
200

201

202 203 204 205 206 207
def add_line_numbers(text):
    """Prefix every line of ``text`` with its right-aligned 1-based number.

    Returns the numbered lines joined by ``"\\n"``; text without any lines
    yields an empty string.
    """
    raw_lines = text.splitlines()
    if not raw_lines:
        # log10(0) is undefined; there is nothing to number anyway.
        return ""
    # Column width as used so far: ceil(log10(n) + 1). This equals the
    # digit count of n when n is a power of ten and is one wider otherwise.
    linenumber_length = math.ceil(math.log10(len(raw_lines)) + 1)
    return "\n".join(
        "{} {}".format(str(linenumber).rjust(linenumber_length), line)
        for linenumber, line in enumerate(raw_lines, start=1))
212

213

214
def get_current_ip():
    """Return the IP address of the current request.

    If the request comes from 127.0.0.1 and carries an ``X-Real-Ip``
    header, the address from that header is returned instead.
    """
    remote = ipaddress.ip_address(request.remote_addr)
    from_localhost = remote == ipaddress.ip_address("127.0.0.1")
    if from_localhost and "X-Real-Ip" in request.headers:
        return ipaddress.ip_address(request.headers["X-Real-Ip"])
    return remote


def check_ip_in_networks(networks_string):
    """Return True if the current request IP lies in one of the networks.

    ``networks_string`` is a comma-separated list of networks. Entries are
    checked in order and the first match wins; an unparsable entry reached
    before any match makes the result ``False``.
    """
    address = get_current_ip()
    try:
        return any(
            address in ipaddress.ip_network(entry.strip())
            for entry in networks_string.split(","))
    except ValueError:
        return False
Robin Sonnabend's avatar
Robin Sonnabend committed
232

233

Robin Sonnabend's avatar
Robin Sonnabend committed
234 235 236 237 238 239 240
def fancy_join(values, sep1=" und ", sep2=", "):
    """Join values with ``sep2``, using ``sep1`` before the last one.

    Zero or one value is returned without any separator.
    """
    items = list(values)
    if len(items) < 2:
        return "".join(items)
    *leading, final = items
    joined_leading = sep2.join(leading)
    return "{}{}{}".format(joined_leading, sep1, final)
241

242

243 244
def footnote_hash(text, length=5):
    """Return a decimal string of at most ``length`` digits derived from ``text``.

    The value is the sum of each character's code point multiplied by its
    0-based position, reduced modulo ``10**length``.
    """
    weighted_sum = 0
    for position, char in enumerate(text):
        weighted_sum += ord(char) * position
    modulus = 10 ** length
    return str(weighted_sum % modulus)
245

246

247 248 249
def parse_datetime_from_string(text):
    """Parse a date string in one of several formats into a ``datetime``.

    Formats that include a year are tried first. Formats without a year
    (such as ``"24.12."``) are then interpreted in the current year.

    :param text: the date string; surrounding whitespace is ignored.
    :returns: the parsed ``datetime``.
    :raises ValueError: if no known format matches.
    """
    text = text.strip()
    formats_with_year = (
        "%d.%m.%Y", "%d.%m.%y", "%Y-%m-%d",
        "%d. %B %Y", "%d. %b %Y", "%d. %B %y", "%d. %b %y",
    )
    for date_format in formats_with_year:
        try:
            return datetime.strptime(text, date_format)
        except ValueError:
            pass
    # Parse year-less dates together with the current year instead of
    # patching the year in afterwards: strptime would otherwise assume
    # 1900, which is not a leap year, and reject "29.02." unconditionally.
    current_year = datetime.now().year
    for date_format in ("%d.%m.", "%d. %m.", "%d.%m"):
        try:
            return datetime.strptime(
                "{} {}".format(text, current_year), date_format + " %Y")
        except ValueError:
            pass
    raise ValueError("Date '{}' does not match any known format!".format(text))
262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285


def get_git_revision():
    """Return URL, hash and date of the latest reflog entry, if available.

    This is a best-effort lookup: it returns a dict with the keys
    ``"url"``, ``"hash"`` and ``"date"``, or ``None`` when git cannot be
    run, fails, or produces output that cannot be parsed.
    """
    gitlab_url = "https://git.fsmpi.rwth-aachen.de/protokollsystem/proto3"
    try:
        # One call for both fields keeps hash and timestamp consistent.
        output = subprocess.check_output(
            ["git", "log", "-g", "-1", "--pretty=%H %at"]).decode("UTF-8")
        commit_hash, timestamp = output.split()
        commit_date = datetime.fromtimestamp(int(timestamp))
    except (subprocess.SubprocessError, OSError, ValueError):
        # SubprocessError: git exited with an error (e.g. not a repository).
        # OSError: the git executable is missing or cannot be started.
        # ValueError: empty or unexpected output.
        return None
    return {"url": gitlab_url, "hash": commit_hash, "date": commit_date}


def get_max_page_length_exp(objects):
    """Return ``ceil(log10(len(objects)))``, or 1 for an empty collection."""
    count = len(objects)
    if count <= 0:
        return 1
    return math.ceil(math.log10(count))


def get_internal_filename(protocol, document, filename):
    """Return ``"<protocol id>-<document id>-<filename>"``."""
    parts = (protocol.id, document.id, filename)
    return "-".join("{}".format(part) for part in parts)