strpc/strpc/strpc.py

215 lines
6.1 KiB
Python
Executable File

#!/usr/bin/env python3
#234567891123456789212345678931234567894123456789512345678961234567897123456789
# encoding: utf-8
from datetime import datetime, tzinfo, timedelta
from sanelogging import log
from time import sleep
import json
import os
import pathlib
import socket
import subprocess
import uuid
def dirs_in_dir(path):
assert os.path.isdir(path)
items = os.listdir(path)
output = []
for fn in items:
if os.path.isdir(path + '/' + fn):
output.append(path + '/' + fn)
return output
class simple_utc(tzinfo):
def tzname(self,**kwargs):
return "UTC"
def utcoffset(self, dt):
return timedelta(0)
def isodatetime():
return str(
datetime.utcnow().replace(tzinfo=simple_utc()).isoformat()
).replace('+00:00', 'Z')
def read_file(path):
with open(path) as f:
data = f.read()
return data
# FIXME is there some auto reading-and-writing JSONFile class out there
def read_json_file(path):
return json.loads(read_file(path))
def json_pretty(obj):
return json.dumps(obj, sort_keys=True, indent=4, separators=(',', ': '))
def write_json_file(path, obj):
write_file(path, json_pretty(obj))
def write_file(path,content):
with open(path, 'w') as file:
file.write(content)
class STRpcJob(object):
def __init__(self,rpcserver,newjobdir):
self.uid = str(uuid.uuid4())
self.parent = rpcserver
self.jobdir = newjobdir
self.shortname = os.path.basename(newjobdir)
if not os.path.isdir(self.jobdir):
raise Exception("job directory not found")
self.statusfile = self.jobdir + '/' + 'status.json'
self.jobfile = self.jobdir + '/' + 'job.json'
self.jobdesc = None
self.state = None
self.error = None
if os.path.exists(self.statusfile):
# job was known to previous version
x = read_json_file(self.statusfile)
self.uid = x['uid']
self.shortname = x['shortname']
if x['state'] not in ['init', 'ready']:
self.set_state('old')
else:
self.set_state('init')
self.crank()
def write_template_jobfile(self, path):
thisdir = os.path.dirname(os.path.realpath(__file__))
template = read_json_file(thisdir + '/' + 'job-template.json')
write_json_file(path, template)
def __str__(self):
return "<STRrpcJob(%s,'%s'>" % (self.uid, self.shortname)
def write_status(self):
write_file(self.statusfile, json_pretty(self.to_json()))
def read_jobfile(self):
jobdesc = None
try:
jobdesc = read_json_file(self.jobfile)
except(FileNotFoundError):
jobfile_template_filename = self.jobdir + '/' + 'job-template.json'
if not os.path.exists(jobfile_template_filename):
self.write_template_jobfile(jobfile_template_filename)
return
# FIXME validate jobdesc before changing state or updating job-ject
self.jobdesc = jobdesc
self.set_state('ready')
def set_state(self, state):
STATES = [
'done',
'error',
'init',
'old',
'ready',
'running',
]
assert state in STATES
self.state = state
log.info("%s is in state %s" % (self, self.state))
self.write_status()
def crank(self):
if self.state == 'init':
self.read_jobfile()
elif self.state == 'ready':
self.run()
def to_json(self):
me = {
'state': self.state,
'hostname': self.parent.hostname,
'jobdir': self.jobdir,
'shortname': self.shortname,
'uid': self.uid,
}
if self.error:
me['error'] = self.error
return me
def ready_to_run(self):
if self.state == 'ready':
return True
return False
def run(self):
self.set_state('running')
log.die("run job here")
class STRpcRunnerService(object):
def __init__(self):
self.dir = os.environ.get('STRPC_BASE','/var/run/strpc')
self.hostname = socket.gethostname()
self.hostdir = self.dir + '/' + self.hostname
self.logdir = self.hostdir + '/logs'
self.statusfile = self.hostdir + '/status.json'
self.jobdir = self.hostdir + '/jobs'
self.jobs = []
self.running = True
if not os.path.isdir(self.dir):
raise Exception("STRPC base directory not found")
self.setup()
def setup(self):
pathlib.Path(self.jobdir).mkdir(parents=True, exist_ok=True)
pathlib.Path(self.logdir).mkdir(parents=True, exist_ok=True)
self.write_status_file()
def write_status_file(self):
log.info("writing out server status")
write_json_file(self.statusfile, {
'started': isodatetime(),
'hostname': self.hostname,
'jobs': [ job.to_json() for job in self.jobs ],
})
def start(self):
log.info("strpcrunner starting up on %s" % self.hostname)
os.chdir(self.dir)
self.run_forever()
def scan_jobsdir(self, jobsdir):
existing_jobdirs = [ job.jobdir for job in self.jobs ]
for newjobdir in dirs_in_dir(jobsdir):
if newjobdir not in existing_jobdirs:
log.info("found new job: " + os.path.basename(newjobdir))
self.jobs.append(STRpcJob(self,newjobdir))
self.write_status_file()
def run_jobs(self):
for job in self.jobs:
if job.ready_to_run():
job.run()
def run_forever(self):
STATUS_INTERVAL = 60
LOOP_INTERVAL = 0.5
STATUS_ITERATIONS = STATUS_INTERVAL / LOOP_INTERVAL
i = 0
while self.running:
i = i + 1
if i >= STATUS_ITERATIONS:
i = 0
self.write_status_file()
self.scan_jobsdir(self.jobdir)
for job in self.jobs:
job.crank()
sleep(LOOP_INTERVAL)
def main():
s = STRpcRunnerService()
s.start()
if __name__ == "__main__":
main()