Skip to content
Snippets Groups Projects
Select Git revision
  • 20779c645047b52a3f82d1bc6a0bb527d1d15f0c
  • master default
2 results

lip.tex

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    lvmsnapshot.py 6.48 KiB
    #!/usr/bin/env python3
    
    # create:
    # xfs_freeze -f <parent mountpoint>
    # lvcreate -s -n <snapshot name> -pr <parent volume>
    # xfs_freeze -u <parent mountpoint>
    # lvchange -ay -K <snapshot volume>
    # mount <snapshot device> <snapshot mountpoint> -nouuid,ro
    
    # delete:
    # umount <snapshot mountpoint>
    # lvchange -an -L <snapshot volume>
    # lvremove <snapshot volume>
    
    # naming scheme:
    # snapshot name: <parent mountpoint>-snapshot-<yyyy-mm-dd-hh-MM>
    # snapshot mountpoint: <base dir>/<parent mountpoint>/<yyyy-mm-dd-hh-MM>
    
    # how many snapshots to keep:
    # give number of snapshots for a given period
    # sort by period
    # mark snapshots to keep, from oldest within period to new
    # delete unmarked snapshots
    # periods are cumulative
    
    import os
    import re
    from datetime import datetime
    import subprocess as sp
    import json
    from contextlib import contextmanager
    
    SNAPSHOT_BASE_DIR = "/snapshots"
    TIMESTAMP_FORMAT = "%Y-%m-%d-%H-%M"
    
    @contextmanager
    def xfs_freeze(mountpoint):
        freeze_xfs(True)
        yield
        freeze_xfs(False)
    
    def freeze_xfs(mountpoint, freeze):
        command = [
            "/usr/sbin/xfs_freeze",
            "-f" if freeze else "-u",
            mountpoint
        ]
        sp.run(command, check=True)
    
    class Volume:
        def __init__(self, volume_group, name):
            self.volume_group = volume_group
            self.name = name
        
        def get_full_name(self):
            return "{}/{}".format(self.volume_group, self.name)
    
        def get_mapper_device(self):
            return "/dev/mapper/{}-{}".format(self.volume_group, self.name)
    
        def get_mountpoint(self):
            mapper_device = self.get_mapper_device()
            command = [
                "/bin/findmnt",
                "--source", mapper_device,
                "--json"
            ]
            result = sp.check_output(command)
            data = json.loads(result)
            filesystems = data["filesystems"]
            if len(filesystems) < 1:
                raise Exception("No parent device {} found!".format(mapper_device))
            return filesystems[0]["target"]
    
    
    class Snapshot:
        def __init__(self, parent_volume, timestamp, active=False):
            self.parent_volume = parent_volume
            self.timestamp = timestamp
            self.active = active
    
        def get_timestamp_str(self):
            return self.timestamp.strftime(TIMESTAMP_FORMAT)
    
        def get_name(self):
            return "{}-snapshot-{}".format(
                self.parent_volume.name,
                self.get_timestamp_str())
    
        @staticmethod
        def parse_name(name):
            parent_name, _, timestamp_str = name.split("-", 2)
            timestamp = datetime.strptime(timestamp_str, TIMESTAMP_FORMAT)
            return parent_name, timestamp
    
        def get_full_volume(self):
            return "{}/{}".format(self.volume_group, self.get_name())
    
        def get_mountpoint(self):
            return os.path.join([
                SNAPSHOT_BASE_DIR, self.parent_volume.name, self.get_timestamp_str()
            ])
    
        def get_mapper_device(self):
            return "/dev/mapper/{}-{}".format(self.volume_group, self.get_name())
    
        def create(self):
            parent_mountpoint = self.parent_volume.get_mountpoint()
            with xfs_freeze(mountpoint):
                create_command = [
                    "/sbin/lvcreate",
                    "--snapshot",
                    "--name", self.get_name(),
                    "--permission", "r",
                    self.parent_volume.get_full_name()
                ]
                sp.run(create_command, check=True)
            self.mount()
    
        def mount(self):
            activate_command = [
                "/sbin/lvchange",
                "--activate", "y",
                "--ignoreactivationskip",
                self.get_full_volume()
            ]
            sp.run(activate_command, check=True)
            self.active = True
            mount_command = [
                "/bin/mount",
                self.get_mapper_device(),
                self.get_mountpoint(),
                "-onouuid,ro" # nouuid is necessary for xfs snapshots
            ]
            sp.run(mount_command, check=True)
    
        def remove(self):
            self.unmount()
            remove_command = [
                "/sbin/lvremove",
                self.get_full_volume()
            ]
            sp.run(remove_command, check=True)
    
        def unmount(self):
            unmount_command = [
                "/bin/umount",
                self.get_mountpoint()
            ]
            sp.run(unmount_command, check=True)
            deactivate_command = [
                "/sbin/lvchange",
                "--activate", "n",
                "--ignoreactivationskip",
                self.get_full_volume()
            ]
            sp.run(deactivate_command, check=True)
            self.active = False
    
        @staticmethod
        def list_snapshots():
            list_command = [
                "/sbin/lvs",
                "-o", "vg_name,lv_name,lv_active,origin,lv_role",
                "--reportformat", "json"
            ]
            result = sp.check_output(list_command)
            data = json.loads(result)
            raw_volumes = data["report"][0]["lv"]
            parent_name_map = {}
            snapshots = {}
            raw_snapshots = []
            for raw_volume in raw_volumes:
                volume_group = raw_volume["vg_name"]
                volume_name = raw_volume["lv_name"]
                active = raw_volume["lv_active"] == "active"
                origin = raw_volume["origin"] or None
                roles = raw_volume["lv_role"].split(",")
                if "snapshot" in roles:
                    raw_snapshots.append((volume_group, volume_name, active, origin, roles))
                elif "public" in roles:
                    volume = Volume(volume_group, volume_name)
                    snapshots[volume] = []
                    parent_name_map[volume.name] = volume
                else:
                    print("Ignoring volume {}/{}".format(volume_group, volume_name)) # todo: remove this output
            for (volume_group, volume_name, active, origin, roles) in raw_snapshots:
                parent_name, timestamp = Snapshot.parse_name(volume_name)
                if parent_name != origin:
                    raise Exception("Parent volume name not matching: '{}' != '{}'".format(parent_name, origin))
                parent = parent_name_map[parent_name]
                snapshot = Snapshot(parent, timestamp, active)
                snapshots[parent].append(snapshot)
            return snapshots
    
    def load_config():
        import sys
        import toml
        config_path = "config.toml"
        ENV_VAR = "LVM_SNAPSHOT_CONFIG"
        if ENV_VAR in os.environ:
            config_path = os.environ[ENV_VAR]
        if len(sys.argv) > 1:
            config_path = sys.argv[1]
        with open(config_path, "r") as config_file:
            return toml.load(config_file)
    
    def main():
        config = load_config()
        print(config)
        snapshots = Snapshot.list_snapshots()
    
    if __name__ == "__main__":
        main()