#!/usr/bin/env python3
#234567891123456789212345678931234567894123456789512345678961234567897123456789
# encoding: utf-8
"""File-based job runner.

The service watches ``<base>/<hostname>/jobs`` for job directories, tracks
each one as an ``STRpcJob`` and periodically writes JSON status files
describing itself and its jobs.  The base directory comes from the
``STRPC_BASE`` environment variable (default ``/var/run/strpc``).
"""

from datetime import datetime, tzinfo, timedelta
from time import sleep
import json
import os
import pathlib
import socket
import subprocess
import uuid

from sanelogging import log


def dirs_in_dir(path):
    """Return the paths of the immediate subdirectories of *path*.

    Each returned entry is ``path + '/' + name``; ordering follows
    ``os.listdir``.  *path* itself must be an existing directory.
    """
    assert os.path.isdir(path)
    return [
        path + '/' + fn
        for fn in os.listdir(path)
        if os.path.isdir(path + '/' + fn)
    ]


class simple_utc(tzinfo):
    """Minimal fixed-offset UTC ``tzinfo`` implementation."""

    def tzname(self, dt=None, **kwargs):
        # The tzinfo protocol passes ``dt`` positionally; accept it so that
        # datetime.tzname() / strftime('%Z') do not raise TypeError.
        return "UTC"

    def utcoffset(self, dt):
        return timedelta(0)

    def dst(self, dt):
        # Needed by tzinfo.fromutc(), which datetime.now(tz) relies on.
        return timedelta(0)


def isodatetime():
    """Return the current UTC time as an ISO 8601 string ending in ``Z``."""
    # datetime.now(tz) replaces the deprecated utcnow().replace(tzinfo=...)
    # and produces the same aware datetime.
    return datetime.now(simple_utc()).isoformat().replace('+00:00', 'Z')


def read_file(path):
    """Return the full text content of the file at *path* (UTF-8)."""
    with open(path, encoding='utf-8') as f:
        return f.read()


# FIXME is there some auto reading-and-writing JSONFile class out there
def read_json_file(path):
    """Parse the file at *path* as JSON and return the resulting object."""
    return json.loads(read_file(path))


def json_pretty(obj):
    """Serialize *obj* as indented JSON with sorted keys."""
    return json.dumps(obj, sort_keys=True, indent=4, separators=(',', ': '))


def write_json_file(path, obj):
    """Write *obj* to *path* as pretty-printed JSON."""
    write_file(path, json_pretty(obj))


def write_file(path, content):
    """Write the string *content* to *path* (UTF-8), replacing the file."""
    with open(path, 'w', encoding='utf-8') as file:
        file.write(content)


class STRpcJob(object):
    """A single job, backed by a directory under the service's jobs dir.

    The job directory holds ``job.json`` (the job description) and
    ``status.json`` (written on every state change).
    """

    # Every state a job may be put into via set_state().
    STATES = (
        'done',
        'error',
        'init',
        'old',
        'ready',
        'running',
    )

    def __init__(self, rpcserver, newjobdir):
        """Create the job object for *newjobdir*, owned by *rpcserver*.

        Raises:
            Exception: if *newjobdir* is not an existing directory.
        """
        self.uid = str(uuid.uuid4())
        self.parent = rpcserver
        self.jobdir = newjobdir
        self.shortname = os.path.basename(newjobdir)
        if not os.path.isdir(self.jobdir):
            raise Exception("job directory not found")
        self.statusfile = self.jobdir + '/' + 'status.json'
        self.jobfile = self.jobdir + '/' + 'job.json'
        self.jobdesc = None
        self.state = None
        self.error = None

        stale = False
        if os.path.exists(self.statusfile):
            # job was known to previous version
            x = read_json_file(self.statusfile)
            self.uid = x['uid']
            self.shortname = x['shortname']
            stale = x['state'] not in ['init', 'ready']

        # Always leave the constructor with a concrete state.  A job that a
        # previous run left in 'init' or 'ready' restarts from 'init' so its
        # job file is read again; otherwise its state would stay None and
        # crank() would never act on it.
        self.set_state('old' if stale else 'init')

        self.crank()

    def write_template_jobfile(self, path):
        """Copy ``job-template.json`` (next to this source file) to *path*."""
        thisdir = os.path.dirname(os.path.realpath(__file__))
        template = read_json_file(thisdir + '/' + 'job-template.json')
        write_json_file(path, template)

    def __str__(self):
        # The previous format string was empty, so the % operator raised
        # TypeError ("not all arguments converted") whenever a job was logged.
        return "<STRpcJob(%s,'%s')>" % (self.uid, self.shortname)

    def write_status(self):
        """Write this job's JSON description to its status file."""
        write_file(self.statusfile, json_pretty(self.to_json()))

    def read_jobfile(self):
        """Load ``job.json`` and move to 'ready'.

        If the job file does not exist, a ``job-template.json`` is written
        into the job directory (unless one is already there) and the state
        is left unchanged.
        """
        try:
            jobdesc = read_json_file(self.jobfile)
        except FileNotFoundError:
            jobfile_template_filename = self.jobdir + '/' + 'job-template.json'
            if not os.path.exists(jobfile_template_filename):
                self.write_template_jobfile(jobfile_template_filename)
            return
        # FIXME validate jobdesc before changing state or updating job-ject
        self.jobdesc = jobdesc
        self.set_state('ready')

    def set_state(self, state):
        """Switch to *state* (one of ``STATES``), log it and persist it."""
        assert state in self.STATES
        self.state = state
        log.info("%s is in state %s" % (self, self.state))
        self.write_status()

    def crank(self):
        """Advance the job one step: read the job file, or run when ready."""
        if self.state == 'init':
            self.read_jobfile()
        elif self.state == 'ready':
            self.run()

    def to_json(self):
        """Return a JSON-serializable dict describing this job."""
        me = {
            'state': self.state,
            'hostname': self.parent.hostname,
            'jobdir': self.jobdir,
            'shortname': self.shortname,
            'uid': self.uid,
        }
        if self.error:
            me['error'] = self.error
        return me

    def ready_to_run(self):
        """Return True when the job is in the 'ready' state."""
        return self.state == 'ready'

    def run(self):
        """Mark the job as running.  Actual execution is not implemented."""
        self.set_state('running')
        log.die("run job here")


class STRpcRunnerService(object):
    """Service that discovers job directories and drives their jobs."""

    def __init__(self):
        """Resolve the per-host directory layout and create it.

        Raises:
            Exception: if the base directory does not exist.
        """
        self.dir = os.environ.get('STRPC_BASE', '/var/run/strpc')
        self.hostname = socket.gethostname()
        self.hostdir = self.dir + '/' + self.hostname
        self.logdir = self.hostdir + '/logs'
        self.statusfile = self.hostdir + '/status.json'
        self.jobdir = self.hostdir + '/jobs'
        self.jobs = []
        self.running = True
        if not os.path.isdir(self.dir):
            raise Exception("STRPC base directory not found")
        self.setup()

    def setup(self):
        """Create the jobs and logs directories and write initial status."""
        pathlib.Path(self.jobdir).mkdir(parents=True, exist_ok=True)
        pathlib.Path(self.logdir).mkdir(parents=True, exist_ok=True)
        self.write_status_file()

    def write_status_file(self):
        """Write the service status, including every known job, as JSON."""
        log.info("writing out server status")
        write_json_file(self.statusfile, {
            # NOTE(review): this timestamp is regenerated on every write, so
            # it records the time of the write rather than of service
            # start-up -- confirm that is what 'started' is meant to hold.
            'started': isodatetime(),
            'hostname': self.hostname,
            'jobs': [job.to_json() for job in self.jobs],
        })

    def start(self):
        """Change into the base directory and enter the main loop."""
        log.info("strpcrunner starting up on %s" % self.hostname)
        os.chdir(self.dir)
        self.run_forever()

    def scan_jobsdir(self, jobsdir):
        """Register a job for every directory in *jobsdir* not yet known."""
        existing_jobdirs = [job.jobdir for job in self.jobs]
        for newjobdir in dirs_in_dir(jobsdir):
            if newjobdir not in existing_jobdirs:
                log.info("found new job: " + os.path.basename(newjobdir))
                self.jobs.append(STRpcJob(self, newjobdir))
                self.write_status_file()

    def run_jobs(self):
        """Run every job that reports itself ready."""
        for job in self.jobs:
            if job.ready_to_run():
                job.run()

    def run_forever(self):
        """Main loop: rescan for jobs and crank each one until stopped.

        The loop runs while ``self.running`` is true, sleeping
        ``LOOP_INTERVAL`` seconds per pass and rewriting the status file
        once every ``STATUS_ITERATIONS`` passes.
        """
        STATUS_INTERVAL = 60
        LOOP_INTERVAL = 0.5
        STATUS_ITERATIONS = STATUS_INTERVAL / LOOP_INTERVAL
        i = 0

        while self.running:
            i = i + 1
            if i >= STATUS_ITERATIONS:
                i = 0
                self.write_status_file()
            self.scan_jobsdir(self.jobdir)
            for job in self.jobs:
                job.crank()
            sleep(LOOP_INTERVAL)


def main():
    """Create the runner service and start it."""
    s = STRpcRunnerService()
    s.start()


if __name__ == "__main__":
    main()