strpc/strpc/job.py

#!/usr/bin/env python3
# encoding: utf-8
#234567891123456789212345678931234567894123456789512345678961234567897123456789
from datetime import datetime
from sanelogging import log
from time import sleep
import json
import os
import subprocess
import uuid

from .utils import json_pretty, read_json_file, write_file, write_json_file


class STRpcJob(object):
    def __init__(self, rpcserver, newjobdir):
        self.uid = str(uuid.uuid4())
        self.parent = rpcserver
        self.jobdir = newjobdir
        self.shortname = os.path.basename(newjobdir)
        if not os.path.isdir(self.jobdir):
            raise Exception("job directory not found: %s" % self.jobdir)
        self.statusfile = os.path.join(self.jobdir, 'status.json')
        self.jobfile = os.path.join(self.jobdir, 'job.json')
        self.jobstdoutfile = os.path.join(self.jobdir, 'stdout.txt')
        self.jobstderrfile = os.path.join(self.jobdir, 'stderr.txt')
        self.stderrfd = None
        self.stdoutfd = None
        self.jobstart = None
        self.jobruntime = None
        self.jobdesc = None
        self.runtimecap = 0
        self.state = None
        self.error = None
        self.subprocess = None
        if os.path.exists(self.statusfile):
            # job was known to a previous run; recover its uid, shortname and state
            x = read_json_file(self.statusfile)
            self.uid = x['uid']
            self.shortname = x['shortname']
            if x['state'] not in ['init', 'ready']:
                # anything past 'ready' cannot be resumed safely
                self.set_state('old')
            else:
                self.set_state(x['state'])
        else:
            self.set_state('init')
        self.crank()
    def write_template_jobfile(self, path):
        # copy the bundled job-template.json into the job directory so the
        # user has a starting point for writing job.json
        thisdir = os.path.dirname(os.path.realpath(__file__))
        template = read_json_file(os.path.join(thisdir, 'job-template.json'))
        write_json_file(path, template)

    def __str__(self):
        return "<STRpcJob(%s,'%s')>" % (self.uid, self.shortname)

    def write_status(self):
        write_file(self.statusfile, json_pretty(self.to_json()))
    def read_jobfile(self):
        jobdesc = None
        try:
            jobdesc = read_json_file(self.jobfile)
        except FileNotFoundError:
            # no job.json yet; drop a template next to it and try again on a
            # later crank()
            jobfile_template_filename = os.path.join(self.jobdir, 'job-template.json')
            if not os.path.exists(jobfile_template_filename):
                self.write_template_jobfile(jobfile_template_filename)
            return
        # FIXME validate jobdesc before changing state or updating job object
        self.jobdesc = jobdesc
        self.runtimecap = int(jobdesc['run'].get('max_runtime_secs', 86400))
        self.set_state('ready')
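
    # For illustration only: a job.json along these lines would satisfy the
    # fields read above. Only run.image, run.command and the optional
    # run.max_runtime_secs are referenced by this class; the concrete values
    # shown here are made up.
    #
    #   {
    #       "run": {
    #           "image": "ubuntu:22.04",
    #           "command": ["/bin/sh", "-c", "ls -l /job > /job/listing.txt"],
    #           "max_runtime_secs": 3600
    #       }
    #   }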
    def set_state(self, state):
        STATES = [
            'done',
            'error',
            'init',
            'old',
            'ready',
            'running',
            'killed',
        ]
        assert state in STATES
        self.state = state
        log.info("%s is in state %s" % (self, self.state))
        self.write_status()
    def crank(self):
        # advance the job's state machine by one step
        #log.debug("cranking %s (state %s)" % (self, self.state))
        if self.state == 'init':
            self.read_jobfile()
        elif self.state == 'ready':
            self.run()
        elif self.state == 'running':
            self.check_if_done()
    def to_json(self):
        me = {
            'state': self.state,
            'hostname': self.parent.hostname,
            'jobdir': self.jobdir,
            'shortname': self.shortname,
            'uid': self.uid,
        }
        if self.error:
            me['error'] = self.error
        if self.jobruntime:
            me['runtime'] = self.jobruntime
        return me
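
    # For illustration only: write_status() pretty-prints the dict above, so a
    # status.json for a finished job would look roughly like this (all values
    # are made up; 'error' and 'runtime' appear only when set):
    #
    #   {
    #       "state": "done",
    #       "hostname": "worker-1",
    #       "jobdir": "/srv/strpc/jobs/example",
    #       "shortname": "example",
    #       "uid": "2f1d0c3a-....",
    #       "runtime": 12.3
    #   }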
    def check_if_done(self):
        if self.state != 'running':
            return
        self.subprocess.poll()
        if self.subprocess.returncode is None:
            # still running: enforce the runtime cap, otherwise leave it alone
            rt = (datetime.utcnow() - self.jobstart).total_seconds()
            if rt > self.runtimecap:
                self.subprocess.terminate()
                sleep(10)
                self.subprocess.kill()
                self.set_state('killed')
            return
        self.jobruntime = (datetime.utcnow() - self.jobstart).total_seconds()
        self.set_state('done')
        self.stdoutfd.close()
        self.stderrfd.close()
    def run(self):
        self.set_state('running')
        self.stdoutfd = open(self.jobstdoutfile, 'w')
        self.stderrfd = open(self.jobstderrfile, 'w')
        # set up the command and args
        args = []
        args.append(self.parent.dockerpath)
        args.append('run')
        # add the job dir as a volume into the container at /job
        args.append('-v')
        args.append(self.jobdir + ':/job')
        args.append(self.jobdesc['run']['image'])
        args.extend(list(self.jobdesc['run']['command']))
        log.info("running job %s: %s" % (self.uid, json.dumps(args)))
        self.jobstart = datetime.utcnow()
        self.subprocess = subprocess.Popen(args, stdin=None,
                                           stdout=self.stdoutfd,
                                           stderr=self.stderrfd)
        log.info("job %s spawned as pid %d" % (self.uid, self.subprocess.pid))