strpc/strpc/rpc.py

84 lines
2.6 KiB
Python
Executable File

#!/usr/bin/env python3
#234567891123456789212345678931234567894123456789512345678961234567897123456789
# encoding: utf-8
from datetime import datetime, tzinfo, timedelta
from sanelogging import log
from time import sleep
import json
import os
import pathlib
import socket
import subprocess
import uuid
from .job import STRpcJob
from .utils import *
class STRpcRunnerService(object):
def __init__(self):
self.dir = os.environ.get('STRPC_BASE','/var/run/strpc')
self.hostname = socket.gethostname()
self.hostdir = self.dir + '/' + self.hostname
self.logdir = self.hostdir + '/logs'
self.statusfile = self.hostdir + '/status.json'
self.jobdir = self.hostdir + '/jobs'
self.jobs = []
self.running = True
self.dockerpath = None
log.info("strpcd starting up on %s" % self.hostname)
if not os.path.isdir(self.dir):
raise Exception("STRPC base directory not found")
self.setup()
def setup(self):
pathlib.Path(self.jobdir).mkdir(parents=True, exist_ok=True)
pathlib.Path(self.logdir).mkdir(parents=True, exist_ok=True)
self.write_status_file()
self.dockerpath = subprocess.check_output("which docker", shell=True).decode("utf-8").strip()
log.info("found docker at %s" % self.dockerpath)
def write_status_file(self):
log.info("writing out server status")
write_json_file(self.statusfile, {
'started': isodatetime(),
'hostname': self.hostname,
'jobs': [ job.to_json() for job in self.jobs ],
})
def start(self):
os.chdir(self.dir)
self.run_forever()
def scan_jobsdir(self, jobsdir):
existing_jobdirs = [ job.jobdir for job in self.jobs ]
for newjobdir in dirs_in_dir(jobsdir):
if newjobdir not in existing_jobdirs:
log.info("found new job: " + os.path.basename(newjobdir))
self.jobs.append(STRpcJob(self,newjobdir))
self.write_status_file()
def run_jobs(self):
for job in self.jobs:
if job.ready_to_run():
job.run()
def run_forever(self):
STATUS_INTERVAL = 60
LOOP_INTERVAL = 0.1
STATUS_ITERATIONS = STATUS_INTERVAL / LOOP_INTERVAL
i = 0
while self.running:
i = i + 1
if i >= STATUS_ITERATIONS:
i = 0
self.write_status_file()
self.scan_jobsdir(self.jobdir)
for job in self.jobs:
job.crank()
sleep(LOOP_INTERVAL)