Select Git revision
lvmsnapshot.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
lvmsnapshot.py 15.53 KiB
#!/usr/bin/env python3
# create:
# //xfs_freeze -f <parent mountpoint>
# lvcreate -s -n <snapshot name> -pr <parent volume>
# //xfs_freeze -u <parent mountpoint>
# lvchange -ay -K <snapshot volume>
# mount <snapshot device> <snapshot mountpoint> -nouuid,ro
# delete:
# umount <snapshot mountpoint>
# lvchange -an -L <snapshot volume>
# lvremove <snapshot volume>
# naming scheme:
# snapshot name: <parent mountpoint>-snapshot-<yyyy-mm-dd-hh-MM>
# snapshot mountpoint: <base dir>/<parent mountpoint>/<yyyy-mm-dd-hh-MM>
# how many snapshots to keep:
# give number of snapshots for a given period
# sort by period
# mark snapshots to keep, from oldest within period to new
# delete unmarked snapshots
import logging
import os
import re
from datetime import datetime, timedelta
import subprocess as sp
import json
from contextlib import contextmanager
from collections import OrderedDict
class Config:
snapshot_base_dir = "/snapshots"
timestamp_format = "%Y-%m-%d-%H-%M"
period_keys = OrderedDict([
("y", 365 * 86400),
("m", 31 * 86400),
("d", 86400),
("H", 3600),
("M", 60),
])
min_interval = None
volumes = {}
periods = {}
@staticmethod
def load(config_path=None):
import toml
if config_path is None:
config_path = "config.toml"
global_config_path = "/etc/lvm-snapshots.toml"
ENV_VAR = "LVM_SNAPSHOT_CONFIG"
if not os.path.isfile(config_path) and os.path.isfile(global_config_path):
config_path = global_config_path
if ENV_VAR in os.environ:
config_path = os.environ[ENV_VAR]
with open(config_path, "r") as config_file:
return toml.load(config_file)
@staticmethod
def parse(config):
periods = {}
min_interval = None
for volume_conf in config["volume"]:
name = volume_conf["name"]
volume_group = volume_conf["volume_group"]
path = volume_conf.get("path")
volume = Volume(volume_group, name, path)
Config.volumes[volume.get_full_name()] = volume
periods[volume] = []
for raw_period in volume_conf["keep"]:
interval = Period.parse_interval(raw_period["interval"])
if min_interval is None or interval < min_interval:
min_interval = interval
number = int(raw_period["number"])
periods[volume].append(
Period(target_number=number, interval=interval))
Config.min_interval = min_interval
Config.periods = periods
def run_process(command, check=True):
logging.debug("running command {}".format(command))
result = sp.run(command, check=check, stdout=sp.PIPE, stderr=sp.PIPE)
if result.stdout:
logging.info(result.stdout.decode("utf-8").strip())
if result.stderr:
logging.warn(result.stderr.decode("utf-8").strip())
return result
@contextmanager
def xfs_freeze(mountpoint):
freeze_xfs(mountpoint, True)
yield
freeze_xfs(mountpoint, False)
def freeze_xfs(mountpoint, freeze):
command = [
"/usr/sbin/xfs_freeze",
"-f" if freeze else "-u",
mountpoint
]
run_process(command, check=True)
class Volume:
def __init__(self, volume_group, name, snapshot_dir=None):
self.volume_group = volume_group
self.name = name
if snapshot_dir is None:
if self.get_full_name() in Config.volumes:
snapshot_dir = Config.volumes[self.get_full_name()].snapshot_dir
else:
snapshot_dir = os.path.join(Config.snapshot_base_dir, name)
if not os.path.isabs(snapshot_dir):
snapshot_dir = os.path.join(self.get_mountpoint(), snapshot_dir)
self.snapshot_dir = snapshot_dir
def __repr__(self):
return self.get_full_name()
def __eq__(self, other):
return hash(self) == hash(other)
def __hash__(self):
return sum(ord(c) * i for i, c in enumerate(self.get_full_name()))
def get_full_name(self):
return "{}/{}".format(self.volume_group, self.name)
def get_mapper_device(self):
return "/dev/mapper/{}-{}".format(self.volume_group, self.name)
def get_mountpoint(self):
mapper_device = self.get_mapper_device()
logging.debug("searching volume mountpoint for {}".format(mapper_device))
command = [
"/bin/findmnt",
"--source", mapper_device,
"--json"
]
result = sp.check_output(command).decode("utf-8")
data = json.loads(result)
filesystems = data["filesystems"]
if len(filesystems) < 1:
raise Exception("No parent device {} found!".format(mapper_device))
return filesystems[0]["target"]
def get_fstab(self, snapshots):
return "\n".join(snapshot.get_tab_entry() for snapshot in snapshots)
class Snapshot:
def __init__(self, parent_volume, timestamp, active=False):
self.parent_volume = parent_volume
self.timestamp = timestamp
self.active = active
def __repr__(self):
return self.get_name()
def get_timestamp_str(self):
return self.timestamp.strftime(Config.timestamp_format)
def get_name(self):
return "{}-snapshot-{}".format(
self.parent_volume.name,
self.get_timestamp_str())
@staticmethod
def parse_name(name):
parent_name, _, timestamp_str = name.split("-", 2)
timestamp = datetime.strptime(timestamp_str, Config.timestamp_format)
return parent_name, timestamp
def get_full_volume(self):
return "{}/{}".format(self.parent_volume.volume_group, self.get_name())
def get_mountpoint(self):
return os.path.join(
self.parent_volume.snapshot_dir, self.get_timestamp_str()
)
def create_mountpoint(self):
logging.debug("creating mountpoint {}".format(self.get_mountpoint()))
os.makedirs(self.get_mountpoint(), mode=0o755, exist_ok=True)
def find_mountpoint(self):
logging.debug("searching snapshot mountpoint for {}".format(self.get_name()))
command = [
"/bin/findmnt",
"--source", self.get_mapper_device(),
"--json",
]
try:
result = sp.check_output(command).decode("utf-8")
data = json.loads(result)
filesystems = data["filesystems"]
if len(filesystems) < 1:
return None
return filesystems[0]["target"]
except sp.CalledProcessError:
return None
def get_mapper_device(self):
return "/dev/mapper/{}-{}".format(
self.parent_volume.volume_group,
self.get_name().replace("-", "--"))
def get_mount_options(self):
return "ro,noatime,nouuid,norecovery"
def get_tab_entry(self):
return "{} -fstype=xfs,{} :{}".format(
self.get_name(),
self.get_mount_options(),
self.get_mapper_device())
def create(self):
logging.debug("creating snapshot {}".format(self.get_name()))
parent_mountpoint = self.parent_volume.get_mountpoint()
#with xfs_freeze(parent_mountpoint): # not necessary, lvm does this
create_command = [
"/sbin/lvcreate",
"--snapshot",
"--name", self.get_name(),
"--permission", "r",
self.parent_volume.get_full_name()
]
run_process(create_command, check=True)
self.mount()
def mount(self):
logging.debug("mounting snapshot {}".format(self.get_name()))
if self.check_mount():
logging.debug("already mounted")
return
activate_command = [
"/sbin/lvchange",
"--activate", "y",
"--ignoreactivationskip",
self.get_full_volume()
]
run_process(activate_command, check=True)
self.active = True
self.create_mountpoint()
mount_command = [
"/bin/mount",
self.get_mapper_device(),
self.get_mountpoint(),
"-o", self.get_mount_options()
]
run_process(mount_command, check=True)
def check_mount(self):
check_command = [
"/bin/findmnt",
"--source", self.get_mapper_device()
]
try:
sp.check_call(check_command, stdout=sp.DEVNULL, stderr=sp.DEVNULL)
return True
except sp.CalledProcessError:
return False
def remove(self):
logging.debug("removing snapshot {}".format(self.get_name()))
self.unmount()
remove_command = [
"/sbin/lvremove",
self.get_full_volume()
]
run_process(remove_command, check=True)
def unmount(self):
logging.debug("unmounting snapshot {}".format(self.get_name()))
mountpoint = self.find_mountpoint()
if mountpoint is not None:
if self.check_mount():
unmount_command = [
"/bin/umount",
mountpoint
]
run_process(unmount_command, check=True)
if os.path.isdir(mountpoint):
os.rmdir(mountpoint)
deactivate_command = [
"/sbin/lvchange",
"--activate", "n",
"--ignoreactivationskip",
self.get_full_volume()
]
run_process(deactivate_command, check=True)
self.active = False
@staticmethod
def list_snapshots():
list_command = [
"/sbin/lvs",
"-o", "vg_name,lv_name,lv_active,origin,lv_role",
"--reportformat", "json"
]
result = sp.check_output(list_command).decode("utf-8")
data = json.loads(result)
raw_volumes = data["report"][0]["lv"]
parent_name_map = {}
snapshots = {}
raw_snapshots = []
for raw_volume in raw_volumes:
volume_group = raw_volume["vg_name"]
volume_name = raw_volume["lv_name"]
active = raw_volume["lv_active"] == "active"
origin = raw_volume["origin"] or None
roles = raw_volume["lv_role"].split(",")
if "snapshot" in roles:
raw_snapshots.append((volume_group, volume_name, active, origin, roles))
elif "public" in roles:
volume = Volume(volume_group, volume_name)
snapshots[volume] = []
parent_name_map[volume.name] = volume
for (volume_group, volume_name, active, origin, roles) in raw_snapshots:
try:
parent_name, timestamp = Snapshot.parse_name(volume_name)
if parent_name != origin:
raise Exception("Parent volume name not matching: "
"'{}' != '{}'".format(parent_name, origin))
parent = parent_name_map[parent_name]
snapshot = Snapshot(parent, timestamp, active)
snapshots[parent].append(snapshot)
except ValueError:
# a snapshot, but not named like the autosnapshots
# we better keep the hands off them
pass
return snapshots
class Period:
def __init__(self, target_number, interval):
self.target_number = target_number
self.interval = interval
def __repr__(self):
return "Keep {} at interval {}d{}s (since {})".format(
self.target_number, self.interval.days,
self.interval.seconds, self.get_start())
def get_start(self):
return datetime.now() - self.target_number * self.interval
@staticmethod
def build_regex():
parts = [r"(?:(?P<{0}>\d+){0})?".format(key) for key in Config.period_keys]
return "".join(parts)
@staticmethod
def parse_interval(text):
period_keys = Config.period_keys
regex = Period.build_regex()
match = re.fullmatch(regex, text)
if match is None:
raise Exception("Invalid interval config: '{}', "
"needs to match '{}'".format(text, regex))
seconds = 0
groups = match.groupdict()
for key in period_keys:
if groups[key] is not None:
seconds += period_keys[key] * int(groups[key])
return timedelta(seconds=seconds)
def mark_snapshots(snapshots, periods, min_interval):
all_snapshots = set(snapshots)
marked_snapshots = set()
budgets = {period: period.target_number for period in periods}
last_snapshot = {period: None for period in periods}
for snapshot in sorted(snapshots, key=lambda s: s.timestamp):
for period in sorted(periods, key=lambda p: p.interval):
required_distance = period.interval - min_interval/2
if (budgets[period] > 0
and period.get_start() < snapshot.timestamp):
if (last_snapshot[period] is None
or abs(snapshot.timestamp - last_snapshot[period].timestamp) > required_distance):
marked_snapshots.add(snapshot)
last_snapshot[period] = snapshot
budgets[period] -= 1
unmarked_snapshots = all_snapshots - marked_snapshots
return unmarked_snapshots
def list_snapshots():
snapshots = Snapshot.list_snapshots()
for volume in snapshots:
print("volume: {}".format(str(volume)))
for snapshot in snapshots[volume]:
print(" {}".format(str(snapshot)))
def mount_snapshots():
snapshots = Snapshot.list_snapshots()
for volume in snapshots:
for snapshot in snapshots[volume]:
snapshot.mount()
def unmount_snapshots():
snapshots = Snapshot.list_snapshots()
for volume in snapshots:
for snapshot in snapshots[volume]:
snapshot.unmount()
def update_snapshots():
snapshots = Snapshot.list_snapshots()
periods, min_interval = Config.periods, Config.min_interval
for volume in set(periods.keys()) - set(snapshots.keys()):
logging.warn("Warning: Volume {} is configured but does not exist or has no snapshots.".format(volume))
for volume in set(snapshots.keys()) - set(periods.keys()):
logging.warn("Warning: Volume {} does exist but is not configured.".format(volume))
snapshots.pop(volume)
for volume in snapshots:
unmarked_snapshots = mark_snapshots(snapshots[volume], periods[volume], min_interval)
logging.info("removing snapshots {}".format(unmarked_snapshots))
for snapshot in unmarked_snapshots:
snapshot.remove()
logging.info("creating new snapshot")
new_snapshot = Snapshot(volume, datetime.now())
new_snapshot.create()
operations = {
"list": list_snapshots,
"update": update_snapshots,
"mount": mount_snapshots,
"unmount": unmount_snapshots,
}
def main():
import argparse
parser = argparse.ArgumentParser(description="Manage snapshots of thin LVM volumes")
parser.add_argument("command", choices=["list", "update", "mount", "unmount"], default="list", nargs="?")
parser.add_argument("--config", help="path to config file", required=False)
parser.add_argument("--verbose", "-v", action="count", help="do not redirect the command output to /dev/null")
args = parser.parse_args()
loglevels = {
0: logging.ERROR,
1: logging.WARNING,
2: logging.INFO,
3: logging.DEBUG
}
loglevel = loglevels[min(3, max(0, args.verbose or 0))]
logging.basicConfig(level=loglevel)
Config.parse(Config.load(args.config))
operations[args.command]()
if __name__ == "__main__":
main()