Skip to content
Snippets Groups Projects
Select Git revision
  • 8bfbd2322e934c5a5b97399151fd68ed1bc26cc6
  • master default protected
2 results

lvmsnapshot.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    lvmsnapshot.py 15.53 KiB
    #!/usr/bin/env python3
    
    # create:
    # //xfs_freeze -f <parent mountpoint>
    # lvcreate -s -n <snapshot name> -pr <parent volume>
    # //xfs_freeze -u <parent mountpoint>
    # lvchange -ay -K <snapshot volume>
    # mount <snapshot device> <snapshot mountpoint> -nouuid,ro
    
    # delete:
    # umount <snapshot mountpoint>
    # lvchange -an -L <snapshot volume>
    # lvremove <snapshot volume>
    
    # naming scheme:
    # snapshot name: <parent mountpoint>-snapshot-<yyyy-mm-dd-hh-MM>
    # snapshot mountpoint: <base dir>/<parent mountpoint>/<yyyy-mm-dd-hh-MM>
    
    # how many snapshots to keep:
    # give number of snapshots for a given period
    # sort by period
    # mark snapshots to keep, from oldest within period to new
    # delete unmarked snapshots
    
    import logging
    import os
    import re
    from datetime import datetime, timedelta
    import subprocess as sp
    import json
    from contextlib import contextmanager
    from collections import OrderedDict
    
    class Config:
        snapshot_base_dir = "/snapshots"
        timestamp_format = "%Y-%m-%d-%H-%M"
        period_keys = OrderedDict([
            ("y", 365 * 86400),
            ("m",  31 * 86400),
            ("d",       86400),
            ("H",        3600),
            ("M",          60),
        ])
        min_interval = None
        volumes = {}
        periods = {}
    
        @staticmethod
        def load(config_path=None):
            import toml
            if config_path is None:
                config_path = "config.toml"
                global_config_path = "/etc/lvm-snapshots.toml"
                ENV_VAR = "LVM_SNAPSHOT_CONFIG"
                if not os.path.isfile(config_path) and os.path.isfile(global_config_path):
                    config_path = global_config_path
                if ENV_VAR in os.environ:
                    config_path = os.environ[ENV_VAR]
            with open(config_path, "r") as config_file:
                return toml.load(config_file)
    
        @staticmethod
        def parse(config):
            periods = {}
            min_interval = None
            for volume_conf in config["volume"]:
                name = volume_conf["name"]
                volume_group = volume_conf["volume_group"]
                path = volume_conf.get("path")
                volume = Volume(volume_group, name, path)
                Config.volumes[volume.get_full_name()] = volume
                periods[volume] = []
                for raw_period in volume_conf["keep"]:
                    interval = Period.parse_interval(raw_period["interval"])
                    if min_interval is None or interval < min_interval:
                        min_interval = interval
                    number = int(raw_period["number"])
                    periods[volume].append(
                        Period(target_number=number, interval=interval))
            Config.min_interval = min_interval
            Config.periods = periods
    
    def run_process(command, check=True):
        logging.debug("running command {}".format(command))
        result = sp.run(command, check=check, stdout=sp.PIPE, stderr=sp.PIPE)
        if result.stdout:
            logging.info(result.stdout.decode("utf-8").strip())
        if result.stderr:
            logging.warn(result.stderr.decode("utf-8").strip())
        return result
    
    @contextmanager
    def xfs_freeze(mountpoint):
        freeze_xfs(mountpoint, True)
        yield
        freeze_xfs(mountpoint, False)
    
    def freeze_xfs(mountpoint, freeze):
        command = [
            "/usr/sbin/xfs_freeze",
            "-f" if freeze else "-u",
            mountpoint
        ]
        run_process(command, check=True)
    
    class Volume:
        def __init__(self, volume_group, name, snapshot_dir=None):
            self.volume_group = volume_group
            self.name = name
            if snapshot_dir is None:
                if self.get_full_name() in Config.volumes:
                    snapshot_dir = Config.volumes[self.get_full_name()].snapshot_dir
                else:
                    snapshot_dir = os.path.join(Config.snapshot_base_dir, name)
            if not os.path.isabs(snapshot_dir):
                snapshot_dir = os.path.join(self.get_mountpoint(), snapshot_dir)
            self.snapshot_dir = snapshot_dir
        
        def __repr__(self):
            return self.get_full_name()
    
        def __eq__(self, other):
            return hash(self) == hash(other)
    
        def __hash__(self):
            return sum(ord(c) * i for i, c in enumerate(self.get_full_name()))
    
        def get_full_name(self):
            return "{}/{}".format(self.volume_group, self.name)
    
        def get_mapper_device(self):
            return "/dev/mapper/{}-{}".format(self.volume_group, self.name)
    
        def get_mountpoint(self):
            mapper_device = self.get_mapper_device()
            logging.debug("searching volume mountpoint for {}".format(mapper_device))
            command = [
                "/bin/findmnt",
                "--source", mapper_device,
                "--json"
            ]
            result = sp.check_output(command).decode("utf-8")
            data = json.loads(result)
            filesystems = data["filesystems"]
            if len(filesystems) < 1:
                raise Exception("No parent device {} found!".format(mapper_device))
            return filesystems[0]["target"]
    
        def get_fstab(self, snapshots):
            return "\n".join(snapshot.get_tab_entry() for snapshot in snapshots)
    
    class Snapshot:
        def __init__(self, parent_volume, timestamp, active=False):
            self.parent_volume = parent_volume
            self.timestamp = timestamp
            self.active = active
    
        def __repr__(self):
            return self.get_name()
    
        def get_timestamp_str(self):
            return self.timestamp.strftime(Config.timestamp_format)
    
        def get_name(self):
            return "{}-snapshot-{}".format(
                self.parent_volume.name,
                self.get_timestamp_str())
    
        @staticmethod
        def parse_name(name):
            parent_name, _, timestamp_str = name.split("-", 2)
            timestamp = datetime.strptime(timestamp_str, Config.timestamp_format)
            return parent_name, timestamp
    
        def get_full_volume(self):
            return "{}/{}".format(self.parent_volume.volume_group, self.get_name())
    
        def get_mountpoint(self):
            return os.path.join(
                self.parent_volume.snapshot_dir, self.get_timestamp_str()
            )
    
        def create_mountpoint(self):
            logging.debug("creating mountpoint {}".format(self.get_mountpoint()))
            os.makedirs(self.get_mountpoint(), mode=0o755, exist_ok=True)
    
        def find_mountpoint(self):
            logging.debug("searching snapshot mountpoint for {}".format(self.get_name()))
            command = [
                "/bin/findmnt",
                "--source", self.get_mapper_device(),
                "--json",
            ]
            try:
                result = sp.check_output(command).decode("utf-8")
                data = json.loads(result)
                filesystems = data["filesystems"]
                if len(filesystems) < 1:
                    return None
                return filesystems[0]["target"]
            except sp.CalledProcessError:
                return None
    
        def get_mapper_device(self):
            return "/dev/mapper/{}-{}".format(
                self.parent_volume.volume_group,
                self.get_name().replace("-", "--"))
    
        def get_mount_options(self):
            return "ro,noatime,nouuid,norecovery"
    
        def get_tab_entry(self):
            return "{} -fstype=xfs,{} :{}".format(
                self.get_name(),
                self.get_mount_options(),
                self.get_mapper_device())
    
        def create(self):
            logging.debug("creating snapshot {}".format(self.get_name()))
            parent_mountpoint = self.parent_volume.get_mountpoint()
            #with xfs_freeze(parent_mountpoint): # not necessary, lvm does this
            create_command = [
                "/sbin/lvcreate",
                "--snapshot",
                "--name", self.get_name(),
                "--permission", "r",
                self.parent_volume.get_full_name()
            ]
            run_process(create_command, check=True)
            self.mount()
    
        def mount(self):
            logging.debug("mounting snapshot {}".format(self.get_name()))
            if self.check_mount():
                logging.debug("already mounted")
                return
            activate_command = [
                "/sbin/lvchange",
                "--activate", "y",
                "--ignoreactivationskip",
                self.get_full_volume()
            ]
            run_process(activate_command, check=True)
            self.active = True
            self.create_mountpoint()
            mount_command = [
                "/bin/mount",
                self.get_mapper_device(),
                self.get_mountpoint(),
                "-o", self.get_mount_options()
            ]
            run_process(mount_command, check=True)
    
        def check_mount(self):
            check_command = [
                "/bin/findmnt",
                "--source", self.get_mapper_device()
            ]
            try:
                sp.check_call(check_command, stdout=sp.DEVNULL, stderr=sp.DEVNULL)
                return True
            except sp.CalledProcessError:
                return False
    
        def remove(self):
            logging.debug("removing snapshot {}".format(self.get_name()))
            self.unmount()
            remove_command = [
                "/sbin/lvremove",
                self.get_full_volume()
            ]
            run_process(remove_command, check=True)
    
        def unmount(self):
            logging.debug("unmounting snapshot {}".format(self.get_name()))
            mountpoint = self.find_mountpoint()
            if mountpoint is not None:
                if self.check_mount():
                    unmount_command = [
                        "/bin/umount",
                        mountpoint
                    ]
                    run_process(unmount_command, check=True)
                if os.path.isdir(mountpoint):
                    os.rmdir(mountpoint)
            deactivate_command = [
                "/sbin/lvchange",
                "--activate", "n",
                "--ignoreactivationskip",
                self.get_full_volume()
            ]
            run_process(deactivate_command, check=True)
            self.active = False
    
        @staticmethod
        def list_snapshots():
            list_command = [
                "/sbin/lvs",
                "-o", "vg_name,lv_name,lv_active,origin,lv_role",
                "--reportformat", "json"
            ]
            result = sp.check_output(list_command).decode("utf-8")
            data = json.loads(result)
            raw_volumes = data["report"][0]["lv"]
            parent_name_map = {}
            snapshots = {}
            raw_snapshots = []
            for raw_volume in raw_volumes:
                volume_group = raw_volume["vg_name"]
                volume_name = raw_volume["lv_name"]
                active = raw_volume["lv_active"] == "active"
                origin = raw_volume["origin"] or None
                roles = raw_volume["lv_role"].split(",")
                if "snapshot" in roles:
                    raw_snapshots.append((volume_group, volume_name, active, origin, roles))
                elif "public" in roles:
                    volume = Volume(volume_group, volume_name)
                    snapshots[volume] = []
                    parent_name_map[volume.name] = volume
            for (volume_group, volume_name, active, origin, roles) in raw_snapshots:
                try:
                    parent_name, timestamp = Snapshot.parse_name(volume_name)
                    if parent_name != origin:
                        raise Exception("Parent volume name not matching: "
                            "'{}' != '{}'".format(parent_name, origin))
                    parent = parent_name_map[parent_name]
                    snapshot = Snapshot(parent, timestamp, active)
                    snapshots[parent].append(snapshot)
                except ValueError:
                    # a snapshot, but not named like the autosnapshots
                    # we better keep the hands off them
                    pass
            return snapshots
    
    
    class Period:
        def __init__(self, target_number, interval):
            self.target_number = target_number
            self.interval = interval
    
        def __repr__(self):
            return "Keep {} at interval {}d{}s (since {})".format(
                self.target_number, self.interval.days,
                self.interval.seconds, self.get_start())
    
        def get_start(self):
            return datetime.now() - self.target_number * self.interval
        
        @staticmethod
        def build_regex():
            parts = [r"(?:(?P<{0}>\d+){0})?".format(key) for key in Config.period_keys]
            return "".join(parts)
    
        @staticmethod
        def parse_interval(text):
            period_keys = Config.period_keys
            regex = Period.build_regex()
            match = re.fullmatch(regex, text)
            if match is None:
                raise Exception("Invalid interval config: '{}', "
                    "needs to match '{}'".format(text, regex))
            seconds = 0
            groups = match.groupdict()
            for key in period_keys:
                if groups[key] is not None:
                    seconds += period_keys[key] * int(groups[key])
            return timedelta(seconds=seconds)
    
    def mark_snapshots(snapshots, periods, min_interval):
        all_snapshots = set(snapshots)
        marked_snapshots = set()
        budgets = {period: period.target_number for period in periods}
        last_snapshot = {period: None for period in periods}
        for snapshot in sorted(snapshots, key=lambda s: s.timestamp):
            for period in sorted(periods, key=lambda p: p.interval):
                required_distance = period.interval - min_interval/2
                if (budgets[period] > 0
                        and period.get_start() < snapshot.timestamp):
                    if (last_snapshot[period] is None
                            or abs(snapshot.timestamp - last_snapshot[period].timestamp) > required_distance):
                        marked_snapshots.add(snapshot)
                        last_snapshot[period] = snapshot
                        budgets[period] -= 1
        unmarked_snapshots = all_snapshots - marked_snapshots
        return unmarked_snapshots
    
    def list_snapshots():
        snapshots = Snapshot.list_snapshots()
        for volume in snapshots:
            print("volume: {}".format(str(volume)))
            for snapshot in snapshots[volume]:
                print(" {}".format(str(snapshot)))
    
    def mount_snapshots():
        snapshots = Snapshot.list_snapshots()
        for volume in snapshots:
            for snapshot in snapshots[volume]:
                snapshot.mount()
    
    def unmount_snapshots():
        snapshots = Snapshot.list_snapshots()
        for volume in snapshots:
            for snapshot in snapshots[volume]:
                snapshot.unmount()
    
    def update_snapshots():
        snapshots = Snapshot.list_snapshots()
        periods, min_interval = Config.periods, Config.min_interval
        for volume in set(periods.keys()) - set(snapshots.keys()):
            logging.warn("Warning: Volume {} is configured but does not exist or has no snapshots.".format(volume))
        for volume in set(snapshots.keys()) - set(periods.keys()):
            logging.warn("Warning: Volume {} does exist but is not configured.".format(volume))
            snapshots.pop(volume)
        for volume in snapshots:
            unmarked_snapshots = mark_snapshots(snapshots[volume], periods[volume], min_interval)
            logging.info("removing snapshots {}".format(unmarked_snapshots))
            for snapshot in unmarked_snapshots:
                snapshot.remove()
            logging.info("creating new snapshot")
            new_snapshot = Snapshot(volume, datetime.now())
            new_snapshot.create()
    
    operations = {
        "list": list_snapshots,
        "update": update_snapshots,
        "mount": mount_snapshots,
        "unmount": unmount_snapshots,
    }
    
    def main():
        import argparse
        parser = argparse.ArgumentParser(description="Manage snapshots of thin LVM volumes")
        parser.add_argument("command", choices=["list", "update", "mount", "unmount"], default="list", nargs="?")
        parser.add_argument("--config", help="path to config file", required=False)
        parser.add_argument("--verbose", "-v", action="count", help="do not redirect the command output to /dev/null")
        args = parser.parse_args()
        loglevels = {
            0: logging.ERROR,
            1: logging.WARNING,
            2: logging.INFO,
            3: logging.DEBUG
        }
        loglevel = loglevels[min(3, max(0, args.verbose or 0))]
        logging.basicConfig(level=loglevel)
        Config.parse(Config.load(args.config))
    
        operations[args.command]()
        
    if __name__ == "__main__":
        main()