strpc/strpc/server.py

93 lines
3.0 KiB
Python
Executable File

#!/usr/bin/env python3
#234567891123456789212345678931234567894123456789512345678961234567897123456789
# encoding: utf-8
from datetime import datetime, tzinfo, timedelta
from time import sleep
import json
import os
import pathlib
import shutil
import socket
import subprocess
import uuid

from sanelogging import log
import sentry_sdk

from .job import STRpcJob
from .utils import *
class STRpcRunnerService(object):
    """Polling service that discovers job directories and drives their jobs.

    All state lives under ``<base>/<hostname>/``, where ``<base>`` is taken
    from the ``STRPC_BASE`` environment variable (default ``/var/run/strpc``)
    and ``<hostname>`` is this machine's hostname:

    * ``status.json`` -- server status, rewritten periodically
    * ``jobs/``       -- polled for subdirectories; each becomes a job
    * ``logs/``       -- created at startup
    """

    def __init__(self):
        """Resolve paths from the environment and prepare the host directory.

        Raises:
            Exception: if the STRPC base directory does not exist, or if no
                ``docker`` executable can be found on ``PATH``.
        """
        # NOTE(review): the Sentry DSN is hard-coded here; consider moving it
        # to configuration.
        sentry_sdk.init("https://598ba2b22ae947328590765e31e29a82@sentry.io/1525291")
        self.dir = os.environ.get('STRPC_BASE', '/var/run/strpc')
        self.hostname = socket.gethostname()
        self.hostdir = os.path.join(self.dir, self.hostname)
        self.logdir = os.path.join(self.hostdir, 'logs')
        self.statusfile = os.path.join(self.hostdir, 'status.json')
        self.jobdir = os.path.join(self.hostdir, 'jobs')
        self.jobs = []
        self.running = True
        self.dockerpath = None
        # Captured once so every status file reports the same start time
        # instead of the time of the most recent write.
        self.started = isodatetime()
        log.info("strpcd starting up on %s out of directory %s" % (
            self.hostname, self.dir)
        )
        # The base directory must already exist; only the per-host
        # subdirectories below it are created by setup().
        if not os.path.isdir(self.dir):
            raise Exception("STRPC base directory not found")
        self.setup()

    def setup(self):
        """Create the per-host directories, write status and locate docker.

        Raises:
            Exception: if no ``docker`` executable is found on ``PATH``.
        """
        for path in (self.hostdir, self.jobdir, self.logdir):
            log.info("creating dir %s if not exist" % path)
            pathlib.Path(path).mkdir(parents=True, exist_ok=True)
        self.write_status_file()
        # shutil.which searches PATH directly, so no shell is spawned and no
        # external `which` binary is required.
        self.dockerpath = shutil.which("docker")
        if self.dockerpath is None:
            raise Exception("docker executable not found in PATH")
        log.info("found docker at %s" % self.dockerpath)

    def write_status_file(self):
        """Write the server status (start time, hostname, jobs) as JSON.

        ``started`` is the timestamp captured in ``__init__``, so it stays
        constant across rewrites.
        """
        log.info("writing out server status")
        write_json_file(self.statusfile, {
            'started': self.started,
            'hostname': self.hostname,
            'jobs': [job.to_json() for job in self.jobs],
        })

    def start(self):
        """Change into the base directory and run the main loop."""
        os.chdir(self.dir)
        self.run_forever()

    def scan_jobsdir(self, jobsdir):
        """Register a job for every directory in *jobsdir* not yet tracked.

        The status file is rewritten after each newly registered job.
        """
        # Set membership keeps each lookup O(1) regardless of job count.
        known_jobdirs = {job.jobdir for job in self.jobs}
        for newjobdir in dirs_in_dir(jobsdir):
            if newjobdir in known_jobdirs:
                continue
            log.info("found new job: " + os.path.basename(newjobdir))
            self.jobs.append(STRpcJob(self, newjobdir))
            self.write_status_file()

    def run_jobs(self):
        """Run every tracked job that reports itself ready to run.

        Not called by ``run_forever``, which drives jobs through ``crank()``.
        """
        for job in self.jobs:
            if job.ready_to_run():
                job.run()

    def run_forever(self):
        """Poll for new jobs and crank all jobs until ``self.running`` is false.

        Each iteration scans the job directory, cranks every job and sleeps
        for ``LOOP_INTERVAL`` seconds. The status file is rewritten once every
        ``STATUS_INTERVAL / LOOP_INTERVAL`` iterations.
        """
        STATUS_INTERVAL = 60
        LOOP_INTERVAL = 0.1
        STATUS_ITERATIONS = STATUS_INTERVAL / LOOP_INTERVAL
        iterations = 0
        while self.running:
            iterations += 1
            if iterations >= STATUS_ITERATIONS:
                iterations = 0
                self.write_status_file()
            self.scan_jobsdir(self.jobdir)
            for job in self.jobs:
                job.crank()
            sleep(LOOP_INTERVAL)