From dfdd42f006658cf4a73f685882101ab2e43d5145 Mon Sep 17 00:00:00 2001 From: sneak Date: Tue, 6 Aug 2019 18:36:35 +0000 Subject: [PATCH] sort of cool now --- .gitignore | 1 + Makefile | 4 + Pipfile | 12 +++ Pipfile.lock | 35 +++++++ bin/devserver | 15 +++ bin/strpc | 12 +++ strpc/__init__.py | 5 + strpc/job-template.json | 10 ++ strpc/strpc.py | 214 ++++++++++++++++++++++++++++++++++++++++ 9 files changed, 308 insertions(+) create mode 100644 .gitignore create mode 100644 Makefile create mode 100644 Pipfile create mode 100644 Pipfile.lock create mode 100755 bin/devserver create mode 100755 bin/strpc create mode 100755 strpc/__init__.py create mode 100644 strpc/job-template.json create mode 100755 strpc/strpc.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e4432c6 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +devserverroot/ diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..52921d9 --- /dev/null +++ b/Makefile @@ -0,0 +1,4 @@ +default: devserver + +devserver: + /bin/bash bin/devserver diff --git a/Pipfile b/Pipfile new file mode 100644 index 0000000..5ff048d --- /dev/null +++ b/Pipfile @@ -0,0 +1,12 @@ +[[source]] +name = "pypi" +url = "https://pypi.org/simple" +verify_ssl = true + +[dev-packages] + +[packages] +sanelogging = "*" + +[requires] +python_version = "3.6" diff --git a/Pipfile.lock b/Pipfile.lock new file mode 100644 index 0000000..3b9689b --- /dev/null +++ b/Pipfile.lock @@ -0,0 +1,35 @@ +{ + "_meta": { + "hash": { + "sha256": "d189f59cdb9acb7b516c87ead0bf88b4ebc5d7057caf2f46d4cb25e25b317035" + }, + "pipfile-spec": 6, + "requires": { + "python_version": "3.6" + }, + "sources": [ + { + "name": "pypi", + "url": "https://pypi.org/simple", + "verify_ssl": true + } + ] + }, + "default": { + "colorlog": { + "hashes": [ + "sha256:3cf31b25cbc8f86ec01fef582ef3b840950dea414084ed19ab922c8b493f9b42", + "sha256:450f52ea2a2b6ebb308f034ea9a9b15cea51e65650593dca1da3eb792e4e4981" + ], + "version": "==4.0.2" + }, + "sanelogging": { + "hashes": [ + "sha256:ae018de50f00ace45b34dbade66b511651cb1758a7c26e262841651b2dd6f2cf" + ], + "index": "pypi", + "version": "==1.0.1" + } + }, + "develop": {} +} diff --git a/bin/devserver b/bin/devserver new file mode 100755 index 0000000..3e43000 --- /dev/null +++ b/bin/devserver @@ -0,0 +1,15 @@ +#!/bin/bash + +set -x + +THISDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd -P)" +UPDIR="$( cd "$( dirname "$THISDIR" )" >/dev/null 2>&1 && pwd -P)" +export STRPC_BASE="$UPDIR/devserverroot" +export PYTHONPATH="$UPDIR" + +cd "$UPDIR" +if [[ ! -d "$STRPC_BASE" ]]; then + mkdir -p "$STRPC_BASE" +fi +pipenv run bin/strpc + diff --git a/bin/strpc b/bin/strpc new file mode 100755 index 0000000..fcb5a92 --- /dev/null +++ b/bin/strpc @@ -0,0 +1,12 @@ +#!/usr/bin/env python3 +#234567891123456789212345678931234567894123456789512345678961234567897123456789 +# encoding: utf-8 + +from strpc import STRpcRunnerService + +def main(): + s = STRpcRunnerService() + s.start() + +if __name__ == "__main__": + main() diff --git a/strpc/__init__.py b/strpc/__init__.py new file mode 100755 index 0000000..52dd2e3 --- /dev/null +++ b/strpc/__init__.py @@ -0,0 +1,5 @@ +#!/usr/bin/env python3 +#234567891123456789212345678931234567894123456789512345678961234567897123456789 +# encoding: utf-8 + +from .strpc import STRpcRunnerService diff --git a/strpc/job-template.json b/strpc/job-template.json new file mode 100644 index 0000000..307a6f7 --- /dev/null +++ b/strpc/job-template.json @@ -0,0 +1,10 @@ +{ + "berlin.sneak.type": "strpcjob", + "ready": false, + "run": { + "type": "docker", + "image": "ubuntu:18.04", + "command": "/bin/bash -c 'cat /proc/cpuinfo'" + } +} + diff --git a/strpc/strpc.py b/strpc/strpc.py new file mode 100755 index 0000000..1b4d1d8 --- /dev/null +++ b/strpc/strpc.py @@ -0,0 +1,214 @@ +#!/usr/bin/env python3 +#234567891123456789212345678931234567894123456789512345678961234567897123456789 +# encoding: utf-8 + +from datetime import datetime, tzinfo, timedelta +from sanelogging import log +from time import sleep +import json +import os +import pathlib +import socket +import subprocess +import uuid + +def dirs_in_dir(path): + assert os.path.isdir(path) + items = os.listdir(path) + output = [] + for fn in items: + if os.path.isdir(path + '/' + fn): + output.append(path + '/' + fn) + return output + +class simple_utc(tzinfo): + def tzname(self,**kwargs): + return "UTC" + def utcoffset(self, dt): + return timedelta(0) + +def isodatetime(): + return str( + datetime.utcnow().replace(tzinfo=simple_utc()).isoformat() + ).replace('+00:00', 'Z') + +def read_file(path): + with open(path) as f: + data = f.read() + return data + +# FIXME is there some auto reading-and-writing JSONFile class out there +def read_json_file(path): + return json.loads(read_file(path)) + +def json_pretty(obj): + return json.dumps(obj, sort_keys=True, indent=4, separators=(',', ': ')) + +def write_json_file(path, obj): + write_file(path, json_pretty(obj)) + +def write_file(path,content): + with open(path, 'w') as file: + file.write(content) + +class STRpcJob(object): + def __init__(self,rpcserver,newjobdir): + self.uid = str(uuid.uuid4()) + self.parent = rpcserver + self.jobdir = newjobdir + self.shortname = os.path.basename(newjobdir) + if not os.path.isdir(self.jobdir): + raise Exception("job directory not found") + self.statusfile = self.jobdir + '/' + 'status.json' + self.jobfile = self.jobdir + '/' + 'job.json' + self.jobdesc = None + self.state = None + self.error = None + + if os.path.exists(self.statusfile): + # job was known to previous version + x = read_json_file(self.statusfile) + self.uid = x['uid'] + self.shortname = x['shortname'] + if x['state'] not in ['init', 'ready']: + self.set_state('old') + else: + self.set_state('init') + + self.crank() + + def write_template_jobfile(self, path): + thisdir = os.path.dirname(os.path.realpath(__file__)) + template = read_json_file(thisdir + '/' + 'job-template.json') + write_json_file(path, template) + + def __str__(self): + return "" % (self.uid, self.shortname) + + def write_status(self): + write_file(self.statusfile, json_pretty(self.to_json())) + + def read_jobfile(self): + jobdesc = None + try: + jobdesc = read_json_file(self.jobfile) + except(FileNotFoundError): + jobfile_template_filename = self.jobdir + '/' + 'job-template.json' + if not os.path.exists(jobfile_template_filename): + self.write_template_jobfile(jobfile_template_filename) + return + # FIXME validate jobdesc before changing state or updating job-ject + self.jobdesc = jobdesc + self.set_state('ready') + + def set_state(self, state): + STATES = [ + 'done', + 'error', + 'init', + 'old', + 'ready', + 'running', + ] + assert state in STATES + self.state = state + log.info("%s is in state %s" % (self, self.state)) + self.write_status() + + def crank(self): + if self.state == 'init': + self.read_jobfile() + elif self.state == 'ready': + self.run() + + def to_json(self): + me = { + 'state': self.state, + 'hostname': self.parent.hostname, + 'jobdir': self.jobdir, + 'shortname': self.shortname, + 'uid': self.uid, + } + if self.error: + me['error'] = self.error + return me + + def ready_to_run(self): + if self.state == 'ready': + return True + return False + + def run(self): + self.set_state('running') + log.die("run job here") + + +class STRpcRunnerService(object): + + + def __init__(self): + self.dir = os.environ.get('STRPC_BASE','/var/run/strpc') + self.hostname = socket.gethostname() + self.hostdir = self.dir + '/' + self.hostname + self.logdir = self.hostdir + '/logs' + self.statusfile = self.hostdir + '/status.json' + self.jobdir = self.hostdir + '/jobs' + self.jobs = [] + self.running = True + if not os.path.isdir(self.dir): + raise Exception("STRPC base directory not found") + self.setup() + + def setup(self): + pathlib.Path(self.jobdir).mkdir(parents=True, exist_ok=True) + pathlib.Path(self.logdir).mkdir(parents=True, exist_ok=True) + self.write_status_file() + + def write_status_file(self): + log.info("writing out server status") + write_json_file(self.statusfile, { + 'started': isodatetime(), + 'hostname': self.hostname, + 'jobs': [ job.to_json() for job in self.jobs ], + }) + + def start(self): + log.info("strpcrunner starting up on %s" % self.hostname) + os.chdir(self.dir) + self.run_forever() + + def scan_jobsdir(self, jobsdir): + existing_jobdirs = [ job.jobdir for job in self.jobs ] + for newjobdir in dirs_in_dir(jobsdir): + if newjobdir not in existing_jobdirs: + log.info("found new job: " + os.path.basename(newjobdir)) + self.jobs.append(STRpcJob(self,newjobdir)) + self.write_status_file() + + def run_jobs(self): + for job in self.jobs: + if job.ready_to_run(): + job.run() + + def run_forever(self): + STATUS_INTERVAL = 60 + LOOP_INTERVAL = 0.5 + STATUS_ITERATIONS = STATUS_INTERVAL / LOOP_INTERVAL + i = 0 + + while self.running: + i = i + 1 + if i >= STATUS_ITERATIONS: + i = 0 + self.write_status_file() + self.scan_jobsdir(self.jobdir) + for job in self.jobs: + job.crank() + sleep(LOOP_INTERVAL) + +def main(): + s = STRpcRunnerService() + s.start() + +if __name__ == "__main__": + main()