#!/usr/bin/env python3
#234567891123456789212345678931234567894123456789512345678961234567897123456789
# encoding: utf-8

from datetime import datetime, tzinfo, timedelta
from sanelogging import log
from time import sleep
import json
import os
import pathlib
import socket
import subprocess
import uuid
from .utils import *


class STRpcJob(object):
    """A single job, backed by a job directory, run through ``docker run``.

    The job directory holds ``job.json`` (the job description),
    ``status.json`` (the state persisted by this class) and the captured
    ``stdout.txt`` / ``stderr.txt`` of the container process.

    The job moves between the states listed in ``STATES``; calling
    ``crank()`` advances it by at most one step.
    """

    # Every state accepted by set_state().
    STATES = ('done', 'error', 'init', 'old', 'ready', 'running', 'killed')

    # Seconds to wait after terminate() before escalating to kill().
    KILL_GRACE_SECS = 10

    def __init__(self, rpcserver, newjobdir):
        """Attach to the job directory *newjobdir* owned by *rpcserver*.

        If a ``status.json`` already exists in the directory, the job's uid
        and shortname are restored from it; a previous state other than
        ``init`` or ``ready`` is replaced by ``old``.  Otherwise the job
        starts in ``init``.  The constructor finishes with one ``crank()``.

        Raises:
            Exception: if *newjobdir* is not an existing directory.
        """
        self.uid = str(uuid.uuid4())
        self.parent = rpcserver
        self.jobdir = newjobdir
        self.shortname = os.path.basename(newjobdir)
        if not os.path.isdir(self.jobdir):
            raise Exception("job directory not found")
        self.statusfile = os.path.join(self.jobdir, 'status.json')
        self.jobfile = os.path.join(self.jobdir, 'job.json')
        self.jobstdoutfile = os.path.join(self.jobdir, 'stdout.txt')
        self.jobstderrfile = os.path.join(self.jobdir, 'stderr.txt')
        self.stderrfd = None
        self.stdoutfd = None
        self.jobstart = None
        self.jobruntime = None
        self.jobdesc = None
        self.runtimecap = 0
        self.state = None
        self.error = None
        self.subprocess = None

        if os.path.exists(self.statusfile):
            # job was known to previous version
            previous = read_json_file(self.statusfile)
            self.uid = previous['uid']
            self.shortname = previous['shortname']
            if previous['state'] in ('init', 'ready'):
                self.set_state(previous['state'])
            else:
                # Anything that was started (or finished) under a previous
                # instance is not resumed.
                self.set_state('old')
        else:
            self.set_state('init')

        self.crank()

    def write_template_jobfile(self, path):
        """Copy the ``job-template.json`` shipped next to this module to *path*."""
        thisdir = os.path.dirname(os.path.realpath(__file__))
        template = read_json_file(os.path.join(thisdir, 'job-template.json'))
        write_json_file(path, template)

    def __str__(self):
        """Return a short identifier built from the job's uid and shortname."""
        # The previous format string was empty, so applying it to a 2-tuple
        # raised TypeError ("not all arguments converted") on every call --
        # including the one made by set_state() when it logs.
        return "<STRpcJob(%s,'%s')>" % (self.uid, self.shortname)

    def write_status(self):
        """Write the pretty-printed ``to_json()`` dict to the status file."""
        write_file(self.statusfile, json_pretty(self.to_json()))

    def read_jobfile(self):
        """Load ``job.json`` and, on success, move the job to ``ready``.

        If ``job.json`` does not exist, a ``job-template.json`` is written
        into the job directory (unless one is already there) and the state is
        left unchanged.
        """
        try:
            jobdesc = read_json_file(self.jobfile)
        except FileNotFoundError:
            jobfile_template_filename = os.path.join(
                self.jobdir, 'job-template.json')
            if not os.path.exists(jobfile_template_filename):
                self.write_template_jobfile(jobfile_template_filename)
            return

        # FIXME validate jobdesc before changing state or updating job-ject
        # Parse everything that can fail *before* touching self, so a
        # malformed job file does not leave the object half-updated.
        runtimecap = int(jobdesc['run'].get('max_runtime_secs', 86400))
        self.jobdesc = jobdesc
        self.runtimecap = runtimecap
        self.set_state('ready')

    def set_state(self, state):
        """Switch to *state* (one of ``STATES``), log it and persist status."""
        assert state in self.STATES
        self.state = state
        log.info("%s is in state %s" % (self, self.state))
        self.write_status()

    def crank(self):
        """Advance the job by one step according to its current state.

        ``init`` tries to read the job file, ``ready`` starts the job and
        ``running`` checks for completion; every other state is a no-op.
        """
        #log.debug("cranking %s (state %s)" % (self, self.state))
        if self.state == 'init':
            self.read_jobfile()
        elif self.state == 'ready':
            self.run()
        elif self.state == 'running':
            self.check_if_done()

    def to_json(self):
        """Return the job status as a dict.

        ``error`` and ``runtime`` are included only when they are set.
        """
        me = {
            'state': self.state,
            'hostname': self.parent.hostname,
            'jobdir': self.jobdir,
            'shortname': self.shortname,
            'uid': self.uid,
        }
        if self.error:
            me['error'] = self.error
        if self.jobruntime:
            me['runtime'] = self.jobruntime
        return me

    def _close_output_files(self):
        """Close the stdout/stderr capture files if they have been opened."""
        for fd in (self.stdoutfd, self.stderrfd):
            if fd is not None:
                fd.close()

    def _stop_subprocess(self):
        """Terminate the child, escalating to kill after the grace period."""
        self.subprocess.terminate()
        try:
            self.subprocess.wait(timeout=self.KILL_GRACE_SECS)
        except subprocess.TimeoutExpired:
            self.subprocess.kill()
            # Reap the child so it does not linger as a zombie.
            self.subprocess.wait()

    def check_if_done(self):
        """Poll the running child process and update the job state.

        If the child has exited, the runtime is recorded and the job becomes
        ``done``.  If it is still running past ``runtimecap`` seconds it is
        stopped and the job becomes ``killed``.  In both cases the output
        capture files are closed.  Does nothing unless the state is
        ``running``.
        """
        if self.state != 'running':
            return

        self.subprocess.poll()
        if self.subprocess.returncode is None:
            elapsed = (datetime.utcnow() - self.jobstart).total_seconds()
            if elapsed > self.runtimecap:
                self._stop_subprocess()
                # Previously the capture files were leaked on this path.
                self._close_output_files()
                self.set_state('killed')
            return

        self.jobruntime = (datetime.utcnow() - self.jobstart).total_seconds()
        self._close_output_files()
        self.set_state('done')

    def run(self):
        """Spawn ``docker run`` for this job and move it to ``running``.

        The job directory is mounted into the container at ``/job``; image
        and command come from the ``run`` section of the job description.
        The child's stdout and stderr are captured into the job directory.

        If the process cannot be started, the capture files are closed, the
        failure is stored in ``self.error``, the job moves to ``error`` and
        the original exception is re-raised.
        """
        self.set_state('running')
        try:
            run_spec = self.jobdesc['run']

            # set up the command and args
            args = [
                self.parent.dockerpath,
                'run',
                # add the job dir as a volume into the container at /job
                '-v',
                self.jobdir + ':/job',
                run_spec['image'],
            ]
            args.extend(list(run_spec['command']))

            self.stdoutfd = open(self.jobstdoutfile, 'w')
            self.stderrfd = open(self.jobstderrfile, 'w')

            log.info("running job %s: %s" % (self.uid, json.dumps(args)))
            self.jobstart = datetime.utcnow()
            self.subprocess = subprocess.Popen(
                args,
                stdin=None,
                stdout=self.stdoutfd,
                stderr=self.stderrfd,
            )
        except Exception as exc:
            # Without this the job would stay 'running' with no subprocess
            # and the next crank() would fail on None.poll().
            self._close_output_files()
            self.error = str(exc)
            self.set_state('error')
            raise

        log.info("job %s spawned as pid %d" % (self.uid, self.subprocess.pid))