Skip to content
Snippets Groups Projects
Commit d14f54b7 authored by Robin Sonnabend's avatar Robin Sonnabend
Browse files

Changed on gnt-ha-daemon

parent e6054f67
Branches
No related tags found
No related merge requests found
RAPI_ENDPOINT="cloud.fsmpi.rwth-aachen.de"
RAPI_USER = "gntweb"
RAPI_PASSWORD = ""
SOCKET_PATH = "/tmp/gnt-ha-daemon.sock"
BUFFER_SIZE = 256
#!/usr/bin/python2
import json
import socket
import os
import config
class Socket(object):
def __init__(self, address):
self.address = address
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.active = False
def __enter__(self):
self.socket.connect(self.address)
self.active = True
return self
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is not None:
print exc_type, exc_value, traceback
self.active = False
return self
def step(self):
connection, client_address = self.socket.accept()
message = self.receive(connection)
connection.close()
return message
def receive(self, connection):
parts = []
while True:
part = connection.recv(BUFFER_SIZE)
if len(part) > 0:
parts.append(part)
else:
break
return json.loads(b"".join(parts).decode("utf-8"))
def send(self, message):
self.socket.send(json.dumps(message).encode("utf-8"))
def main():
import sys
if len(sys.argv) < 2:
raise Exception("Missing arguments")
message = {}
print(sys.argv)
for argument in sys.argv[1:]:
if "=" in argument:
key, value = argument.split("=", 1)
message[key] = value
with Socket(config.SOCKET_PATH) as sock:
sock.send(message)
if __name__ == "__main__":
main()
#!/usr/bin/python2
import json
import socket
import os
import time
from ganeti import GanetiRapiClient
import config
TIHANGE = "tihange"
DOEL = "doel"
USVS = {TIHANGE, DOEL}
USV = "usv"
RUNNING = "running"
ONBATTERY = "onbattery"
KILLPOWER = "killpower"
STATES = {RUNNING, ONBATTERY, KILLPOWER}
STATE = "state"
STATE_CHANGE = "state-change"
COMMANDS = {STATE_CHANGE}
COMMAND = "command"
TARGET_USV = "usv"
TARGETS = {TARGET_USV}
TARGET = "target"
class Socket(object):
def __init__(self, address):
self.address = address
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.active = False
def __enter__(self):
self.socket.bind(self.address)
self.socket.listen(1)
self.active = True
return self
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is not None:
print exc_type, exc_value, traceback
self.active = False
os.unlink(self.address)
return self
def step(self):
connection, client_address = self.socket.accept()
message = self.receive(connection)
connection.close()
return message
def receive(self, connection):
parts = []
while True:
part = connection.recv(config.BUFFER_SIZE)
if len(part) > 0:
parts.append(part)
else:
break
return json.loads(b"".join(parts).decode("utf-8"))
class UncriticalHADaemonException(Exception):
pass
class CriticalHADaemonException(Exception):
pass
class Daemon(object):
def __init__(self, address):
self.address = address
self.usv_state = {usv: RUNNING for usv in USVS}
self.rapi = GanetiRapiClient(host=config.RAPI_ENDPOINT, username=config.RAPI_USER, password=config.RAPI_PASSWORD)
def run(self):
with Socket(self.address) as sock:
while True:
try:
message = sock.step()
if COMMAND not in message:
raise UncriticalHADaemonException("Invalid message: {}".format(message))
command = message[COMMAND]
if command not in COMMANDS:
raise UncriticalHADaemonException("Invalid command: {} (expected on of {})".format(command, COMMANDS))
if command == STATE_CHANGE:
if not TARGET in message:
raise UncriticalHADaemonException("Invalid message: {}".format(message))
target = message[TARGET]
if target not in TARGETS:
raise UncriticalHADaemonException("Invalid target: {} (expected one of {})".format(target, TARGETS))
if target == TARGET_USV:
if not USV in message or not STATE in message:
raise UncriticalHADaemonException("Invalid message: {}".format(message))
usv = message[USV]
state = message[STATE]
if not usv in USVS:
raise UncriticalHADaemonException("Invalid usv: {} (expected one of {})".format(usv, USVS))
if not state in STATES:
raise UncriticalHADaemonException("Invalid state: {} (expected one of {})".format(state, STATES))
old_state = self.usv_state[usv]
self.usv_state[usv] = state
self.on_usv_state_change(usv, state, old_state)
else:
raise UncriticalHADaemonException("Can't handle target: {}".format(target))
else:
raise UncriticalHADaemonException("Can't handle command: {}".format(command))
except UncriticalHADaemonException, exp:
print(exp)
pass
def on_usv_state_change(self, usv, new_state, old_state):
print("got usv state change on {}: {} -> {}".format(usv, old_state, new_state))
if new_state == RUNNING:
return
if new_state == old_state:
return
if new_state == ONBATTERY:
hostname = socket.gethostname()
cluster_info = self.rapi.GetInfo()
master_name = cluster_info["master"]
if hostname == master_name:
print("is master")
self.shutdown_instances()
self.wait_until_instances_empty()
self.wait_until_nodes_empty()
self.shutdown_self()
else:
print("is not master")
self.wait_until_instances_empty()
self.shutdown_self()
def shutdown_instances(self):
instances = self.rapi.GetInstances(bulk=True)
for instance in instances:
# TODO: implement this with the RAPI
print "need to shut down", instance
def wait_until_instances_empty(self):
while True:
instances = self.rapi.GetInstances(bulk=True)
running_instances = filter(lambda instance: instance["status"] == "running", instances)
if len(running_instances) == 0:
break
print "got {} of {} instances running, waiting...".format(len(running_instances), len(instances))
time.sleep(10)
def wait_until_nodes_empty(self):
while True:
nodes = self.rapi.GetNodes(bulk=True)
running_nodes = filter(lambda node: node.offline == False, nodes)
if len(running_nodes) == 1:
break
time.sleep(10)
def shutdown_self(self):
# TODO: implement this with systemctl poweroff
print "need to shutdown myself here"
def main():
daemon = Daemon(config.SOCKET_PATH)
daemon.run()
if __name__ == "__main__":
main()
import json
import socket
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment