diff --git a/baremetal/files/apcupsd/config.py b/baremetal/files/apcupsd/config.py new file mode 100644 index 0000000000000000000000000000000000000000..a6427b6896c464d6553690644d1226561cd9e9c9 --- /dev/null +++ b/baremetal/files/apcupsd/config.py @@ -0,0 +1,5 @@ +RAPI_ENDPOINT="cloud.fsmpi.rwth-aachen.de" +RAPI_USER = "gntweb" +RAPI_PASSWORD = "" +SOCKET_PATH = "/tmp/gnt-ha-daemon.sock" +BUFFER_SIZE = 256 diff --git a/baremetal/files/apcupsd/gnt-ha-client.py b/baremetal/files/apcupsd/gnt-ha-client.py new file mode 100644 index 0000000000000000000000000000000000000000..678b13f83b5961287992cf4a2f5fcbe9ae2148da --- /dev/null +++ b/baremetal/files/apcupsd/gnt-ha-client.py @@ -0,0 +1,58 @@ +#!/usr/bin/python2 +import json +import socket +import os +import config + +class Socket(object): + def __init__(self, address): + self.address = address + self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.active = False + + def __enter__(self): + self.socket.connect(self.address) + self.active = True + return self + + def __exit__(self, exc_type, exc_value, traceback): + if exc_type is not None: + print exc_type, exc_value, traceback + self.active = False + return self + + def step(self): + connection, client_address = self.socket.accept() + message = self.receive(connection) + connection.close() + return message + + def receive(self, connection): + parts = [] + while True: + part = connection.recv(BUFFER_SIZE) + if len(part) > 0: + parts.append(part) + else: + break + return json.loads(b"".join(parts).decode("utf-8")) + + def send(self, message): + self.socket.send(json.dumps(message).encode("utf-8")) + +def main(): + import sys + if len(sys.argv) < 2: + raise Exception("Missing arguments") + message = {} + print(sys.argv) + for argument in sys.argv[1:]: + if "=" in argument: + key, value = argument.split("=", 1) + message[key] = value + with Socket(config.SOCKET_PATH) as sock: + sock.send(message) + + +if __name__ == "__main__": + main() diff --git a/baremetal/files/apcupsd/gnt-ha-daemon.py b/baremetal/files/apcupsd/gnt-ha-daemon.py new file mode 100644 index 0000000000000000000000000000000000000000..b04aa99e8d6ea1887015cc36f5bf1707a82cbd7a --- /dev/null +++ b/baremetal/files/apcupsd/gnt-ha-daemon.py @@ -0,0 +1,169 @@ +#!/usr/bin/python2 +import json +import socket +import os +import time +from ganeti import GanetiRapiClient +import config + +TIHANGE = "tihange" +DOEL = "doel" +USVS = {TIHANGE, DOEL} +USV = "usv" + +RUNNING = "running" +ONBATTERY = "onbattery" +KILLPOWER = "killpower" +STATES = {RUNNING, ONBATTERY, KILLPOWER} +STATE = "state" + +STATE_CHANGE = "state-change" +COMMANDS = {STATE_CHANGE} +COMMAND = "command" + +TARGET_USV = "usv" +TARGETS = {TARGET_USV} +TARGET = "target" + +class Socket(object): + def __init__(self, address): + self.address = address + self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.active = False + + def __enter__(self): + self.socket.bind(self.address) + self.socket.listen(1) + self.active = True + return self + + def __exit__(self, exc_type, exc_value, traceback): + if exc_type is not None: + print exc_type, exc_value, traceback + self.active = False + os.unlink(self.address) + return self + + def step(self): + connection, client_address = self.socket.accept() + message = self.receive(connection) + connection.close() + return message + + def receive(self, connection): + parts = [] + while True: + part = connection.recv(config.BUFFER_SIZE) + if len(part) > 0: + parts.append(part) + else: + break + return json.loads(b"".join(parts).decode("utf-8")) + +class UncriticalHADaemonException(Exception): + pass + +class CriticalHADaemonException(Exception): + pass + +class Daemon(object): + def __init__(self, address): + self.address = address + self.usv_state = {usv: RUNNING for usv in USVS} + self.rapi = GanetiRapiClient(host=config.RAPI_ENDPOINT, username=config.RAPI_USER, password=config.RAPI_PASSWORD) + + def run(self): + with Socket(self.address) as sock: + while True: + try: + message = sock.step() + if COMMAND not in message: + raise UncriticalHADaemonException("Invalid message: {}".format(message)) + command = message[COMMAND] + if command not in COMMANDS: + raise UncriticalHADaemonException("Invalid command: {} (expected on of {})".format(command, COMMANDS)) + if command == STATE_CHANGE: + if not TARGET in message: + raise UncriticalHADaemonException("Invalid message: {}".format(message)) + target = message[TARGET] + if target not in TARGETS: + raise UncriticalHADaemonException("Invalid target: {} (expected one of {})".format(target, TARGETS)) + if target == TARGET_USV: + if not USV in message or not STATE in message: + raise UncriticalHADaemonException("Invalid message: {}".format(message)) + usv = message[USV] + state = message[STATE] + if not usv in USVS: + raise UncriticalHADaemonException("Invalid usv: {} (expected one of {})".format(usv, USVS)) + if not state in STATES: + raise UncriticalHADaemonException("Invalid state: {} (expected one of {})".format(state, STATES)) + old_state = self.usv_state[usv] + self.usv_state[usv] = state + self.on_usv_state_change(usv, state, old_state) + else: + raise UncriticalHADaemonException("Can't handle target: {}".format(target)) + else: + raise UncriticalHADaemonException("Can't handle command: {}".format(command)) + + except UncriticalHADaemonException, exp: + print(exp) + pass + + def on_usv_state_change(self, usv, new_state, old_state): + print("got usv state change on {}: {} -> {}".format(usv, old_state, new_state)) + if new_state == RUNNING: + return + if new_state == old_state: + return + if new_state == ONBATTERY: + hostname = socket.gethostname() + cluster_info = self.rapi.GetInfo() + master_name = cluster_info["master"] + if hostname == master_name: + print("is master") + self.shutdown_instances() + self.wait_until_instances_empty() + self.wait_until_nodes_empty() + self.shutdown_self() + else: + print("is not master") + self.wait_until_instances_empty() + self.shutdown_self() + + def shutdown_instances(self): + instances = self.rapi.GetInstances(bulk=True) + for instance in instances: + # TODO: implement this with the RAPI + print "need to shut down", instance + + def wait_until_instances_empty(self): + while True: + instances = self.rapi.GetInstances(bulk=True) + running_instances = filter(lambda instance: instance["status"] == "running", instances) + if len(running_instances) == 0: + break + print "got {} of {} instances running, waiting...".format(len(running_instances), len(instances)) + time.sleep(10) + + def wait_until_nodes_empty(self): + while True: + nodes = self.rapi.GetNodes(bulk=True) + running_nodes = filter(lambda node: node.offline == False, nodes) + if len(running_nodes) == 1: + break + time.sleep(10) + + def shutdown_self(self): + # TODO: implement this with systemctl poweroff + print "need to shutdown myself here" + + + +def main(): + daemon = Daemon(config.SOCKET_PATH) + daemon.run() + +if __name__ == "__main__": + main() + + diff --git a/baremetal/files/apcupsd/power-daemon.py b/baremetal/files/apcupsd/power-daemon.py deleted file mode 100644 index ffd060386bedab155e0c368d73ab8b4f452c9367..0000000000000000000000000000000000000000 --- a/baremetal/files/apcupsd/power-daemon.py +++ /dev/null @@ -1,2 +0,0 @@ -import json -import socket