1
0
mirror of https://github.com/mail-in-a-box/mailinabox.git synced 2026-03-05 15:57:23 +01:00

Initial commit of a log capture and reporting feature

This adds a new section to the admin panel called "Activity", that
supplies charts, graphs and details about messages entering and leaving
the host.

A new daemon captures details of system mail activity by monitoring
the /var/log/mail.log file, summarizing it into a sqlite database
that's kept in user-data.
This commit is contained in:
downtownallday
2021-01-11 18:02:07 -05:00
parent 73a2b72243
commit 2a0e50c8d4
108 changed files with 9027 additions and 6 deletions

View File

@@ -0,0 +1,2 @@
tests/
run.sh

View File

@@ -0,0 +1,300 @@
#!/usr/bin/env python3
from logs.TailFile import TailFile
from mail.InboundMailLogHandler import InboundMailLogHandler
from logs.ReadPositionStoreInFile import ReadPositionStoreInFile
from db.SqliteConnFactory import SqliteConnFactory
from db.SqliteEventStore import SqliteEventStore
from db.Pruner import Pruner
from util.env import load_env_vars_from_file
import time
import logging, logging.handlers
import json
import re
import os
import sys
import signal
# Module-level logger; handlers and level are configured later once the
# command line has been processed.
log = logging.getLogger(__name__)

# Load the Mail-in-a-Box environment if present; otherwise fall back to
# STORAGE_ROOT from the process environment (or the cwd) for development.
if os.path.exists('/etc/mailinabox.conf'):
    env = load_env_vars_from_file('/etc/mailinabox.conf')
else:
    env = { 'STORAGE_ROOT': os.environ.get('STORAGE_ROOT', os.getcwd()) }

# All reporting/capture data lives under STORAGE_ROOT/reporting.
CAPTURE_STORAGE_ROOT = os.path.join(env['STORAGE_ROOT'], 'reporting')

# default configuration, if not specified or
# CAPTURE_STORAGE_ROOT/config.json does not exist
config_default = {
    'default_config': True,  # marks this dict as the built-in default
    'capture': True,         # capture events into the database
    'prune_policy': {
        'frequency_min': 2400,   # minutes between prune runs
        'older_than_days': 30    # delete records older than this
    },
    'drop_disposition': {
        # connection dispositions that should NOT be stored
        'failed_login_attempt': True,
        'suspected_scanner': False,
        'reject': True
    }
}
config_default_file = os.path.join(CAPTURE_STORAGE_ROOT, 'config.json')

# options
options = {
    'daemon': True,              # True: log to syslog and detach stdio
    'log_level': logging.WARNING,
    'log_file': "/var/log/mail.log",
    'pos_file': "/var/lib/mailinabox/capture-pos.json",
    'sqlite_file': os.path.join(CAPTURE_STORAGE_ROOT, 'capture.sqlite'),
    'working_dir': "/var/run/mailinabox",
    'config': config_default,
    '_config_file': False, # absolute path if "config" was from a file
    '_runtime_config_file': "runtime_config.json" # relative to working_dir
}
def read_config_file(file, throw=False):
    '''Load a JSON configuration from `file`.

    Returns the parsed dict with a 'from' key recording where it came
    from, or False when the file does not exist and `throw` is falsy.
    When `throw` is set, a missing file raises FileNotFoundError.
    '''
    try:
        with open(file) as fp:
            loaded = json.load(fp)
    except FileNotFoundError:
        if throw:
            raise
        return False
    loaded['from'] = { 'type':'file', 'location':file }
    return loaded
def write_config(tofile, config):
    '''Serialize `config` to JSON at path `tofile`.

    Creates any missing parent directories with mode 0770 (matching the
    daemon's other private directories).  The original used os.mkdir,
    which failed when more than one path component was missing.
    '''
    d = os.path.dirname(tofile)
    if d:
        # exist_ok avoids a race between the exists() test and mkdir
        os.makedirs(d, mode=0o770, exist_ok=True)
    with open(tofile, "w") as fp:
        fp.write(json.dumps(config))
def usage():
    '''Print a one-line usage synopsis and exit with status 1.'''
    print('usage: %s [options]' % sys.argv[0])
    sys.exit(1)
def process_cmdline(options):
    '''Parse sys.argv into the `options` dict (modified in place).

    Flags: -d (run in foreground), -loglevel LEVEL, -config JSON|FILE,
    -logfile PATH, -posfile PATH, -sqlitefile PATH.  Up to three
    positional arguments are taken as log_file, pos_file and
    sqlite_file, in that order.  Unknown flags, flags missing their
    argument, or extra positionals call usage() and exit.
    '''
    level_by_name = {
        'info': logging.INFO,
        'warning': logging.WARNING,
        'debug': logging.DEBUG,
        'error': logging.ERROR
    }
    positional_keys = ( 'log_file', 'pos_file', 'sqlite_file' )
    idx = 1
    positional = 0
    while idx < len(sys.argv):
        arg = sys.argv[idx]
        have_next = ( idx+1 < len(sys.argv) )
        if arg=='-d':
            options['daemon'] = False
        elif arg=='-loglevel' and have_next:
            idx += 1
            name = sys.argv[idx].lower()
            if name in level_by_name:
                options['log_level'] = level_by_name[name]
            else:
                sys.stderr.write('unknown log level "%s"\n' % sys.argv[idx])
        elif arg=='-config' and have_next:
            idx += 1
            value = sys.argv[idx]
            try:
                if value.startswith('{'):
                    # literal JSON given on the command line
                    options['config'] = json.loads(value)
                else:
                    # otherwise it's a path to a JSON file
                    options['config'] = read_config_file(value, throw=True)
                    options['_config_file'] = value
            except Exception as e:
                if options['daemon']: log.exception(e)
                raise e
        elif arg=='-logfile' and have_next:
            idx += 1
            options['log_file'] = sys.argv[idx]
        elif arg=='-posfile' and have_next:
            idx += 1
            options['pos_file'] = sys.argv[idx]
        elif arg=='-sqlitefile' and have_next:
            idx += 1
            options['sqlite_file'] = sys.argv[idx]
        elif arg.startswith('-'):
            usage()
        else:
            if positional >= len(positional_keys):
                usage()
            options[positional_keys[positional]] = arg
            positional += 1
        idx += 1
def set_working_dir(working_dir):
    '''Create `working_dir` if needed (mode 0770) and chdir into it.
    Failures are logged and re-raised.'''
    try:
        try:
            os.mkdir(working_dir, mode=0o770)
        except FileExistsError:
            # already present - just change into it
            pass
        os.chdir(working_dir)
    except Exception as e:
        log.exception(e)
        raise e
def close_stdio():
    '''Detach from the terminal by closing the standard streams.
    Called only in daemon mode, after the syslog handler is installed.'''
    for stream in (sys.stdout, sys.stderr, sys.stdin):
        stream.close()
# if config.json exists in the default location start with that
# instead of `config_default`. config can still be changed with the
# command line argument "-config"
newconfig = read_config_file(config_default_file, throw=False)
if newconfig:
    options['config'] = newconfig
    options['_config_file'] = config_default_file

# process command line
process_cmdline(options)

# init logging & set working directory
if options['daemon']:
    # daemon mode: log to syslog, detach stdio, run from working_dir
    logging.basicConfig(
        level = options['log_level'],
        handlers= [
            logging.handlers.SysLogHandler(address='/dev/log')
        ],
        format = 'miabldap/capture %(message)s'
    )
    log.warning('MIAB-LDAP capture/SEM daemon starting: wd=%s; log_file=%s; pos_file=%s; db=%s',
        options['working_dir'],
        options['log_file'],
        options['pos_file'],
        options['sqlite_file']
    )
    close_stdio()
    set_working_dir(options['working_dir'])
else:
    # foreground mode: log to the console
    logging.basicConfig(level=options['log_level'])
    log.info('starting: log_file=%s; pos_file=%s; db=%s',
        options['log_file'],
        options['pos_file'],
        options['sqlite_file']
    )

# save runtime config (a copy of the active configuration, written
# relative to the working directory) so other tools can inspect it
write_config(options['_runtime_config_file'], options['config'])

# start modules
log.info('config: %s', options['config'])
try:
    # factory producing sqlite connections to the capture database
    db_conn_factory = SqliteConnFactory(
        options['sqlite_file']
    )
    # persistent store for captured event records
    event_store = SqliteEventStore(
        db_conn_factory
    )
    # remembers how far into the log file has been processed
    position_store = ReadPositionStoreInFile(
        options['pos_file']
    )
    # tails the mail log, feeding each line to registered handlers
    mail_tail = TailFile(
        options['log_file'],
        position_store
    )
    # parses mail log lines into event records for the store
    inbound_mail_handler = InboundMailLogHandler(
        event_store,
        capture_enabled = options['config'].get('capture',True),
        drop_disposition = options['config'].get('drop_disposition')
    )
    mail_tail.add_handler(inbound_mail_handler)
    # periodically removes old records per the prune policy
    pruner = Pruner(
        db_conn_factory,
        policy=options['config']['prune_policy']
    )
    pruner.add_prunable(event_store)
except Exception as e:
    if options['daemon']: log.exception(e)
    raise e
# termination handler for graceful shutdowns
def terminate(sig, stack):
    '''Signal handler for SIGTERM/SIGINT.

    Stops each module in order (tail thread, pruner, position store,
    event store), removes the runtime config file, and lets the main
    thread fall out of mail_tail.join().
    '''
    if sig == signal.SIGTERM:
        log.warning("shutting down due to SIGTERM")
    log.debug("stopping mail_tail")
    mail_tail.stop()
    log.debug("stopping pruner")
    pruner.stop()
    log.debug("stopping position_store")
    position_store.stop()
    log.debug("stopping event_store")
    event_store.stop()
    try:
        # best-effort cleanup; the file may already be gone
        os.remove(options['_runtime_config_file'])
    except Exception:
        pass
    log.info("stopped")
# reload settings handler
def reload(sig, stack):
    '''SIGHUP handler: re-read the active config file and apply the new
    prune policy, capture flag and drop dispositions without a restart.
    '''
    # if the default config (`config_default`) is in use, check to see
    # if a default config.json (`config_default_file`) now exists, and
    # if so, use that
    if options['config'].get('default_config', False) and os.path.exists(config_default_file):
        options['config']['default_config'] = False
        options['_config_file'] = config_default_file
    log.info('%s records are in-progress',
        inbound_mail_handler.get_inprogress_count())
    if options['_config_file']:
        log.info('reloading %s', options['_config_file'])
        try:
            newconfig = read_config_file(options['_config_file'], throw=True)
            # apply the new settings to the running modules
            pruner.set_policy(
                newconfig['prune_policy']
            )
            inbound_mail_handler.set_capture_enabled(
                newconfig.get('capture', True)
            )
            inbound_mail_handler.update_drop_disposition(
                newconfig.get('drop_disposition', {})
            )
            # persist the now-active config for other tools to read
            write_config(options['_runtime_config_file'], newconfig)
        except Exception as e:
            if options['daemon']:
                log.exception(e)
            else:
                raise e
# install signal handlers: TERM/INT shut everything down, HUP reloads
signal.signal(signal.SIGTERM, terminate)
signal.signal(signal.SIGINT, terminate)
signal.signal(signal.SIGHUP, reload)

# monitor and capture: start the tail thread and block until it stops
mail_tail.start()
mail_tail.join()

View File

@@ -0,0 +1,10 @@
class DatabaseConnectionFactory(object):
    '''Abstract factory for database connections.

    Subclasses implement connect() to open a connection and close() to
    release one obtained from connect().
    '''
    def connect(self):
        '''Open and return a new database connection.'''
        raise NotImplementedError()

    def close(self, conn):
        '''Close a connection previously returned by connect().'''
        raise NotImplementedError()

View File

@@ -0,0 +1,122 @@
# -*- indent-tabs-mode: t; tab-width: 4; python-indent-offset: 4; -*-
import threading
import queue
import logging
from .Prunable import Prunable
log = logging.getLogger(__name__)
'''subclass this and override:
write_rec()
read_rec()
to provide storage for event "records"
EventStore is thread safe and uses a single thread to write all
records.
'''
class EventStore(Prunable):
    def __init__(self, db_conn_factory):
        # db_conn_factory: an object providing connect()/close()
        self.db_conn_factory = db_conn_factory
        # we'll have a single thread do all the writing to the database
        #self.queue = queue.SimpleQueue() # available in Python 3.7+
        self.queue = queue.Queue()
        # interrupt: set to tell the writer thread to finish and exit
        self.interrupt = threading.Event()
        # rec_added: set when a new record is queued; lets a failed
        # write wait for fresh activity instead of retrying immediately
        self.rec_added = threading.Event()
        # have_event: set on any wake-up condition (new record or
        # interrupt); the writer blocks on this when idle
        self.have_event = threading.Event()
        self.t = threading.Thread(
            target=self._bg_writer,
            name="EventStore",
            daemon=True
        )
        # failed writes are re-queued only while the queue is smaller
        # than this
        self.max_queue_size = 100000
        self.t.start()

    def connect(self):
        '''Open a database connection via the factory.'''
        return self.db_conn_factory.connect()

    def close(self, conn):
        '''Close a connection obtained from connect().'''
        self.db_conn_factory.close(conn)

    def write_rec(self, conn, type, rec):
        '''write a "rec" of the given "type" to the database. The subclass
        must know how to do that. "type" is a string identifier of the
        subclass's choosing. Users of this class should call store()
        and not this function, which will queue the request and a
        thread managed by this class will call this function.
        '''
        raise NotImplementedError()

    def read_rec(self, conn, type, args):
        '''read from the database'''
        raise NotImplementedError()

    def prune(self, conn):
        # NOTE(review): Prunable declares prune(self, conn, policy) and
        # SqliteEventStore overrides with that wider signature - confirm
        # this narrower stub is never invoked directly
        raise NotImplementedError()

    def store(self, type, rec):
        '''Queue a record for asynchronous writing by the background
        thread; returns immediately.'''
        self.queue.put({
            'type': type,
            'rec': rec
        })
        self.rec_added.set()
        self.have_event.set()

    def stop(self):
        '''Signal the writer thread to stop and wait for it to drain
        the queue and exit.'''
        self.interrupt.set()
        self.have_event.set()
        self.t.join()

    def __del__(self):
        # can't join from __del__; just wake and signal the daemon thread
        log.debug('EventStore __del__')
        self.interrupt.set()
        self.have_event.set()

    def _pop(self):
        # non-blocking dequeue; returns None when the queue is empty
        try:
            return self.queue.get(block=False)
        except queue.Empty:
            return None

    def _bg_writer(self):
        '''Writer thread: drain the queue, retrying each failed write up
        to 3 times.  Runs until interrupted AND the queue is empty.'''
        log.debug('start EventStore thread')
        conn = self.connect()
        try:
            while not self.interrupt.is_set() or not self.queue.empty():
                item = self._pop()
                if item:
                    try:
                        self.write_rec(conn, item['type'], item['rec'])
                    except Exception as e:
                        log.exception(e)
                        retry_count = item.get('retry_count', 0)
                        if self.interrupt.is_set():
                            # shutting down - do not retry
                            log.warning('interrupted, dropping record: %s',item)
                        elif retry_count > 2:
                            log.warning('giving up after %s attempts, dropping record: %s', retry_count, item)
                        elif self.queue.qsize() >= self.max_queue_size:
                            log.warning('queue full, dropping record: %s', item)
                        else:
                            # re-queue with an incremented retry count
                            item['retry_count'] = retry_count + 1
                            self.queue.put(item)
                            # wait for another record to prevent immediate retry
                            if not self.interrupt.is_set():
                                self.have_event.wait()
                                self.rec_added.clear()
                                self.have_event.clear()
                    self.queue.task_done() # remove for SimpleQueue
                else:
                    # queue empty - sleep until a record arrives or stop
                    self.have_event.wait()
                    self.rec_added.clear()
                    self.have_event.clear()
        finally:
            self.close(conn)

View File

@@ -0,0 +1,7 @@
class Prunable(object):
    '''Interface for objects whose database rows can be purged.

    Implementations delete expired records via `conn` according to the
    given `policy` dict.
    '''
    def prune(self, conn, policy):
        '''Delete expired records; must be overridden.'''
        raise NotImplementedError()

View File

@@ -0,0 +1,79 @@
import threading
import logging
log = logging.getLogger(__name__)
class Pruner(object):
    '''periodically calls the prune() method of registered Prunable
    objects
    '''
    def __init__(self, db_conn_factory, policy={
        'older_than_days': 7,
        'frequency_min': 60
    }):
        # NOTE: mutable default argument is safe here only because the
        # dict is never mutated - set_policy() replaces it wholesale
        self.db_conn_factory = db_conn_factory
        self.policy = policy
        self.prunables = []  # registered Prunable instances
        self.interrupt = threading.Event()
        self._new_thread()
        self.t.start()

    def _new_thread(self):
        # create a fresh (unstarted) thread object; also clears the
        # interrupt so the new thread's loop will run
        self.interrupt.clear()
        self.t = threading.Thread(
            target=self._bg_pruner,
            name="Pruner",
            daemon=True
        )

    def add_prunable(self, inst):
        '''Register a Prunable whose prune() runs each cycle.'''
        self.prunables.append(inst)

    def set_policy(self, policy):
        '''Replace the prune policy, restarting the background thread.'''
        self.stop()
        self.policy = policy
        # a new thread object must be created or Python(<3.8?) throws
        # RuntimeError("threads can only be started once")
        self._new_thread()
        self.t.start()

    def stop(self, do_join=True):
        '''Interrupt the background thread, optionally joining it.'''
        self.interrupt.set()
        if do_join:
            self.t.join()

    def connect(self):
        return self.db_conn_factory.connect()

    def close(self, conn):
        self.db_conn_factory.close(conn)

    def __del__(self):
        # can't join from __del__; just signal the daemon thread
        self.stop(do_join=False)

    def _bg_pruner(self):
        # background thread: prune immediately, then every
        # policy['frequency_min'] minutes until interrupted
        conn = self.connect()
        def do_prune():
            for prunable in self.prunables:
                if not self.interrupt.is_set():
                    try:
                        prunable.prune(conn, self.policy)
                    except Exception as e:
                        log.exception(e)
        try:
            # prune right-off
            do_prune()
            while not self.interrupt.is_set():
                # wait until interrupted or it's time to prune
                if self.interrupt.wait(self.policy['frequency_min'] * 60) is not True:
                    do_prune()
        finally:
            self.close(conn)

View File

@@ -0,0 +1,52 @@
import os, stat
import sqlite3
import logging
import threading
from .DatabaseConnectionFactory import DatabaseConnectionFactory
log = logging.getLogger(__name__)
class SqliteConnFactory(DatabaseConnectionFactory):
    '''Creates sqlite3 connections to a single database file, creating
    the file and its parent directory with restrictive permissions on
    first use.
    '''
    def __init__(self, db_path):
        super(SqliteConnFactory, self).__init__()
        log.debug('factory for %s', db_path)
        self.db_path = db_path
        self.db_basename = os.path.basename(db_path)
        self.ensure_exists()

    def ensure_exists(self):
        '''Create the database file and its parent directory if they do
        not exist.  The directory is world-traversable (0755) while the
        database file itself is readable/writable by owner only (0600).
        '''
        parent = os.path.dirname(self.db_path)
        if parent and not os.path.exists(parent):
            os.makedirs(parent)
            # rwxr-xr-x, same bits as the original stat-flag expression
            os.chmod(parent, 0o755)
        if not os.path.exists(self.db_path):
            log.debug('creating empty database: %s', self.db_basename)
            # touch an empty file so we can lock down its mode before
            # sqlite ever opens it
            open(self.db_path, 'w').close()
            os.chmod(self.db_path, 0o600)

    def connect(self):
        '''Open a new connection with name-based row access.'''
        log.debug('opening database %s', self.db_basename)
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    def close(self, conn):
        log.debug('closing database %s', self.db_basename)
        conn.close()

View File

@@ -0,0 +1,368 @@
# -*- indent-tabs-mode: t; tab-width: 4; python-indent-offset: 4; -*-
import sqlite3
import os, stat
import logging
import json
import datetime
from .EventStore import EventStore
log = logging.getLogger(__name__)
#
# schema
#
# Column name lists for each table.  Order matters: it must match the
# parameterized INSERT statements built by SqliteEventStore._insert().
mta_conn_fields = [
    'service',
    'service_tid',
    'connect_time',
    'disconnect_time',
    'remote_host',
    'remote_ip',
    'sasl_method',
    'sasl_username',
    'remote_auth_success',
    'remote_auth_attempts',
    'remote_used_starttls',
    'disposition',
]
mta_accept_fields = [
    'mta_conn_id',
    'queue_time',
    'queue_remove_time',
    'subsystems',
    # 'spf_tid',
    'spf_result',
    'spf_reason',
    'postfix_msg_id',
    'message_id',
    'dkim_result',
    'dkim_reason',
    'dmarc_result',
    'dmarc_reason',
    'envelope_from',
    'message_size',
    'message_nrcpt',
    'accept_status',
    'failure_info',
    'failure_category',
]
mta_delivery_fields = [
    'mta_accept_id',
    'service',
    # 'service_tid',
    'rcpt_to',
    # 'postgrey_tid',
    'postgrey_result',
    'postgrey_reason',
    'postgrey_delay',
    # 'spam_tid',
    'spam_result',
    'spam_score',
    'relay',
    'status',
    'delay',
    'delivery_connection',
    'delivery_connection_info',
    'delivery_info',
    'failure_category',
]
# db_info holds a single 'schema_version' key used by update_schema()
db_info_create_table_stmt = "CREATE TABLE IF NOT EXISTS db_info(id INTEGER PRIMARY KEY AUTOINCREMENT, key TEXT NOT NULL, value TEXT NOT NULL)"
# schema_updates[N] is the list of statements migrating the database
# from version N-1 to version N; applied in order by update_schema()
schema_updates = [
    # update 0
    [
        # three "mta" tables having a one-to-many-to-many relationship:
        # mta_connection(1) -> mta_accept(0:N) -> mta_delivery(0:N)
        #
        "CREATE TABLE mta_connection(\
mta_conn_id INTEGER PRIMARY KEY AUTOINCREMENT,\
service TEXT NOT NULL, /* 'smtpd', 'submission' or 'pickup' */\
service_tid TEXT NOT NULL,\
connect_time TEXT NOT NULL,\
disconnect_time TEXT,\
remote_host TEXT COLLATE NOCASE,\
remote_ip TEXT COLLATE NOCASE,\
sasl_method TEXT, /* sasl: submission service only */\
sasl_username TEXT COLLATE NOCASE,\
remote_auth_success INTEGER, /* count of successes */\
remote_auth_attempts INTEGER, /* count of attempts */\
remote_used_starttls INTEGER, /* 1 if STARTTLS used */\
disposition TEXT /* 'normal','scanner','login_attempt',etc */\
)",
        "CREATE INDEX idx_mta_connection_connect_time ON mta_connection(connect_time, sasl_username COLLATE NOCASE)",
        "CREATE TABLE mta_accept(\
mta_accept_id INTEGER PRIMARY KEY AUTOINCREMENT,\
mta_conn_id INTEGER,\
queue_time TEXT,\
queue_remove_time TEXT,\
subsystems TEXT,\
/*spf_tid TEXT,*/\
spf_result TEXT,\
spf_reason TEXT,\
postfix_msg_id TEXT,\
message_id TEXT,\
dkim_result TEXT,\
dkim_reason TEXT,\
dmarc_result TEXT,\
dmarc_reason TEXT,\
envelope_from TEXT COLLATE NOCASE,\
message_size INTEGER,\
message_nrcpt INTEGER,\
accept_status TEXT, /* 'accept','greylist','spf-reject',others... */\
failure_info TEXT, /* details from mta or subsystems */\
failure_category TEXT,\
FOREIGN KEY(mta_conn_id) REFERENCES mta_connection(mta_conn_id) ON DELETE RESTRICT\
)",
        "CREATE TABLE mta_delivery(\
mta_delivery_id INTEGER PRIMARY KEY AUTOINCREMENT,\
mta_accept_id INTEGER,\
service TEXT, /* 'lmtp' or 'smtp' */\
/*service_tid TEXT,*/\
rcpt_to TEXT COLLATE NOCASE, /* email addr */\
/*postgrey_tid TEXT,*/\
postgrey_result TEXT,\
postgrey_reason TEXT,\
postgrey_delay NUMBER,\
/*spam_tid TEXT,*/ /* spam: lmtp only */\
spam_result TEXT, /* 'clean' or 'spam' */\
spam_score NUMBER, /* eg: 2.10 */\
relay TEXT, /* hostname[IP]:port */\
status TEXT, /* 'sent', 'bounce', 'reject', etc */\
delay NUMBER, /* fractional seconds, 'sent' status only */\
delivery_connection TEXT, /* 'trusted' or 'untrusted' */\
delivery_connection_info TEXT, /* details on TLS connection */\
delivery_info TEXT, /* details from the remote mta */\
failure_category TEXT,\
FOREIGN KEY(mta_accept_id) REFERENCES mta_accept(mta_accept_id) ON DELETE RESTRICT\
)",
        "CREATE INDEX idx_mta_delivery_rcpt_to ON mta_delivery(rcpt_to COLLATE NOCASE)",
        "CREATE TABLE state_cache(\
state_cache_id INTEGER PRIMARY KEY AUTOINCREMENT,\
owner_id INTEGER NOT NULL,\
state TEXT\
)",
        "INSERT INTO db_info (key,value) VALUES ('schema_version', '0')"
    ]
]
class SqliteEventStore(EventStore):
    '''EventStore backed by sqlite.

    Handles two record types: "inbound_mail" (an mta_connection with
    nested mta_accept/mta_delivery rows) and "state" (opaque JSON state
    items keyed by an integer owner_id).
    '''
    def __init__(self, db_conn_factory):
        super(SqliteEventStore, self).__init__(db_conn_factory)
        self.update_schema()

    def update_schema(self):
        ''' update the schema to the latest version
        '''
        c = None
        conn = None
        try:
            conn = self.connect()
            c = conn.cursor()
            # make sure the version-tracking table exists
            c.execute(db_info_create_table_stmt)
            conn.commit()
            c.execute("SELECT value from db_info WHERE key='schema_version'")
            v = c.fetchone()
            if v is None:
                # fresh database - apply every update from the start
                v = -1
            else:
                v = int(v[0])
            # apply each schema update newer than the current version;
            # each update commits as a unit
            for idx in range(v+1, len(schema_updates)):
                log.info('updating database to v%s', idx)
                for stmt in schema_updates[idx]:
                    try:
                        c.execute(stmt)
                    except Exception as e:
                        log.error('problem with sql statement at version=%s error="%s" stmt="%s"' % (idx, e, stmt))
                        raise e
                conn.commit()
        finally:
            if c: c.close(); c=None
            if conn: self.close(conn); conn=None

    def write_rec(self, conn, type, rec):
        '''Dispatch a queued record to the writer for its type.'''
        if type=='inbound_mail':
            #log.debug('wrote inbound_mail record')
            self.write_inbound_mail(conn, rec)
        elif type=='state':
            ''' rec: {
            owner_id: int,
            state: list
            }
            '''
            self.write_state(conn, rec)
        else:
            raise ValueError('type "%s" not implemented' % type)

    def _insert(self, table, fields):
        # build "INSERT INTO <table> (<fields...>) VALUES (?,...,?)"
        insert = 'INSERT INTO ' + table + ' (' + \
            ",".join(fields) + \
            ') VALUES (' + \
            "?,"*(len(fields)-1) + \
            '?)'
        return insert

    def _values(self, fields, data_dict):
        # Pull the listed fields out of data_dict (popping each one),
        # substituting None for missing fields.  Afterward warn about
        # leftover keys that look like unmapped scalar columns (lists,
        # "_"-prefixed and "*_tid" keys are expected leftovers).
        values = []
        for field in fields:
            if field in data_dict:
                values.append(data_dict[field])
                data_dict.pop(field)
            else:
                values.append(None)
        for field in data_dict:
            if type(data_dict[field]) != list and not field.startswith('_') and not field.endswith('_tid'):
                log.warning('unused field: %s', field)
        return values

    def write_inbound_mail(self, conn, rec):
        '''Insert one mta_connection row plus its nested mta_accept and
        mta_delivery rows as a single committed transaction.'''
        c = None
        try:
            c = conn.cursor()
            # mta_connection
            insert = self._insert('mta_connection', mta_conn_fields)
            values = self._values(mta_conn_fields, rec)
            #log.debug('INSERT: %s VALUES: %s REC=%s', insert, values, rec)
            c.execute(insert, values)
            conn_id = c.lastrowid
            accept_insert = self._insert('mta_accept', mta_accept_fields)
            delivery_insert = self._insert('mta_delivery', mta_delivery_fields)
            for accept in rec.get('mta_accept', []):
                # link child rows to their parents via lastrowid
                accept['mta_conn_id'] = conn_id
                values = self._values(mta_accept_fields, accept)
                c.execute(accept_insert, values)
                accept_id = c.lastrowid
                for delivery in accept.get('mta_delivery', []):
                    delivery['mta_accept_id'] = accept_id
                    values = self._values(mta_delivery_fields, delivery)
                    c.execute(delivery_insert, values)
            conn.commit()
        except sqlite3.Error as e:
            conn.rollback()
            raise e
        finally:
            if c: c.close(); c=None

    def write_state(self, conn, rec):
        '''Append each item of rec["state"], JSON-serialized, under
        rec["owner_id"] in state_cache.'''
        c = None
        try:
            c = conn.cursor()
            owner_id = rec['owner_id']
            insert = 'INSERT INTO state_cache (owner_id, state) VALUES (?, ?)'
            for item in rec['state']:
                item_json = json.dumps(item)
                c.execute(insert, (owner_id, item_json))
            conn.commit()
        except sqlite3.Error as e:
            conn.rollback()
            raise e
        finally:
            if c: c.close(); c=None

    def read_rec(self, conn, type, args):
        '''Read records of `type` ("state" only) per `args`.'''
        if type=='state':
            return self.read_state(
                conn,
                args['owner_id'],
                args.get('clear',False)
            )
        else:
            raise ValueError('type "%s" not implemented' % type)

    def read_state(self, conn, owner_id, clear):
        '''Return the saved state items (JSON-decoded, insertion order)
        for owner_id, optionally deleting them afterward.'''
        c = None
        state = []
        try:
            c = conn.cursor()
            select = 'SELECT state FROM state_cache WHERE owner_id=? ORDER BY state_cache_id'
            for row in c.execute(select, (owner_id,)):
                state.append(json.loads(row[0]))
            if clear:
                delete = 'DELETE FROM state_cache WHERE owner_id=?'
                c.execute(delete, (owner_id,))
                conn.commit()
        finally:
            if c: c.close(); c=None
        return state

    def prune(self, conn, policy):
        '''Delete mta_* rows whose connection is older than
        policy["older_than_days"] days (UTC); 0 or negative disables
        pruning.'''
        older_than_days = datetime.timedelta(days=policy['older_than_days'])
        if older_than_days.days <= 0:
            return
        now = datetime.datetime.now(datetime.timezone.utc)
        d = (now - older_than_days)
        dstr = d.isoformat(sep=' ', timespec='seconds')
        c = None
        try:
            c = conn.cursor()
            # children deleted first to satisfy the RESTRICT foreign keys
            deletes = [
                'DELETE FROM mta_delivery WHERE mta_accept_id IN (\
SELECT mta_accept.mta_accept_id FROM mta_accept\
JOIN mta_connection ON mta_connection.mta_conn_id = mta_accept.mta_conn_id\
WHERE connect_time < ?)',
                'DELETE FROM mta_accept WHERE mta_accept_id IN (\
SELECT mta_accept.mta_accept_id FROM mta_accept\
JOIN mta_connection ON mta_connection.mta_conn_id = mta_accept.mta_conn_id\
WHERE connect_time < ?)',
                'DELETE FROM mta_connection WHERE connect_time < ?'
            ]
            counts = []
            for delete in deletes:
                c.execute(delete, (dstr,))
                counts.append(str(c.rowcount))
            conn.commit()
            # report counts as connection/accept/delivery
            counts.reverse()
            log.info("pruned %s rows", "/".join(counts))
        except sqlite3.Error as e:
            conn.rollback()
            raise e
        finally:
            if c: c.close()

View File

@@ -0,0 +1,33 @@
import datetime
import pytz
# regexp capturing the 15-character traditional rsyslog timestamp at
# the start of a line
rsyslog_traditional_regexp = '^(.{15})'

# the system's local timezone id (eg "America/New_York"); log
# timestamps are in local time and must be converted to UTC
with open('/etc/timezone') as fp:
    timezone_id = fp.read().strip()

def rsyslog_traditional(str):
    # Handles the default timestamp in rsyslog
    # (RSYSLOG_TraditionalFileFormat)
    #
    # eg: "Dec 6 06:25:04" (always 15 characters)
    #
    # the date string is in local time
    #
    d = datetime.datetime.strptime(str, '%b %d %H:%M:%S')
    # since the log date has no year, use the current year
    today = datetime.date.today()
    year = today.year
    # a December log line read in January belongs to the prior year
    if d.month == 12 and today.month == 1:
        year -= 1
    d = d.replace(year=year)
    # convert to UTC
    if timezone_id == 'Etc/UTC':
        # NOTE(review): this branch returns a naive datetime while the
        # branch below returns an aware one - confirm callers accept both
        return d
    local_tz = pytz.timezone(timezone_id)
    # is_dst=None makes ambiguous/nonexistent local times raise an error
    return local_tz.localize(d, is_dst=None).astimezone(pytz.utc)

View File

@@ -0,0 +1,15 @@
'''subclass this and override methods to handle log output'''
class ReadLineHandler(object):
    '''Callback interface consumed by TailFile.'''
    def handle(self, line):
        ''' handle a single line of output '''
        raise NotImplementedError()

    def end_of_callbacks(self, thread):
        '''called when no more output will be sent to handle(). override this
        method to save state, or perform cleanup during this
        callback
        '''
        pass

View File

@@ -0,0 +1,26 @@
'''subclass this and override all methods to persist the position of
the log file that has been processed so far.
this enables the log monitor to pick up where it left off
a single FilePositionStore can safely be used with multiple
LogMonitor instances
'''
class ReadPositionStore(object):
    def get(self, log_file, inode):
        '''return the offset from the start of the file of the last
        position saved for log_file having the given inode, or zero if
        no position is currently saved
        '''
        raise NotImplementedError()

    def save(self, log_file, inode, offset):
        '''save the current position'''
        raise NotImplementedError()

    def clear(self, log_file):
        '''remove all entries for `log_file`'''
        raise NotImplementedError()

View File

@@ -0,0 +1,84 @@
from .ReadPositionStore import ReadPositionStore
import threading
import json
import os
import logging
log = logging.getLogger(__name__)
class ReadPositionStoreInFile(ReadPositionStore):
    '''ReadPositionStore persisted as JSON ({file: {inode: offset}}) in
    `output_file`.  A background thread flushes the in-memory copy to
    disk every 60 seconds when it has changed.
    '''
    def __init__(self, output_file):
        self.output_file = output_file
        self.changed = False            # True when self.db differs from disk
        self.lock = threading.Lock()    # guards self.db and self.changed
        self.interrupt = threading.Event()
        if os.path.exists(output_file):
            with open(output_file, "r", encoding="utf-8") as fp:
                self.db = json.loads(fp.read())
        else:
            self.db = {}
        self.t = threading.Thread(
            target=self._persist_bg,
            name="ReadPositionStoreInFile",
            daemon=True
        )
        self.t.start()

    def __del__(self):
        # can't join from __del__; just signal the daemon thread
        log.debug('ReadPositionStoreInFile __del__')
        self.interrupt.set()

    def stop(self):
        '''Stop the persist thread (it performs one final persist).'''
        self.interrupt.set()
        self.t.join()

    def get(self, file, inode):
        '''Return the saved offset for (file, inode), or 0 if unknown.'''
        with self.lock:
            if file in self.db and str(inode) in self.db[file]:
                return self.db[file][str(inode)]
            return 0

    def save(self, file, inode, pos):
        '''Record `pos` for (file, inode); flushed to disk later.'''
        with self.lock:
            if not file in self.db:
                self.db[file] = { str(inode):pos }
            else:
                self.db[file][str(inode)] = pos
            self.changed = True

    def clear(self, file):
        '''Forget all offsets for `file` (eg after log rotation).'''
        with self.lock:
            self.db[file] = {}
            self.changed = True

    def persist(self):
        '''Write the current positions to disk if they have changed.'''
        if self.changed:
            try:
                with open(self.output_file, "w") as fp:
                    # snapshot under the lock and optimistically mark clean
                    with self.lock:
                        json_str = json.dumps(self.db)
                        self.changed = False
                    try:
                        fp.write(json_str)
                    except Exception as e:
                        # write failed - mark dirty again so we retry later
                        with self.lock:
                            self.changed = True
                        log.error(e)
            except Exception as e:
                log.error(e)

    def _persist_bg(self):
        # background loop: flush every 60 seconds; one final flush on stop
        while not self.interrupt.is_set():
            # wait 60 seconds before persisting
            self.interrupt.wait(60)
            # even if interrupted, persist one final time
            self.persist()

View File

@@ -0,0 +1,160 @@
import threading
import os
import logging
import stat
from .ReadLineHandler import ReadLineHandler
log = logging.getLogger(__name__)
'''Spawn a thread to "tail" a log file. For each line read, provided
callbacks do something with the output. Callbacks must be a subclass
of ReadLineHandler.
'''
class TailFile(threading.Thread):
    def __init__(self, log_file, store=None):
        ''' log_file - the log file to monitor
        store - a ReadPositionStore instance
        '''
        self.log_file = log_file
        self.store = store
        self.fp = None      # open file object while tailing
        self.inode = None   # inode of the open file, for rotation checks
        self.callbacks = []
        self.interrupt = threading.Event()
        name=f'{__name__}-{os.path.basename(log_file)}'
        log.debug('init thread: %s', name)
        super(TailFile, self).__init__(name=name, daemon=True)

    def stop(self, do_join=True):
        '''Interrupt the tail loop and (optionally) join the thread.'''
        log.debug('TailFile stopping')
        self.interrupt.set()
        # close must be called to unblock the thread fp.readline() call
        self._close()
        if do_join:
            self.join()

    def __del__(self):
        # can't join from __del__; signal and close only
        self.stop(do_join=False)

    def add_handler(self, fn):
        '''Register a ReadLineHandler (or bare callable) for each line.
        Must be called before the thread is started.'''
        assert self.is_alive() == False
        self.callbacks.append(fn)

    def clear_callbacks(self):
        '''Remove all handlers; only valid before the thread starts.'''
        assert self.is_alive() == False
        self.callbacks = []

    def _open(self):
        # (re)open the log file and remember its inode for rotation checks
        self._close()
        self.inode = os.stat(self.log_file)[stat.ST_INO]
        self.fp = open(
            self.log_file,
            "r",
            encoding="utf-8",
            errors="backslashreplace"
        )

    def _close(self):
        if self.fp is not None:
            self.fp.close()
            self.fp = None

    def _is_rotated(self):
        # rotated when the path now points at a different inode
        try:
            return os.stat(self.log_file)[stat.ST_INO] != self.inode
        except FileNotFoundError:
            return False

    def _issue_callbacks(self, line):
        for cb in self.callbacks:
            if isinstance(cb, ReadLineHandler):
                cb.handle(line)
            else:
                cb(line)

    def _notify_end_of_callbacks(self):
        # give ReadLineHandler callbacks a chance to save state/cleanup
        for cb in self.callbacks:
            if isinstance(cb, ReadLineHandler):
                cb.end_of_callbacks(self)

    def _restore_read_position(self):
        # seek to the last processed position; without a store, tail
        # from the end of the file
        if self.fp is None:
            return
        if self.store is None:
            self.fp.seek(
                0,
                os.SEEK_END
            )
        else:
            pos = self.store.get(self.log_file, self.inode)
            size = os.stat(self.log_file)[stat.ST_SIZE]
            if size < pos:
                log.debug("truncated: %s" % self.log_file)
                self.fp.seek(0, os.SEEK_SET)
            else:
                # if pos>size here, the seek call succeeds and returns
                # 'pos', but future reads will fail
                self.fp.seek(pos, os.SEEK_SET)

    def run(self):
        self.interrupt.clear()
        # initial open - wait until file exists
        while not self.interrupt.is_set() and self.fp is None:
            try:
                self._open()
            except FileNotFoundError:
                log.debug('log file "%s" not found, waiting...', self.log_file)
                self.interrupt.wait(2)
                continue
        # restore reading position
        self._restore_read_position()
        while not self.interrupt.is_set():
            try:
                line = self.fp.readline() # blocking
                if line=='':
                    log.debug('got EOF')
                    # EOF - check if file was rotated
                    if self._is_rotated():
                        log.debug('rotated')
                        self._open()
                        if self.store is not None:
                            self.store.clear(self.log_file)
                    # if not rotated, sleep
                    else:
                        self.interrupt.wait(1)
                else:
                    # save position and call all callbacks
                    if self.store is not None:
                        self.store.save(
                            self.log_file,
                            self.inode,
                            self.fp.tell()
                        )
                    self._issue_callbacks(line)
            except Exception as e:
                # pause briefly, then recover if the file was rotated
                log.exception(e)
                if self.interrupt.wait(1) is not True:
                    if self._is_rotated():
                        self._open()
        self._close()
        try:
            self._notify_end_of_callbacks()
        except Exception as e:
            log.exception(e)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,125 @@
class PostfixLogParser(object):
    '''Helpers for picking apart postfix log line fragments.'''

    @staticmethod
    def split_host(str):
        ''' split string in form HOST[IP] and return HOST and IP '''
        ip_start = str.find('[')
        ip_end = -1
        if ip_start>=0:
            ip_end = str.find(']', ip_start)
        if ip_start<0 or ip_end<0:
            # no bracketed IP present: return the input for both parts
            return str, str
        return str[0:ip_start], str[ip_start+1:ip_end]

    @staticmethod
    def strip_brackets(str, bracket_l='<', bracket_r='>'):
        '''Strip a single enclosing bracket pair, if present.'''
        if len(str)>=2 and str[0]==bracket_l and str[-1]==bracket_r:
            return str[1:-1]
        return str

    class SplitList(object):
        ''' split a postfix name=value list. For example:
        "delay=4.7, to=<alice@post.com>, status=sent (250 2.0.0 <user@domain.tld> YB5nM1eS01+lSgAAlWWVsw Saved)"
        returns: {
        "delay": {
        "name": "delay",
        "value": "4.7"
        },
        "to": {
        "name": "to",
        "value": "alice@post.com"
        },
        "status": {
        "name": "status",
        "value": "sent",
        "comment": "250 2.0.0 <user@domain.tld> YB5nM1eS01+lSgAAlWWVsw Saved"
        }
        }
        '''
        def __init__(self, str, delim=',', strip_brackets=True):
            self.str = str
            self.delim = delim
            # BUG FIX: was hard-coded "self.strip_brackets = True",
            # silently ignoring the strip_brackets argument
            self.strip_brackets = strip_brackets
            self.pos = 0   # current scan position, advanced by __next__

        def asDict(self):
            '''Return {name: pair-dict} for all pairs in the list.'''
            d = {}
            for pair in self:
                d[pair['name']] = pair
            return d

        def __iter__(self):
            self.pos = 0
            return self

        def __next__(self):
            '''Return the next {'name','value','comment'} pair.

            The value may be split around <bracketed> segments and
            (parenthesized) comments; comments (possibly nested) are
            collected separately and joined with '; '.
            '''
            if self.pos >= len(self.str):
                raise StopIteration
            # name: everything up to the next '='
            eq = self.str.find('=', self.pos)
            if eq<0:
                self.pos = len(self.str)
                raise StopIteration
            name = self.str[self.pos:eq].strip()
            # value and comment
            self.pos = eq+1
            value = []
            comment = []
            while self.pos < len(self.str):
                c = self.str[self.pos]
                self.pos += 1
                if c=='<':
                    # take a <...> segment whole so a delim inside an
                    # address doesn't end the value
                    idx = self.str.find('>', self.pos)
                    if idx>=0:
                        value.append(self.str[self.pos-1:idx+1])
                        self.pos = idx+1
                        continue
                if c=='(':
                    # parens may be nested...
                    open_count = 1
                    begin = self.pos
                    while self.pos < len(self.str) and open_count>0:
                        c = self.str[self.pos]
                        self.pos += 1
                        if c=='(':
                            open_count += 1
                        elif c==')':
                            open_count -= 1
                    if open_count == 0:
                        comment.append(self.str[begin:self.pos-1])
                    else:
                        # unbalanced: take the rest of the string
                        comment.append(self.str[begin:len(self.str)])
                    continue
                if c==self.delim:
                    break
                # plain text: scan ahead to the next special character
                begin = self.pos-1
                while self.pos < len(self.str):
                    lookahead = self.str[self.pos]
                    if lookahead in [self.delim,'<','(']:
                        break
                    self.pos += 1
                value.append(self.str[begin:self.pos])
            if self.strip_brackets and len(value)==1:
                value[0] = PostfixLogParser.strip_brackets(value[0])
            return {
                'name': name,
                # BUG FIX: strip surrounding whitespace so a value
                # followed by a comment (eg "sent (250 ok)") yields
                # "sent", matching the documented example above
                'value': ''.join(value).strip(),
                'comment': None if len(comment)==0 else '; '.join(comment)
            }

View File

@@ -0,0 +1,108 @@
class DictQuery(object):
    @staticmethod
    def find(data_list, q_list, return_first_exact=False, reverse=False):
        '''find items in list `data_list` using the query specified in
        `q_list` (a list of dicts).
        side-effects:
        q_list is modified ('_val' is added)

        Each query dict has: key, value, and optionally op ('=' or
        '!='), ignorecase, autoset, optional.  With return_first_exact
        the first fully-matching item (or None) is returned; otherwise
        a ranked list of {exact, rank, autoset_list, optional_list,
        item} dicts, best match first.
        '''
        if data_list is None:
            if return_first_exact:
                return None
            else:
                return []
        if type(q_list) is not list:
            q_list = [ q_list ]
        # set _val to value.lower() if ignorecase is True
        for q in q_list:
            if q=='*': continue
            ignorecase = q.get('ignorecase', False)
            match_val = q['value']
            if ignorecase and match_val is not None:
                match_val = match_val.lower()
            q['_val'] = match_val
        # find all matches
        matches = []
        direction = -1 if reverse else 1
        idx = len(data_list)-1 if reverse else 0
        while (reverse and idx>=0) or (not reverse and idx<len(data_list)):
            item = data_list[idx]
            if 'rank' in item and 'item' in item:
                # for re-querying... accept a prior find() result entry
                item=item['item']
            count_mismatch = 0
            autoset_list = []    # mismatches fixable by autoset
            optional_list = []   # mismatches tolerated as optional
            for q in q_list:
                if q=='*': continue
                cmp_val = item.get(q['key'])
                if cmp_val is not None and q.get('ignorecase'):
                    cmp_val = cmp_val.lower()
                op = q.get('op', '=')
                mismatch = False
                if op == '=':
                    mismatch = q['_val'] != cmp_val
                elif op == '!=':
                    mismatch = q['_val'] == cmp_val
                else:
                    raise TypeError('No such op: ' + op)
                if mismatch:
                    count_mismatch += 1
                    # a missing key may be forgivable (autoset/optional)
                    if cmp_val is None:
                        if q.get('autoset'):
                            autoset_list.append(q)
                        elif q.get('optional'):
                            optional_list.append(q)
                    if return_first_exact:
                        # exact-only search: first mismatch disqualifies
                        break
            if return_first_exact:
                if count_mismatch == 0:
                    return item
            else:
                optional_count = len(autoset_list) + len(optional_list)
                if count_mismatch - optional_count == 0:
                    # every mismatch is forgivable; rank sorts fewer
                    # fixups first, then by position scanned
                    rank = '{0:05d}.{1:08d}'.format(
                        optional_count,
                        len(data_list) - idx if reverse else idx
                    )
                    matches.append({
                        'exact': ( optional_count == 0 ),
                        'rank': rank,
                        'autoset_list': autoset_list,
                        'optional_list': optional_list,
                        'item': item
                    })
            idx += direction
        if not return_first_exact:
            # return the list sorted so the items with the fewest
            # number of required autoset/optional's appear first
            matches.sort(key=lambda x: x['rank'])
            return matches

    @staticmethod
    def autoset(match, incl_optional=False):
        '''Fill in the missing keys recorded in a find() match entry
        using the query values (optional ones only if requested).'''
        item = match['item']
        for q in match['autoset_list']:
            assert item.get(q['key']) is None
            item[q['key']] = q['value']
        if incl_optional:
            for q in match['optional_list']:
                item[q['key']] = q['value']

View File

@@ -0,0 +1,9 @@
def load_env_vars_from_file(fn):
    '''Load settings from a KEY=VALUE file into a dict.

    The first occurrence of a key wins (preserving the original
    setdefault behavior), surrounding double quotes are stripped from
    values, and lines without an "=" (eg blank lines) are skipped - the
    original crashed on those via setdefault(*...).  The file is closed
    deterministically via a context manager.
    '''
    env = {}
    with open(fn) as fp:
        for line in fp:
            key, sep, value = line.strip().partition("=")
            if sep:
                env.setdefault(key, value)
    # strip_quotes:
    for k in env: env[k]=env[k].strip('"')
    return env

View File

@@ -0,0 +1,18 @@
def safe_int(str, default_value=0):
    '''Convert `str` to int, returning `default_value` on failure.

    Also catches TypeError so non-numeric types such as None return the
    default instead of raising, which the original allowed to escape.
    '''
    try:
        return int(str)
    except (ValueError, TypeError):
        return default_value

def safe_append(d, key, value):
    '''Append `value` to the list at d[key], creating the list if the
    key is absent.  Returns d for chaining.'''
    if key not in d:
        d[key] = [ value ]
    else:
        d[key].append(value)
    return d

def safe_del(d, key):
    '''Delete d[key] if present (no error when absent).  Returns d for
    chaining.'''
    if key in d:
        del d[key]
    return d