diff --git a/db_schema.sql b/db_schema.sql index c1570e44f4003323ca541f159badb8fa0adec4cd..c66eb86892ed3a4fb963ed0d2299965bf0e16b22 100644 --- a/db_schema.sql +++ b/db_schema.sql @@ -241,6 +241,7 @@ CREATE TABLE IF NOT EXISTS `jobs` ( `priority` INTEGER NOT NULL DEFAULT 0, `state` text NOT NULL DEFAULT 'ready', `time_finished` datetime DEFAULT '', + `time_scheduled` datetime DEFAULT '', `time_created` datetime NOT NULL, `last_ping` datetime NOT NULL DEFAULT '', `worker` text DEFAULT NULL, diff --git a/jobs.py b/jobs.py index aff04af9758791c49ab5eccd0d75424d88131da2..909aab3bd99a43dcc21abd157199d0db10ddac6c 100644 --- a/jobs.py +++ b/jobs.py @@ -30,27 +30,54 @@ def jobs_api_token_required(func): def date_json_handler(obj): return obj.isoformat() if hasattr(obj, 'isoformat') else obj -@app.route('/jobs/api/ping', methods=['GET', 'POST']) +@sched_func(10) +def jobs_catch_broken(): + # scheduled but never pinged + query('BEGIN') + query('UPDATE jobs SET state="ready" WHERE state="scheduled" and time_scheduled < ?', datetime.now() - timedelta(seconds=10)) + try: + query('COMMIT') + except: + pass + # no pings since 60s + query('BEGIN') + query('UPDATE jobs SET state="failed" WHERE state="running" and last_ping < ?', datetime.now() - timedelta(seconds=60)) + try: + query('COMMIT') + except: + pass + +@app.route('/jobs/api/worker/<hostname>/ping', methods=['GET', 'POST']) @jobs_api_token_required -def jobs_worker_ping(): - hostname = request.values['host'] - query('INSERT OR REPLACE INTO worker (hostname, last_ping) values (?, ?)',hostname,datetime.now()) +def jobs_worker_ping(hostname): + query('INSERT OR REPLACE INTO worker (hostname, last_ping) values (?, ?)', hostname, datetime.now()) return 'OK',200 -@app.route('/jobs/api/job/<id>/ping', methods=['GET', 'POST']) +@app.route('/jobs/api/job/<int:id>/ping', methods=['GET', 'POST']) @jobs_api_token_required def jobs_ping(id): hostname = request.values['host'] - query('UPDATE jobs SET worker = ?, last_ping = ? where id = ?',hostname,datetime.now(),id) + status = json.dumps(request.values['status'], default=date_json_handler) + state = request.values['state'] + query('UPDATE jobs SET worker = ?, last_ping = ?, status = ?, state = ? where id = ?', hostname, datetime.now(), status, state, id) + return 'OK',200 + +@app.route('/jobs/api/job/<int:id>/finished', methods=['GET', 'POST']) +@jobs_api_token_required +def jobs_finished(id): + if 'status' in request.values: + status = request.values['status'] + else: + status = json.dumps(request.get_json()['status'], default=date_json_handler) + query('UPDATE jobs SET time_finished = ?, status = ?, state = "finished" where id = ?', datetime.now(), status, id) return 'OK',200 -@app.route('/jobs/api/schedule', methods=['POST']) +@app.route('/jobs/api/worker/<hostname>/schedule', methods=['POST']) @jobs_api_token_required -def jobs_schedule(): +def jobs_schedule(hostname): hostdata = request.get_json() - print(hostdata) if not hostdata: - return 'no data', 500 + return 'no hostdata sent', 400 job = None jobtypes = hostdata['jobtypes'] if 'jobtypes' in hostdata else [] while (not job): @@ -59,7 +86,9 @@ def jobs_schedule(): if i['type'] in hostdata['jobtypes'].split(','): job=i break - modify('UPDATE jobs SET state="running", worker = ? WHERE id = ?', hostdata['host'], job['id']) + if not job: + return 'no jobs', 503 + modify('UPDATE jobs SET state="scheduled", worker = ?, time_scheduled = ? WHERE id = ?', hostname, datetime.now(), job['id']) try: query("COMMIT") except: