diff --git a/bin/devserver b/bin/devserver index 3e43000..7acef64 100755 --- a/bin/devserver +++ b/bin/devserver @@ -11,5 +11,5 @@ cd "$UPDIR" if [[ ! -d "$STRPC_BASE" ]]; then mkdir -p "$STRPC_BASE" fi -pipenv run bin/strpc +pipenv run bin/strpcd diff --git a/bin/prodserver b/bin/prodserver new file mode 100755 index 0000000..ad33859 --- /dev/null +++ b/bin/prodserver @@ -0,0 +1,16 @@ +#!/bin/bash + +# NOTE! +# when you mount the docker socket into the docker container +# this is equivalent to giving it root on the outside host. +# the jobs run via this tool should not be able to exploit this, +# but it's possible that they can, so don't assume that jobs execute +# in an isolated security context. + +docker run \ + --name strpcd \ + -v /var/run/docker.sock:/var/run/docker.sock \ + -v /storage/strpc:/rpc \ + --restart always \ + -d \ + sneak/strpcd diff --git a/strpc/__init__.py b/strpc/__init__.py index 52dd2e3..f4f6c36 100755 --- a/strpc/__init__.py +++ b/strpc/__init__.py @@ -2,4 +2,4 @@ #234567891123456789212345678931234567894123456789512345678961234567897123456789 # encoding: utf-8 -from .strpc import STRpcRunnerService +from .rpc import STRpcRunnerService diff --git a/strpc/job.py b/strpc/job.py new file mode 100755 index 0000000..25adfca --- /dev/null +++ b/strpc/job.py @@ -0,0 +1,153 @@ +#!/usr/bin/env python3 +#234567891123456789212345678931234567894123456789512345678961234567897123456789 +# encoding: utf-8 + +from datetime import datetime, tzinfo, timedelta +from sanelogging import log +from time import sleep +import json +import os +import pathlib +import socket +import subprocess +import uuid +from .utils import * + +class STRpcJob(object): + def __init__(self,rpcserver,newjobdir): + self.uid = str(uuid.uuid4()) + self.parent = rpcserver + self.jobdir = newjobdir + self.shortname = os.path.basename(newjobdir) + if not os.path.isdir(self.jobdir): + raise Exception("job directory not found") + self.statusfile = self.jobdir + '/' + 'status.json' + self.jobfile = self.jobdir + '/' + 'job.json' + self.jobstdoutfile = self.jobdir + '/' + 'stdout.txt' + self.jobstderrfile = self.jobdir + '/' + 'stderr.txt' + self.stderrfd = None + self.stdoutfd = None + self.jobstart = None + self.jobruntime = None + self.jobdesc = None + self.runtimecap = 0 + self.state = None + self.error = None + self.subprocess = None + + if os.path.exists(self.statusfile): + # job was known to previous version + x = read_json_file(self.statusfile) + self.uid = x['uid'] + self.shortname = x['shortname'] + if x['state'] not in ['init', 'ready']: + self.set_state('old') + else: + self.set_state(x['state']) + else: + self.set_state('init') + + self.crank() + + def write_template_jobfile(self, path): + thisdir = os.path.dirname(os.path.realpath(__file__)) + template = read_json_file(thisdir + '/' + 'job-template.json') + write_json_file(path, template) + + def __str__(self): + return "" % (self.uid, self.shortname) + + def write_status(self): + write_file(self.statusfile, json_pretty(self.to_json())) + + def read_jobfile(self): + jobdesc = None + try: + jobdesc = read_json_file(self.jobfile) + except(FileNotFoundError): + jobfile_template_filename = self.jobdir + '/' + 'job-template.json' + if not os.path.exists(jobfile_template_filename): + self.write_template_jobfile(jobfile_template_filename) + return + # FIXME validate jobdesc before changing state or updating job-ject + self.jobdesc = jobdesc + self.runtimecap = int(jobdesc['run'].get('max_runtime_secs',86400)) + self.set_state('ready') + + def set_state(self, state): + STATES = [ + 'done', + 'error', + 'init', + 'old', + 'ready', + 'running', + 'killed' + ] + assert state in STATES + self.state = state + log.info("%s is in state %s" % (self, self.state)) + self.write_status() + + def crank(self): + #log.debug("cranking %s (state %s)" % (self, self.state)) + if self.state == 'init': + self.read_jobfile() + elif self.state == 'ready': + self.run() + elif self.state == 'running': + self.check_if_done() + + def to_json(self): + me = { + 'state': self.state, + 'hostname': self.parent.hostname, + 'jobdir': self.jobdir, + 'shortname': self.shortname, + 'uid': self.uid, + } + if self.error: + me['error'] = self.error + if self.jobruntime: + me['runtime'] = self.jobruntime + return me + + def check_if_done(self): + if self.state != 'running': + return + self.subprocess.poll() + if self.subprocess.returncode is None: + rt = (datetime.utcnow() - self.jobstart).total_seconds() + if rt > self.runtimecap: + self.subprocess.terminate() + sleep(10) + self.subprocess.kill() + self.set_state('killed') + return + self.jobruntime = (datetime.utcnow() - self.jobstart).total_seconds() + self.set_state('done') + self.stdoutfd.close() + self.stderrfd.close() + + def run(self): + self.set_state('running') + self.stdoutfd = open(self.jobstdoutfile, 'w') + self.stderrfd = open(self.jobstderrfile, 'w') + + # set up the command and args + args = [] + args.append(self.parent.dockerpath) + args.append('run') + + # add the job dir as a volume into the container at /job + args.append('-v') + args.append(self.jobdir + ':/job') + + args.append(self.jobdesc['run']['image']) + + args.extend(list(self.jobdesc['run']['command'])) + + log.info("running job %s: %s" % (self.uid, json.dumps(args))) + self.jobstart = datetime.utcnow() + self.subprocess = subprocess.Popen(args,stdin=None, stdout=self.stdoutfd, stderr=self.stderrfd) + log.info("job %s spawned as pid %d" % (self.uid, self.subprocess.pid)) diff --git a/strpc/rpc.py b/strpc/rpc.py new file mode 100755 index 0000000..5038be2 --- /dev/null +++ b/strpc/rpc.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python3 +#234567891123456789212345678931234567894123456789512345678961234567897123456789 +# encoding: utf-8 + +from datetime import datetime, tzinfo, timedelta +from sanelogging import log +from time import sleep +import json +import os +import pathlib +import socket +import subprocess +import uuid +from .job import STRpcJob +from .utils import * + +class STRpcRunnerService(object): + + def __init__(self): + self.dir = os.environ.get('STRPC_BASE','/var/run/strpc') + self.hostname = socket.gethostname() + self.hostdir = self.dir + '/' + self.hostname + self.logdir = self.hostdir + '/logs' + self.statusfile = self.hostdir + '/status.json' + self.jobdir = self.hostdir + '/jobs' + self.jobs = [] + self.running = True + self.dockerpath = None + + log.info("strpcd starting up on %s" % self.hostname) + + if not os.path.isdir(self.dir): + raise Exception("STRPC base directory not found") + self.setup() + + def setup(self): + pathlib.Path(self.jobdir).mkdir(parents=True, exist_ok=True) + pathlib.Path(self.logdir).mkdir(parents=True, exist_ok=True) + self.write_status_file() + + self.dockerpath = subprocess.check_output("which docker", shell=True).decode("utf-8").strip() + log.info("found docker at %s" % self.dockerpath) + + def write_status_file(self): + log.info("writing out server status") + write_json_file(self.statusfile, { + 'started': isodatetime(), + 'hostname': self.hostname, + 'jobs': [ job.to_json() for job in self.jobs ], + }) + + def start(self): + os.chdir(self.dir) + self.run_forever() + + def scan_jobsdir(self, jobsdir): + existing_jobdirs = [ job.jobdir for job in self.jobs ] + for newjobdir in dirs_in_dir(jobsdir): + if newjobdir not in existing_jobdirs: + log.info("found new job: " + os.path.basename(newjobdir)) + self.jobs.append(STRpcJob(self,newjobdir)) + self.write_status_file() + + def run_jobs(self): + for job in self.jobs: + if job.ready_to_run(): + job.run() + + def run_forever(self): + STATUS_INTERVAL = 60 + LOOP_INTERVAL = 0.1 + STATUS_ITERATIONS = STATUS_INTERVAL / LOOP_INTERVAL + i = 0 + + while self.running: + i = i + 1 + if i >= STATUS_ITERATIONS: + i = 0 + self.write_status_file() + self.scan_jobsdir(self.jobdir) + for job in self.jobs: + job.crank() + sleep(LOOP_INTERVAL) diff --git a/strpc/strpc.py b/strpc/strpc.py deleted file mode 100755 index 1b4d1d8..0000000 --- a/strpc/strpc.py +++ /dev/null @@ -1,214 +0,0 @@ -#!/usr/bin/env python3 -#234567891123456789212345678931234567894123456789512345678961234567897123456789 -# encoding: utf-8 - -from datetime import datetime, tzinfo, timedelta -from sanelogging import log -from time import sleep -import json -import os -import pathlib -import socket -import subprocess -import uuid - -def dirs_in_dir(path): - assert os.path.isdir(path) - items = os.listdir(path) - output = [] - for fn in items: - if os.path.isdir(path + '/' + fn): - output.append(path + '/' + fn) - return output - -class simple_utc(tzinfo): - def tzname(self,**kwargs): - return "UTC" - def utcoffset(self, dt): - return timedelta(0) - -def isodatetime(): - return str( - datetime.utcnow().replace(tzinfo=simple_utc()).isoformat() - ).replace('+00:00', 'Z') - -def read_file(path): - with open(path) as f: - data = f.read() - return data - -# FIXME is there some auto reading-and-writing JSONFile class out there -def read_json_file(path): - return json.loads(read_file(path)) - -def json_pretty(obj): - return json.dumps(obj, sort_keys=True, indent=4, separators=(',', ': ')) - -def write_json_file(path, obj): - write_file(path, json_pretty(obj)) - -def write_file(path,content): - with open(path, 'w') as file: - file.write(content) - -class STRpcJob(object): - def __init__(self,rpcserver,newjobdir): - self.uid = str(uuid.uuid4()) - self.parent = rpcserver - self.jobdir = newjobdir - self.shortname = os.path.basename(newjobdir) - if not os.path.isdir(self.jobdir): - raise Exception("job directory not found") - self.statusfile = self.jobdir + '/' + 'status.json' - self.jobfile = self.jobdir + '/' + 'job.json' - self.jobdesc = None - self.state = None - self.error = None - - if os.path.exists(self.statusfile): - # job was known to previous version - x = read_json_file(self.statusfile) - self.uid = x['uid'] - self.shortname = x['shortname'] - if x['state'] not in ['init', 'ready']: - self.set_state('old') - else: - self.set_state('init') - - self.crank() - - def write_template_jobfile(self, path): - thisdir = os.path.dirname(os.path.realpath(__file__)) - template = read_json_file(thisdir + '/' + 'job-template.json') - write_json_file(path, template) - - def __str__(self): - return "" % (self.uid, self.shortname) - - def write_status(self): - write_file(self.statusfile, json_pretty(self.to_json())) - - def read_jobfile(self): - jobdesc = None - try: - jobdesc = read_json_file(self.jobfile) - except(FileNotFoundError): - jobfile_template_filename = self.jobdir + '/' + 'job-template.json' - if not os.path.exists(jobfile_template_filename): - self.write_template_jobfile(jobfile_template_filename) - return - # FIXME validate jobdesc before changing state or updating job-ject - self.jobdesc = jobdesc - self.set_state('ready') - - def set_state(self, state): - STATES = [ - 'done', - 'error', - 'init', - 'old', - 'ready', - 'running', - ] - assert state in STATES - self.state = state - log.info("%s is in state %s" % (self, self.state)) - self.write_status() - - def crank(self): - if self.state == 'init': - self.read_jobfile() - elif self.state == 'ready': - self.run() - - def to_json(self): - me = { - 'state': self.state, - 'hostname': self.parent.hostname, - 'jobdir': self.jobdir, - 'shortname': self.shortname, - 'uid': self.uid, - } - if self.error: - me['error'] = self.error - return me - - def ready_to_run(self): - if self.state == 'ready': - return True - return False - - def run(self): - self.set_state('running') - log.die("run job here") - - -class STRpcRunnerService(object): - - - def __init__(self): - self.dir = os.environ.get('STRPC_BASE','/var/run/strpc') - self.hostname = socket.gethostname() - self.hostdir = self.dir + '/' + self.hostname - self.logdir = self.hostdir + '/logs' - self.statusfile = self.hostdir + '/status.json' - self.jobdir = self.hostdir + '/jobs' - self.jobs = [] - self.running = True - if not os.path.isdir(self.dir): - raise Exception("STRPC base directory not found") - self.setup() - - def setup(self): - pathlib.Path(self.jobdir).mkdir(parents=True, exist_ok=True) - pathlib.Path(self.logdir).mkdir(parents=True, exist_ok=True) - self.write_status_file() - - def write_status_file(self): - log.info("writing out server status") - write_json_file(self.statusfile, { - 'started': isodatetime(), - 'hostname': self.hostname, - 'jobs': [ job.to_json() for job in self.jobs ], - }) - - def start(self): - log.info("strpcrunner starting up on %s" % self.hostname) - os.chdir(self.dir) - self.run_forever() - - def scan_jobsdir(self, jobsdir): - existing_jobdirs = [ job.jobdir for job in self.jobs ] - for newjobdir in dirs_in_dir(jobsdir): - if newjobdir not in existing_jobdirs: - log.info("found new job: " + os.path.basename(newjobdir)) - self.jobs.append(STRpcJob(self,newjobdir)) - self.write_status_file() - - def run_jobs(self): - for job in self.jobs: - if job.ready_to_run(): - job.run() - - def run_forever(self): - STATUS_INTERVAL = 60 - LOOP_INTERVAL = 0.5 - STATUS_ITERATIONS = STATUS_INTERVAL / LOOP_INTERVAL - i = 0 - - while self.running: - i = i + 1 - if i >= STATUS_ITERATIONS: - i = 0 - self.write_status_file() - self.scan_jobsdir(self.jobdir) - for job in self.jobs: - job.crank() - sleep(LOOP_INTERVAL) - -def main(): - s = STRpcRunnerService() - s.start() - -if __name__ == "__main__": - main() diff --git a/strpc/utils.py b/strpc/utils.py new file mode 100755 index 0000000..d4c5ede --- /dev/null +++ b/strpc/utils.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python3 +#234567891123456789212345678931234567894123456789512345678961234567897123456789 +# encoding: utf-8 + +from datetime import datetime, tzinfo, timedelta +from sanelogging import log +from time import sleep +import json +import os +import pathlib +import socket +import subprocess +import uuid + +def dirs_in_dir(path): + assert os.path.isdir(path) + items = os.listdir(path) + output = [] + for fn in items: + if os.path.isdir(path + '/' + fn): + output.append(path + '/' + fn) + return output + +class simple_utc(tzinfo): + def tzname(self,**kwargs): + return "UTC" + def utcoffset(self, dt): + return timedelta(0) + +def isodatetime(): + return str( + datetime.utcnow().replace(tzinfo=simple_utc()).isoformat() + ).replace('+00:00', 'Z') + +def read_file(path): + with open(path) as f: + data = f.read() + return data + +# FIXME is there some auto reading-and-writing JSONFile class out there +def read_json_file(path): + return json.loads(read_file(path)) + +def json_pretty(obj): + return json.dumps(obj, sort_keys=True, indent=4, separators=(',', ': ')) + +def write_json_file(path, obj): + write_file(path, json_pretty(obj)) + +def write_file(path,content): + with open(path, 'w') as file: + file.write(content)