Select Git revision
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
lvmsnapshot.py 6.48 KiB
#!/usr/bin/env python3
# create:
# xfs_freeze -f <parent mountpoint>
# lvcreate -s -n <snapshot name> -pr <parent volume>
# xfs_freeze -u <parent mountpoint>
# lvchange -ay -K <snapshot volume>
# mount <snapshot device> <snapshot mountpoint> -o nouuid,ro
# delete:
# umount <snapshot mountpoint>
# lvchange -an -K <snapshot volume>
# lvremove <snapshot volume>
# naming scheme:
# snapshot name: <parent volume name>-snapshot-<yyyy-mm-dd-hh-MM>
# snapshot mountpoint: <base dir>/<parent volume name>/<yyyy-mm-dd-hh-MM>
# how many snapshots to keep:
# give number of snapshots for a given period
# sort by period
# mark snapshots to keep, from oldest within period to new
# delete unmarked snapshots
# periods are cumulative
import os
import re
from datetime import datetime
import subprocess as sp
import json
from contextlib import contextmanager
# Directory under which snapshot mountpoints are built
# (<base dir>/<parent volume name>/<timestamp>, see Snapshot.get_mountpoint).
SNAPSHOT_BASE_DIR = "/snapshots"
# strftime/strptime pattern for the timestamp part of snapshot names and
# snapshot mountpoints.
TIMESTAMP_FORMAT = "%Y-%m-%d-%H-%M"
@contextmanager
def xfs_freeze(mountpoint):
    """Context manager that keeps the filesystem at *mountpoint* frozen.

    Calls ``freeze_xfs(mountpoint, True)`` on entry and
    ``freeze_xfs(mountpoint, False)`` on exit.

    Args:
        mountpoint: Path of the mounted filesystem to freeze.

    Raises:
        Whatever ``freeze_xfs`` raises when the freeze or unfreeze command
        fails, plus any exception raised by the ``with`` body.
    """
    freeze_xfs(mountpoint, True)
    try:
        yield
    finally:
        # Unfreeze even when the body raised; otherwise the filesystem
        # would be left frozen after a failed snapshot attempt.
        freeze_xfs(mountpoint, False)
def freeze_xfs(mountpoint, freeze):
    """Run ``/usr/sbin/xfs_freeze`` against *mountpoint*.

    Args:
        mountpoint: Path passed to ``xfs_freeze`` as its last argument.
        freeze: If truthy the ``-f`` flag is passed, otherwise ``-u``.

    Raises:
        subprocess.CalledProcessError: If the command exits non-zero.
    """
    flag = "-u"
    if freeze:
        flag = "-f"
    sp.run(["/usr/sbin/xfs_freeze", flag, mountpoint], check=True)
class Volume:
    """An LVM logical volume identified by its volume group and name."""

    def __init__(self, volume_group, name):
        """Store the volume group name and the logical volume name."""
        self.volume_group = volume_group
        self.name = name

    def __repr__(self):
        return "Volume({!r}, {!r})".format(self.volume_group, self.name)

    def get_full_name(self):
        """Return the volume as ``<volume group>/<name>``."""
        return "{}/{}".format(self.volume_group, self.name)

    def get_mapper_device(self):
        """Return the ``/dev/mapper`` device path of this volume.

        Device-mapper joins the volume group and volume name with a single
        hyphen and doubles every hyphen that occurs inside either name, so
        the names are escaped accordingly.
        """
        return "/dev/mapper/{}-{}".format(
            self.volume_group.replace("-", "--"),
            self.name.replace("-", "--"))

    def get_mountpoint(self):
        """Return the mount target ``findmnt`` reports for this volume.

        Returns:
            The ``target`` of the first filesystem listed by
            ``findmnt --source <mapper device> --json``.

        Raises:
            subprocess.CalledProcessError: If ``findmnt`` exits non-zero.
            Exception: If ``findmnt`` lists no filesystem for the device.
        """
        mapper_device = self.get_mapper_device()
        command = [
            "/bin/findmnt",
            "--source", mapper_device,
            "--json",
        ]
        result = sp.check_output(command)
        filesystems = json.loads(result)["filesystems"]
        if not filesystems:
            raise Exception("No parent device {} found!".format(mapper_device))
        return filesystems[0]["target"]
class Snapshot:
    """A timestamped LVM snapshot of a parent ``Volume``.

    The snapshot's LV name is ``<parent name>-snapshot-<timestamp>`` and its
    mountpoint is ``<SNAPSHOT_BASE_DIR>/<parent name>/<timestamp>``, where the
    timestamp is rendered with ``TIMESTAMP_FORMAT``.
    """

    def __init__(self, parent_volume, timestamp, active=False):
        """Store the parent ``Volume``, the ``datetime`` and the active flag."""
        self.parent_volume = parent_volume
        self.timestamp = timestamp
        self.active = active

    def get_timestamp_str(self):
        """Return the timestamp formatted with ``TIMESTAMP_FORMAT``."""
        return self.timestamp.strftime(TIMESTAMP_FORMAT)

    def get_name(self):
        """Return the snapshot's logical volume name."""
        return "{}-snapshot-{}".format(
            self.parent_volume.name,
            self.get_timestamp_str())

    @staticmethod
    def parse_name(name):
        """Split a snapshot LV name into ``(parent name, timestamp)``.

        The name is split at the last ``-snapshot-`` so that parent volume
        names containing hyphens are handled.

        Raises:
            ValueError: If *name* does not contain ``-snapshot-`` or the
                timestamp part does not match ``TIMESTAMP_FORMAT``.
        """
        parent_name, separator, timestamp_str = name.rpartition("-snapshot-")
        if not separator:
            raise ValueError("Not a snapshot name: '{}'".format(name))
        timestamp = datetime.strptime(timestamp_str, TIMESTAMP_FORMAT)
        return parent_name, timestamp

    def get_full_volume(self):
        """Return the snapshot as ``<volume group>/<snapshot name>``."""
        return "{}/{}".format(
            self.parent_volume.volume_group, self.get_name())

    def get_mountpoint(self):
        """Return the directory this snapshot is mounted on."""
        return os.path.join(
            SNAPSHOT_BASE_DIR,
            self.parent_volume.name,
            self.get_timestamp_str())

    def get_mapper_device(self):
        """Return the ``/dev/mapper`` device path of the snapshot.

        Device-mapper doubles every hyphen inside the volume group and
        volume names; snapshot names always contain hyphens, so the
        escaping is required here.
        """
        return "/dev/mapper/{}-{}".format(
            self.parent_volume.volume_group.replace("-", "--"),
            self.get_name().replace("-", "--"))

    def create(self):
        """Create the read-only snapshot while the parent is frozen, then mount it.

        Raises:
            subprocess.CalledProcessError: If any external command fails.
        """
        parent_mountpoint = self.parent_volume.get_mountpoint()
        create_command = [
            "/sbin/lvcreate",
            "--snapshot",
            "--name", self.get_name(),
            "--permission", "r",
            self.parent_volume.get_full_name(),
        ]
        # Only the lvcreate call runs while the parent filesystem is frozen.
        with xfs_freeze(parent_mountpoint):
            sp.run(create_command, check=True)
        self.mount()

    def mount(self):
        """Activate the snapshot volume and mount it read-only.

        Raises:
            subprocess.CalledProcessError: If ``lvchange`` or ``mount`` fails.
        """
        activate_command = [
            "/sbin/lvchange",
            "--activate", "y",
            "--ignoreactivationskip",
            self.get_full_volume(),
        ]
        sp.run(activate_command, check=True)
        self.active = True
        mountpoint = self.get_mountpoint()
        # Each snapshot has its own timestamped directory, so it has to be
        # created before mount can use it.
        os.makedirs(mountpoint, exist_ok=True)
        mount_command = [
            "/bin/mount",
            self.get_mapper_device(),
            mountpoint,
            "-onouuid,ro",  # nouuid is necessary for xfs snapshots
        ]
        sp.run(mount_command, check=True)

    def remove(self):
        """Unmount and deactivate the snapshot, then remove its volume.

        Raises:
            subprocess.CalledProcessError: If any external command fails.
        """
        self.unmount()
        remove_command = [
            "/sbin/lvremove",
            self.get_full_volume(),
        ]
        sp.run(remove_command, check=True)

    def unmount(self):
        """Unmount the snapshot and deactivate its volume.

        Raises:
            subprocess.CalledProcessError: If ``umount`` or ``lvchange`` fails.
        """
        unmount_command = [
            "/bin/umount",
            self.get_mountpoint(),
        ]
        sp.run(unmount_command, check=True)
        deactivate_command = [
            "/sbin/lvchange",
            "--activate", "n",
            "--ignoreactivationskip",
            self.get_full_volume(),
        ]
        sp.run(deactivate_command, check=True)
        self.active = False

    @staticmethod
    def list_snapshots():
        """Query ``lvs`` and group the existing snapshots by parent volume.

        Returns:
            A dict mapping every ``Volume`` whose role list contains
            ``public`` (and not ``snapshot``) to the list of ``Snapshot``
            objects found for it; the list is empty when there are none.

        Raises:
            subprocess.CalledProcessError: If ``lvs`` exits non-zero.
            ValueError: If a snapshot's name cannot be parsed.
            Exception: If the parent name encoded in a snapshot's name
                differs from the origin reported by ``lvs``.
            KeyError: If a snapshot's origin was not listed as a volume.
        """
        list_command = [
            "/sbin/lvs",
            "-o", "vg_name,lv_name,lv_active,origin,lv_role",
            "--reportformat", "json",
        ]
        result = sp.check_output(list_command)
        raw_volumes = json.loads(result)["report"][0]["lv"]

        # Parents are keyed by (volume group, name) so that equally named
        # volumes in different volume groups do not overwrite each other.
        parents = {}
        snapshots = {}
        raw_snapshots = []
        for raw_volume in raw_volumes:
            volume_group = raw_volume["vg_name"]
            volume_name = raw_volume["lv_name"]
            active = raw_volume["lv_active"] == "active"
            origin = raw_volume["origin"] or None
            roles = raw_volume["lv_role"].split(",")
            if "snapshot" in roles:
                # Resolved in a second pass, once all parents are known.
                raw_snapshots.append(
                    (volume_group, volume_name, active, origin))
            elif "public" in roles:
                volume = Volume(volume_group, volume_name)
                snapshots[volume] = []
                parents[(volume_group, volume_name)] = volume
            else:
                print("Ignoring volume {}/{}".format(volume_group, volume_name))  # todo: remove this output

        for volume_group, volume_name, active, origin in raw_snapshots:
            parent_name, timestamp = Snapshot.parse_name(volume_name)
            if parent_name != origin:
                raise Exception("Parent volume name not matching: '{}' != '{}'".format(parent_name, origin))
            parent = parents[(volume_group, parent_name)]
            snapshots[parent].append(Snapshot(parent, timestamp, active))
        return snapshots
def load_config():
    """Load the TOML configuration file and return the parsed result.

    The path is taken from the first command line argument if one is given,
    otherwise from the ``LVM_SNAPSHOT_CONFIG`` environment variable if it is
    set, otherwise ``config.toml`` is used.
    """
    import sys
    import toml

    cli_args = sys.argv[1:]
    if cli_args:
        path = cli_args[0]
    else:
        path = os.environ.get("LVM_SNAPSHOT_CONFIG", "config.toml")
    with open(path, "r") as handle:
        parsed = toml.load(handle)
    return parsed
def main():
    """Print the loaded configuration, then query the existing snapshots."""
    print(load_config())
    # The listing is not used yet; the call is kept for its lvs query and
    # the "Ignoring volume" output it prints.
    Snapshot.list_snapshots()


# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()