1
0
mirror of https://github.com/mail-in-a-box/mailinabox.git synced 2025-04-05 00:27:25 +00:00

Capture Dovecot logs

This commit is contained in:
downtownallday 2021-04-07 18:03:06 -04:00
parent 87cc106574
commit 33ea865d65
9 changed files with 809 additions and 142 deletions

View File

@ -1,2 +1 @@
tests/ tests/
run.sh

View File

@ -1,6 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from logs.TailFile import TailFile from logs.TailFile import TailFile
from mail.InboundMailLogHandler import InboundMailLogHandler from mail.PostfixLogHandler import PostfixLogHandler
from mail.DovecotLogHandler import DovecotLogHandler
from logs.ReadPositionStoreInFile import ReadPositionStoreInFile from logs.ReadPositionStoreInFile import ReadPositionStoreInFile
from db.SqliteConnFactory import SqliteConnFactory from db.SqliteConnFactory import SqliteConnFactory
from db.SqliteEventStore import SqliteEventStore from db.SqliteEventStore import SqliteEventStore
@ -50,6 +51,7 @@ options = {
'daemon': True, 'daemon': True,
'log_level': logging.WARNING, 'log_level': logging.WARNING,
'log_file': "/var/log/mail.log", 'log_file': "/var/log/mail.log",
'stop_at_eof': False,
'pos_file': "/var/lib/mailinabox/capture-pos.json", 'pos_file': "/var/lib/mailinabox/capture-pos.json",
'sqlite_file': os.path.join(CAPTURE_STORAGE_ROOT, 'capture.sqlite'), 'sqlite_file': os.path.join(CAPTURE_STORAGE_ROOT, 'capture.sqlite'),
'working_dir': "/var/run/mailinabox", 'working_dir': "/var/run/mailinabox",
@ -122,6 +124,10 @@ def process_cmdline(options):
elif arg=='-logfile' and have_next: elif arg=='-logfile' and have_next:
argi+=1 argi+=1
options['log_file'] = sys.argv[argi] options['log_file'] = sys.argv[argi]
elif arg=='-stopateof':
argi+=1
options['stop_at_eof'] = True
elif arg=='-posfile' and have_next: elif arg=='-posfile' and have_next:
argi+=1 argi+=1
@ -220,14 +226,21 @@ try:
) )
mail_tail = TailFile( mail_tail = TailFile(
options['log_file'], options['log_file'],
position_store position_store,
options['stop_at_eof']
) )
inbound_mail_handler = InboundMailLogHandler( postfix_log_handler = PostfixLogHandler(
event_store, event_store,
capture_enabled = options['config'].get('capture',True), capture_enabled = options['config'].get('capture',True),
drop_disposition = options['config'].get('drop_disposition') drop_disposition = options['config'].get('drop_disposition')
) )
mail_tail.add_handler(inbound_mail_handler) mail_tail.add_handler(postfix_log_handler)
dovecot_log_handler = DovecotLogHandler(
event_store,
capture_enabled = options['config'].get('capture',True),
drop_disposition = options['config'].get('drop_disposition')
)
mail_tail.add_handler(dovecot_log_handler)
pruner = Pruner( pruner = Pruner(
db_conn_factory, db_conn_factory,
policy=options['config']['prune_policy'] policy=options['config']['prune_policy']
@ -245,17 +258,6 @@ def terminate(sig, stack):
log.warning("shutting down due to SIGTERM") log.warning("shutting down due to SIGTERM")
log.debug("stopping mail_tail") log.debug("stopping mail_tail")
mail_tail.stop() mail_tail.stop()
log.debug("stopping pruner")
pruner.stop()
log.debug("stopping position_store")
position_store.stop()
log.debug("stopping event_store")
event_store.stop()
try:
os.remove(options['_runtime_config_file'])
except Exception:
pass
log.info("stopped")
# reload settings handler # reload settings handler
def reload(sig, stack): def reload(sig, stack):
@ -266,8 +268,10 @@ def reload(sig, stack):
options['config']['default_config'] = False options['config']['default_config'] = False
options['_config_file'] = config_default_file options['_config_file'] = config_default_file
log.info('%s records are in-progress', log.info('%s mta records are in-progress',
inbound_mail_handler.get_inprogress_count()) postfix_log_handler.get_inprogress_count())
log.info('%s imap records are in-progress',
dovecot_log_handler.get_inprogress_count())
if options['_config_file']: if options['_config_file']:
log.info('reloading %s', options['_config_file']) log.info('reloading %s', options['_config_file'])
@ -276,10 +280,16 @@ def reload(sig, stack):
pruner.set_policy( pruner.set_policy(
newconfig['prune_policy'] newconfig['prune_policy']
) )
inbound_mail_handler.set_capture_enabled( postfix_log_handler.set_capture_enabled(
newconfig.get('capture', True) newconfig.get('capture', True)
) )
inbound_mail_handler.update_drop_disposition( postfix_log_handler.update_drop_disposition(
newconfig.get('drop_disposition', {})
)
dovecot_log_handler.set_capture_enabled(
newconfig.get('capture', True)
)
dovecot_log_handler.update_drop_disposition(
newconfig.get('drop_disposition', {}) newconfig.get('drop_disposition', {})
) )
write_config(options['_runtime_config_file'], newconfig) write_config(options['_runtime_config_file'], newconfig)
@ -298,3 +308,15 @@ signal.signal(signal.SIGHUP, reload)
mail_tail.start() mail_tail.start()
mail_tail.join() mail_tail.join()
# gracefully close other threads
log.debug("stopping pruner")
pruner.stop()
log.debug("stopping position_store")
position_store.stop()
log.debug("stopping event_store")
event_store.stop()
try:
os.remove(options['_runtime_config_file'])
except Exception:
pass
log.info("stopped")

View File

@ -74,7 +74,6 @@ class EventStore(Prunable):
self.t.join() self.t.join()
def __del__(self): def __del__(self):
log.debug('EventStore __del__')
self.interrupt.set() self.interrupt.set()
self.have_event.set() self.have_event.set()

View File

@ -71,6 +71,23 @@ mta_delivery_fields = [
'failure_category', 'failure_category',
] ]
imap_conn_fields = [
'service',
'service_tid',
'connect_time',
'disconnect_time',
'disconnect_reason',
'remote_host',
'remote_ip',
'sasl_method',
'sasl_username',
'remote_auth_success',
'remote_auth_attempts',
'connection_security',
'in_bytes',
'out_bytes',
'disposition'
]
db_info_create_table_stmt = "CREATE TABLE IF NOT EXISTS db_info(id INTEGER PRIMARY KEY AUTOINCREMENT, key TEXT NOT NULL, value TEXT NOT NULL)" db_info_create_table_stmt = "CREATE TABLE IF NOT EXISTS db_info(id INTEGER PRIMARY KEY AUTOINCREMENT, key TEXT NOT NULL, value TEXT NOT NULL)"
@ -162,7 +179,33 @@ schema_updates = [
[ [
"ALTER TABLE mta_delivery ADD COLUMN orig_to TEXT COLLATE NOCASE", "ALTER TABLE mta_delivery ADD COLUMN orig_to TEXT COLLATE NOCASE",
"UPDATE db_info SET value='1' WHERE key='schema_version'" "UPDATE db_info SET value='1' WHERE key='schema_version'"
] ],
# update 2
[
"CREATE TABLE imap_connection(\
imap_conn_id INTEGER PRIMARY KEY AUTOINCREMENT,\
service TEXT NOT NULL, /* 'imap', 'imap-login', 'pop3-login' */\
service_tid TEXT,\
connect_time TEXT NOT NULL,\
disconnect_time TEXT,\
disconnect_reason TEXT,\
remote_host TEXT COLLATE NOCASE,\
remote_ip TEXT COLLATE NOCASE,\
sasl_method TEXT, /* eg. 'PLAIN' */\
sasl_username TEXT COLLATE NOCASE,\
remote_auth_success INTEGER, /* count of successes */\
remote_auth_attempts INTEGER, /* count of attempts */\
connection_security TEXT, /* eg 'TLS' */\
in_bytes INTEGER,\
out_bytes INTEGER,\
disposition TEXT /* 'ok','failed_login_attempt',etc */\
)",
"CREATE INDEX idx_imap_connection_connect_time ON imap_connection(connect_time, sasl_username COLLATE NOCASE)",
"UPDATE db_info SET value='2' WHERE key='schema_version'"
],
] ]
@ -209,9 +252,12 @@ class SqliteEventStore(EventStore):
def write_rec(self, conn, type, rec): def write_rec(self, conn, type, rec):
if type=='inbound_mail': if type=='mta_mail':
#log.debug('wrote inbound_mail record') self.write_mta_mail(conn, rec)
self.write_inbound_mail(conn, rec)
elif type=='imap_mail':
self.write_imap_mail(conn, rec)
elif type=='state': elif type=='state':
''' rec: { ''' rec: {
owner_id: int, owner_id: int,
@ -246,7 +292,7 @@ class SqliteEventStore(EventStore):
return values return values
def write_inbound_mail(self, conn, rec): def write_mta_mail(self, conn, rec):
c = None c = None
try: try:
c = conn.cursor() c = conn.cursor()
@ -282,6 +328,28 @@ class SqliteEventStore(EventStore):
def write_imap_mail(self, conn, rec):
c = None
try:
c = conn.cursor()
# imap_connection
insert = self._insert('imap_connection', imap_conn_fields)
values = self._values(imap_conn_fields, rec)
#log.debug('INSERT: %s VALUES: %s REC=%s', insert, values, rec)
c.execute(insert, values)
conn_id = c.lastrowid
conn.commit()
except sqlite3.Error as e:
conn.rollback()
raise e
finally:
if c: c.close(); c=None
def write_state(self, conn, rec): def write_state(self, conn, rec):
c = None c = None
try: try:
@ -354,7 +422,9 @@ class SqliteEventStore(EventStore):
JOIN mta_connection ON mta_connection.mta_conn_id = mta_accept.mta_conn_id\ JOIN mta_connection ON mta_connection.mta_conn_id = mta_accept.mta_conn_id\
WHERE connect_time < ?)', WHERE connect_time < ?)',
'DELETE FROM mta_connection WHERE connect_time < ?' 'DELETE FROM mta_connection WHERE connect_time < ?',
'DELETE FROM imap_connection WHERE connect_time < ?',
] ]
counts = [] counts = []

View File

@ -29,7 +29,6 @@ class ReadPositionStoreInFile(ReadPositionStore):
self.t.start() self.t.start()
def __del__(self): def __del__(self):
log.debug('ReadPositionStoreInFile __del__')
self.interrupt.set() self.interrupt.set()
def stop(self): def stop(self):

View File

@ -14,12 +14,13 @@ of ReadLineHandler.
''' '''
class TailFile(threading.Thread): class TailFile(threading.Thread):
def __init__(self, log_file, store=None): def __init__(self, log_file, store=None, stop_at_eof=False):
''' log_file - the log file to monitor ''' log_file - the log file to monitor
store - a ReadPositionStore instance store - a ReadPositionStore instance
''' '''
self.log_file = log_file self.log_file = log_file
self.store = store self.store = store
self.stop_at_eof = stop_at_eof
self.fp = None self.fp = None
self.inode = None self.inode = None
@ -31,7 +32,6 @@ class TailFile(threading.Thread):
super(TailFile, self).__init__(name=name, daemon=True) super(TailFile, self).__init__(name=name, daemon=True)
def stop(self, do_join=True): def stop(self, do_join=True):
log.debug('TailFile stopping')
self.interrupt.set() self.interrupt.set()
# close must be called to unblock the thread fp.readline() call # close must be called to unblock the thread fp.readline() call
self._close() self._close()
@ -72,15 +72,11 @@ class TailFile(threading.Thread):
def _issue_callbacks(self, line): def _issue_callbacks(self, line):
for cb in self.callbacks: for cb in self.callbacks:
if isinstance(cb, ReadLineHandler): cb.handle(line)
cb.handle(line)
else:
cb(line)
def _notify_end_of_callbacks(self): def _notify_end_of_callbacks(self):
for cb in self.callbacks: for cb in self.callbacks:
if isinstance(cb, ReadLineHandler): cb.end_of_callbacks(self)
cb.end_of_callbacks(self)
def _restore_read_position(self): def _restore_read_position(self):
if self.fp is None: if self.fp is None:
@ -122,6 +118,9 @@ class TailFile(threading.Thread):
line = self.fp.readline() # blocking line = self.fp.readline() # blocking
if line=='': if line=='':
log.debug('got EOF') log.debug('got EOF')
if self.stop_at_eof:
self.interrupt.set()
# EOF - check if file was rotated # EOF - check if file was rotated
if self._is_rotated(): if self._is_rotated():
log.debug('rotated') log.debug('rotated')
@ -144,6 +143,7 @@ class TailFile(threading.Thread):
self._issue_callbacks(line) self._issue_callbacks(line)
except Exception as e: except Exception as e:
log.error('exception processing line: %s', line)
log.exception(e) log.exception(e)
if self.interrupt.wait(1) is not True: if self.interrupt.wait(1) is not True:
if self._is_rotated(): if self._is_rotated():

View File

@ -0,0 +1,128 @@
import logging
import re
import datetime
import traceback
import ipaddress
import threading
from logs.ReadLineHandler import ReadLineHandler
import logs.DateParser
from db.EventStore import EventStore
from util.DictQuery import DictQuery
from util.safe import (safe_int, safe_append, safe_del)
log = logging.getLogger(__name__)
class CommonHandler(ReadLineHandler):
    '''Shared base class for the mail log line handlers (postfix and
    dovecot).

    Provides the machinery common to both: an in-progress "record"
    queue (`self.recs`) that is persisted to, and restored from, the
    record store so a restart can pick up where it left off; capture
    enable/disable; per-disposition record dropping; and syslog date
    parsing helpers.
    '''
    def __init__(self, state_cache_owner_id, record_store,
                 date_regexp = logs.DateParser.rsyslog_traditional_regexp,
                 date_parser_fn = logs.DateParser.rsyslog_traditional,
                 capture_enabled = True,
                 drop_disposition = None
    ):
        # id distinguishing this handler's cached state from other
        # handlers' state in the shared record store
        self.state_cache_owner_id = state_cache_owner_id

        # EventStore instance for persisting "records"
        self.record_store = record_store
        self.set_capture_enabled(capture_enabled)

        # our in-progress record queue is a simple list; seed it with
        # whatever incomplete records a previous run cached
        self.recs = self.get_cached_state(clear=True)
        self.current_inprogress_recs = len(self.recs)

        # records that have these dispositions will be dropped (not
        # recorded in the record store)
        self.drop_disposition_lock = threading.Lock()
        self.drop_disposition = {
            'failed_login_attempt': False,
            'suspected_scanner': False,
            'reject': False
        }
        self.update_drop_disposition(drop_disposition)

        # regular expression that matches a syslog date. strip any
        # leading "^" anchor so subclasses can embed it mid-pattern.
        self.date_regexp = date_regexp
        if date_regexp.startswith('^'):
            self.date_regexp = date_regexp[1:]

        # function that parses the syslog date
        self.date_parser_fn = date_parser_fn

    def get_inprogress_count(self):
        ''' thread-safe '''
        return self.current_inprogress_recs

    def update_inprogress_count(self):
        self.current_inprogress_recs = len(self.recs)

    def set_capture_enabled(self, capture_enabled):
        ''' thread-safe '''
        self.capture_enabled = capture_enabled

    def update_drop_disposition(self, drop_disposition):
        ''' thread-safe. merge `drop_disposition` (a dict, or None for
        "no changes") into the current drop settings '''
        with self.drop_disposition_lock:
            # tolerate None: callers pass config.get('drop_disposition'),
            # which is None when the key is absent
            self.drop_disposition.update(drop_disposition or {})

    def test_drop_disposition(self, disposition):
        '''return True if records with `disposition` should be dropped'''
        with self.drop_disposition_lock:
            return self.drop_disposition.get(disposition, False)

    def datetime_as_str(self, d):
        '''format datetime `d` as an iso-8601 string friendly to sqlite3'''
        timestamp = d.isoformat(sep=' ', timespec='seconds')
        # strip the utc offset from the iso format (ie. remove "+00:00")
        idx = timestamp.find('+00:00')
        if idx>0:
            timestamp = timestamp[0:idx]
        return timestamp

    def parse_date(self, date_str):
        '''parse syslog date string `date_str`; returns "YYYY-MM-DD HH:MM:SS"'''
        # we're expecting UTC times from date_parser()
        d = self.date_parser_fn(date_str)
        return self.datetime_as_str(d)

    def get_cached_state(self, clear=True):
        '''load the incomplete records cached by a previous run, dropping
        any that are stale. returns a list (possibly empty).
        '''
        conn = None
        try:
            # obtain the cached records from the record store
            conn = self.record_store.connect()
            recs = self.record_store.read_rec(conn, 'state', {
                "owner_id": self.state_cache_owner_id,
                "clear": clear
            })
            log.info('read %s incomplete records from cache %s', len(recs), self.state_cache_owner_id)

            # eliminate stale records - "stale" should be longer than
            # the "give-up" time for postfix (4-5 days)
            stale = datetime.timedelta(days=7)
            cutoff = self.datetime_as_str(
                datetime.datetime.now(datetime.timezone.utc) - stale
            )
            # a cached rec without a connect_time is malformed - treat
            # it as stale instead of raising KeyError
            newlist = [ rec for rec in recs if rec.get('connect_time','') >= cutoff ]
            if len(newlist) < len(recs):
                log.warning('dropping %s stale incomplete records',
                            len(recs) - len(newlist))
            return newlist
        finally:
            if conn: self.record_store.close(conn)

    def save_state(self):
        '''persist the in-progress record queue to the record store'''
        log.info('saving state to cache %s: %s records', self.state_cache_owner_id, len(self.recs))
        self.record_store.store('state', {
            'owner_id': self.state_cache_owner_id,
            'state': self.recs
        })

    def end_of_callbacks(self, thread):
        '''overrides ReadLineHandler method

        save incomplete records so we can pick up where we left off
        '''
        self.update_inprogress_count()
        self.save_state()

View File

@ -0,0 +1,534 @@
import logging
import re
import datetime
import ipaddress
import threading
from logs.ReadLineHandler import ReadLineHandler
import logs.DateParser
from db.EventStore import EventStore
from util.DictQuery import DictQuery
from util.safe import (safe_int, safe_append, safe_del)
from .CommonHandler import CommonHandler
log = logging.getLogger(__name__)
STATE_CACHE_OWNER_ID = 2
class DovecotLogHandler(CommonHandler):
    '''Parse the syslog output of dovecot (imap and pop3) into
    "imap_connection" records and persist them via `record_store`.
    '''
    def __init__(self, record_store,
                 date_regexp = logs.DateParser.rsyslog_traditional_regexp,
                 date_parser_fn = logs.DateParser.rsyslog_traditional,
                 capture_enabled = True,
                 drop_disposition = None
    ):
        super(DovecotLogHandler, self).__init__(
            STATE_CACHE_OWNER_ID,
            record_store,
            date_regexp,
            date_parser_fn,
            capture_enabled,
            drop_disposition
        )

        # A "record" is composed by parsing all the syslog output from
        # the activity generated by dovecot (imap, pop3) from a single
        # remote connection. Once a full history of the connection has
        # been seen, the record is written to the record_store.
        #
        # `recs` is an array holding incomplete, in-progress
        # "records". This array has the following format:
        #
        # (For convenience, it's easier to refer to the table column
        # names found in SqliteEventStore for the dict member names that
        # are used here since they're all visible in one place.)
        #
        # [{
        #    ... fields of the imap_connection table ...
        # }]
        #
        # IMPORTANT:
        #
        # No methods in this class are safe to call by any thread
        # other than the caller of handle(), unless marked as
        # thread-safe.
        #

        # maximum size of the in-progress record queue (should be the
        # same or greater than the maximum simultaneous dovecot/imap
        # connections allowed, which is dovecot settings
        # `process_limit` times `client_limit`, which defaults to 100
        # * 1000)
        self.max_inprogress_recs = 100 * 1000

        # 1a. imap-login: Info: Login: user=<keith@just1w.com>, method=PLAIN, rip=146.168.130.9, lip=192.155.92.185, mpid=5866, TLS, session=<IF3v7ze27dKSqIIJ>
        #   1=date
        #   2=service ("imap-login" or "pop3-login")
        #   3=sasl_username ("user@domain.com")
        #   4=sasl_method ("PLAIN")
        #   5=remote_ip ("146.168.130.9")
        #   6=local_ip
        #   7=service_tid ("5866")
        #   8=connection_security ("TLS")
        self.re_connect_success = re.compile('^' + self.date_regexp + r' (imap-login|pop3-login): Info: Login: user=<([^>]*)>, method=([^,]*), rip=([^,]+), lip=([^,]+), mpid=([^,]+), ([^,]+)')

        # 1a. imap-login: Info: Disconnected (auth failed, 1 attempts in 4 secs): user=<fernando@athigo.com>, method=PLAIN, rip=152.67.63.172, lip=192.155.92.185, TLS: Disconnected, session=<rho/Rjq2EqSYQz+s>
        #   1=date
        #   2=service ("imap-login" or "pop3-login")
        #   3=disconnect_reason
        #   4=remote_auth_attempts
        #   5=sasl_username
        #   6=sasl_method
        #   7=remote_ip
        #   8=local_ip
        #   9=connection_security
        self.re_connect_fail_1 = re.compile('^' + self.date_regexp + r' (imap-login|pop3-login): Info: (?:Disconnected|Aborted login) \(([^,]+), (\d+) attempts[^\)]*\): user=<([^>]*)>, method=([^,]+), rip=([^,]+), lip=([^,]+), ([^,]+)')

        # 2a. pop3-login: Info: Disconnected (no auth attempts in 2 secs): user=<>, rip=196.52.43.85, lip=192.155.92.185, TLS handshaking: SSL_accept() failed: error:14209102:SSL routines:tls_early_post_process_client_hello:unsupported protocol, session=<ehaSaDm2x9vENCtV>
        # 2b. imap-login: Info: Disconnected (no auth attempts in 2 secs): user=<>, rip=92.118.160.61, lip=192.155.92.185, TLS handshaking: SSL_accept() failed: error:14209102:SSL routines:tls_early_post_process_client_hello:unsupported protocol, session=<cvmKhjq2qtJcdqA9>
        #   1=date
        #   2=service ("imap-login" or "pop3-login")
        #   3=disconnect_reason
        #   4=sasl_username ("")
        #   5=remote_ip ("146.168.130.9")
        #   6=local_ip
        #   7=connection_security
        self.re_connect_fail_2 = re.compile('^' + self.date_regexp + r' (imap-login|pop3-login): Info: (?:Disconnected|Aborted login) \(([^\)]*)\): user=<([^>]*)>, rip=([^,]+), lip=([^,]+), ([^,]+)')

        # 3a. imap-login: Info: Disconnected (client didn't finish SASL auth, waited 0 secs): user=<>, method=PLAIN, rip=107.107.63.148, lip=192.155.92.185, TLS: SSL_read() syscall failed: Connection reset by peer, session=<rmBsIP21Zsdraz+U>
        #   1=date
        #   2=service ("imap-login" or "pop3-login")
        #   3=disconnect_reason
        #   4=sasl_username ("")
        #   5=sasl_method
        #   6=remote_ip ("146.168.130.9")
        #   7=local_ip
        #   8=connection_security
        self.re_connect_fail_3 = re.compile('^' + self.date_regexp + r' (imap-login|pop3-login): Info: (?:Disconnected|Aborted login) \(([^\)]*)\): user=<([^>]*)>, method=([^,]+), rip=([^,]+), lip=([^,]+), ([^,]+)')

        # 4a. pop3-login: Info: Disconnected: Too many bad commands (no auth attempts in 0 secs): user=<>, rip=83.97.20.35, lip=192.155.92.185, TLS, session=<BH8JRCi2nJ5TYRQj>
        #   1=date
        #   2=service ("imap-login" or "pop3-login")
        #   3=disconnect_reason
        #   4=sasl_username ("")
        #   5=remote_ip ("146.168.130.9")
        #   6=local_ip
        #   7=connection_security
        self.re_connect_fail_4 = re.compile('^' + self.date_regexp + r' (imap-login|pop3-login): Info: Disconnected: ([^\(]+) \(no auth attempts [^\)]+\): user=<([^>]*)>, rip=([^,]+), lip=([^,]+), ([^,]+)')

        # 5a. imap-login: Info: Disconnected: Too many bad commands (auth failed, 1 attempts in 4 secs): user=<fernando@athigo.com>, method=PLAIN, rip=152.67.63.172, lip=192.155.92.185, TLS: Disconnected, session=<rho/Rjq2EqSYQz+s>
        #   1=date
        #   2=service ("imap-login" or "pop3-login")
        #   3=disconnect_reason
        #   4=remote_auth_attempts
        #   5=sasl_username
        #   6=sasl_method
        #   7=remote_ip
        #   8=local_ip
        #   9=connection_security
        self.re_connect_fail_5 = re.compile('^' + self.date_regexp + r' (imap-login|pop3-login): Info: (?:Disconnected|Aborted login): ([^\(]+) \(auth failed, (\d+) attempts [^\)]+\): user=<([^>]*)>, method=([^,]+), rip=([^,]+), lip=([^,]+), ([^,]+)')

        # 1a. imap(jzw@just1w.com): Info: Logged out in=29 out=496
        #
        # 1b. imap(jzw@just1w.com): Info: Connection closed (IDLE running for 0.001 + waiting input for 5.949 secs, 2 B in + 10 B out, state=wait-input) in=477 out=6171
        # 1c. imap(jzw@just1w.com): Info: Connection closed (UID STORE finished 0.225 secs ago) in=8099 out=21714
        # 1d. imap(jzw@just1w.com): Info: Connection closed (LIST finished 115.637 secs ago) in=629 out=11130
        #
        # 1e. imap(jzw@just1w.com): Info: Connection closed (APPEND finished 0.606 secs ago) in=11792 out=10697
        #
        # 1f. imap(jzw@just1w.com): Info: Disconnected for inactivity in=1518 out=2962
        #
        # 1g. imap(keith@just1w.com): Info: Server shutting down. in=720 out=7287
        #   1=date
        #   2=service ("imap" or "pop3")
        #   3=sasl_username
        #   4=disconnect_reason ("Disconnected for inactivity")
        #   5=in_bytes
        #   6=out_bytes
        self.re_disconnect = re.compile('^' + self.date_regexp + r' (imap|pop3)\(([^\)]*)\): Info: ((?:Logged out|Connection closed|Disconnected|Server shutting down).*) in=(\d+) out=(\d+)')

    def add_new_connection(self, imap_conn):
        ''' queue an imap_connection record '''
        # if the queue has grown noticeably past the maximum, drop the
        # oldest records plus 10% headroom so we don't trim on every line
        threshold = self.max_inprogress_recs + ( len(self.recs) * 0.05 )
        if len(self.recs) > threshold:
            backoff = len(self.recs) - self.max_inprogress_recs + int( self.max_inprogress_recs * 0.10 )
            log.warning('dropping %s old imap records', backoff)
            self.recs = self.recs[min(len(self.recs),backoff):]

        self.recs.append(imap_conn)
        return imap_conn

    def remove_connection(self, imap_conn):
        ''' remove an imap_connection record from the queue '''
        self.recs.remove(imap_conn)

    def find_by(self, imap_conn_q, debug=False):
        '''find records using field-matching queries

        return a list of imap_conn matching query `imap_conn_q`
        '''
        if debug:
            log.debug('imap_conn_q: %s', imap_conn_q)

        # find all candidate recs with matching imap_conn_q, ordered by
        # most recent last
        candidates = DictQuery.find(self.recs, imap_conn_q, reverse=False)
        if len(candidates)==0:
            if debug: log.debug('no candidates')
            return []

        elif not candidates[0]['exact']:
            # there were no exact matches. apply autosets to the best
            # match requiring the fewest number of autosets (index 0)
            if debug: log.debug('no exact candidates')
            DictQuery.autoset(candidates[0])
            candidates[0]['exact'] = True
            candidates = [ candidates[0] ]

        else:
            # there is at least one exact match - remove all non-exact
            # candidates
            candidates = [
                candidate for candidate in candidates if candidate['exact']
            ]

        return [ candidate['item'] for candidate in candidates ]

    def find_first(self, *args, **kwargs):
        '''find the "best" result and return it - find_by() returns the
        list ordered, with the first being the "best"
        '''
        r = self.find_by(*args, **kwargs)
        if len(r)==0:
            return None
        return r[0]

    def match_connect_success(self, line):
        '''match a successful login line; returns a match dict or None'''
        #   1=date
        #   2=service ("imap-login" or "pop3-login")
        #   3=sasl_username ("user@domain.com")
        #   4=sasl_method ("PLAIN")
        #   5=remote_ip ("146.168.130.9")
        #   6=local_ip
        #   7=service_tid ("5866")
        #   8=connection_security ("TLS")
        m = self.re_connect_success.search(line)
        if m:
            imap_conn = {
                "connect_time": self.parse_date(m.group(1)),  # "YYYY-MM-DD HH:MM:SS"
                "service": m.group(2),
                "sasl_username": m.group(3),
                "sasl_method": m.group(4),
                "remote_host": "unknown",
                "remote_ip": m.group(5),
                "service_tid": m.group(7),
                "connection_security": m.group(8),
                "remote_auth_success": 1,
                "remote_auth_attempts": 1
            }
            self.add_new_connection(imap_conn)
            return { 'imap_conn': imap_conn }

    def match_connect_fail(self, line):
        '''match the various failed/aborted login lines; returns a match
        dict or None. each match creates a complete (already
        disconnected) record.
        '''
        m = self.re_connect_fail_1.search(line)
        if m:
            # 1a. imap-login: Info: Disconnected (auth failed, 1 attempts in 4 secs): user=<fernando@athigo.com>, method=PLAIN, rip=152.67.63.172, lip=192.155.92.185, TLS: Disconnected, session=<rho/Rjq2EqSYQz+s>
            #   1=date
            #   2=service ("imap-login" or "pop3-login")
            #   3=disconnect_reason
            #   4=remote_auth_attempts
            #   5=sasl_username
            #   6=sasl_method
            #   7=remote_ip
            #   8=local_ip
            #   9=connection_security
            d = self.parse_date(m.group(1))  # "YYYY-MM-DD HH:MM:SS"
            imap_conn = {
                "connect_time": d,
                "disconnect_time": d,
                "disconnect_reason": m.group(3),
                "service": m.group(2),
                "sasl_username": m.group(5),
                "sasl_method": m.group(6),
                "remote_host": "unknown",
                "remote_ip": m.group(7),
                "connection_security": m.group(9),
                "remote_auth_success": 0,
                "remote_auth_attempts": int(m.group(4))
            }
            self.add_new_connection(imap_conn)
            return { 'imap_conn': imap_conn }

        m = self.re_connect_fail_2.search(line)
        if m:
            # 2a. pop3-login: Info: Disconnected (no auth attempts in 2 secs): user=<>, rip=196.52.43.85, lip=192.155.92.185, TLS handshaking: SSL_accept() failed: error:14209102:SSL routines:tls_early_post_process_client_hello:unsupported protocol, session=<ehaSaDm2x9vENCtV>
            #   1=date
            #   2=service ("imap-login" or "pop3-login")
            #   3=disconnect_reason
            #   4=sasl_username ("")
            #   5=remote_ip ("146.168.130.9")
            #   6=local_ip
            #   7=connection_security
            d = self.parse_date(m.group(1))  # "YYYY-MM-DD HH:MM:SS"
            imap_conn = {
                "connect_time": d,
                "disconnect_time": d,
                "disconnect_reason": m.group(3),
                "service": m.group(2),
                "sasl_username": m.group(4),
                "remote_host": "unknown",
                "remote_ip": m.group(5),
                "connection_security": m.group(7),
                "remote_auth_success": 0,
                "remote_auth_attempts": 0
            }
            self.add_new_connection(imap_conn)
            return { 'imap_conn': imap_conn }

        m = self.re_connect_fail_3.search(line)
        if m:
            # 3a. imap-login: Info: Disconnected (client didn't finish SASL auth, waited 0 secs): user=<>, method=PLAIN, rip=107.107.63.148, lip=192.155.92.185, TLS: SSL_read() syscall failed: Connection reset by peer, session=<rmBsIP21Zsdraz+U>
            #   1=date
            #   2=service ("imap-login" or "pop3-login")
            #   3=disconnect_reason
            #   4=sasl_username ("")
            #   5=sasl_method
            #   6=remote_ip ("146.168.130.9")
            #   7=local_ip
            #   8=connection_security
            d = self.parse_date(m.group(1))  # "YYYY-MM-DD HH:MM:SS"
            imap_conn = {
                "connect_time": d,
                "disconnect_time": d,
                "disconnect_reason": m.group(3),
                "service": m.group(2),
                "sasl_username": m.group(4),
                "sasl_method": m.group(5),
                "remote_host": "unknown",
                "remote_ip": m.group(6),
                "connection_security": m.group(8),
                "remote_auth_success": 0,
                "remote_auth_attempts": 0
            }
            self.add_new_connection(imap_conn)
            return { 'imap_conn': imap_conn }

        m = self.re_connect_fail_4.search(line)
        if m:
            # 4a. pop3-login: Info: Disconnected: Too many bad commands (no auth attempts in 0 secs): user=<>, rip=83.97.20.35, lip=192.155.92.185, TLS, session=<BH8JRCi2nJ5TYRQj>
            #   1=date
            #   2=service ("imap-login" or "pop3-login")
            #   3=disconnect_reason
            #   4=sasl_username ("")
            #   5=remote_ip ("146.168.130.9")
            #   6=local_ip
            #   7=connection_security
            d = self.parse_date(m.group(1))  # "YYYY-MM-DD HH:MM:SS"
            imap_conn = {
                "connect_time": d,
                "disconnect_time": d,
                "disconnect_reason": m.group(3),
                "service": m.group(2),
                "sasl_username": m.group(4),
                "remote_host": "unknown",
                "remote_ip": m.group(5),
                "connection_security": m.group(6),
                "remote_auth_success": 0,
                "remote_auth_attempts": 0
            }
            self.add_new_connection(imap_conn)
            return { 'imap_conn': imap_conn }

        m = self.re_connect_fail_5.search(line)
        if m:
            # 5a. imap-login: Info: Disconnected: Too many bad commands (auth failed, 1 attempts in 4 secs): user=<fernando@athigo.com>, method=PLAIN, rip=152.67.63.172, lip=192.155.92.185, TLS: Disconnected, session=<rho/Rjq2EqSYQz+s>
            #   1=date
            #   2=service ("imap-login" or "pop3-login")
            #   3=disconnect_reason
            #   4=remote_auth_attempts
            #   5=sasl_username
            #   6=sasl_method
            #   7=remote_ip
            #   8=local_ip
            #   9=connection_security
            d = self.parse_date(m.group(1))  # "YYYY-MM-DD HH:MM:SS"
            imap_conn = {
                "connect_time": d,
                "disconnect_time": d,
                "disconnect_reason": m.group(3),
                "service": m.group(2),
                "sasl_username": m.group(5),
                "sasl_method": m.group(6),
                "remote_host": "unknown",
                "remote_ip": m.group(7),
                "connection_security": m.group(9),
                "remote_auth_success": 0,
                "remote_auth_attempts": int(m.group(4))
            }
            self.add_new_connection(imap_conn)
            return { 'imap_conn': imap_conn }

    def match_disconnect(self, line):
        '''match a disconnect line and merge it into the oldest queued
        connection for the user. returns a match dict, True for a
        matched-but-unpaired line, or None if the regex did not match.
        '''
        #   1=date
        #   2=service ("imap" or "pop3")
        #   3=sasl_username
        #   4=disconnect_reason ("Logged out")
        #   5=in_bytes
        #   6=out_bytes
        #
        # NOTE: there is no way to match up the disconnect with the
        # actual connection because Dovecot does not log a service_tid
        # or an ip address or anything else that could be used to
        # match the two up. We'll just assign the disconnect to the
        # oldest connection for the user.
        m = self.re_disconnect.search(line)
        if m:
            v = {
                "service": m.group(2),
                "disconnect_time": self.parse_date(m.group(1)),
                "disconnect_reason": m.group(4),
                "in_bytes": int(m.group(5)),
                "out_bytes": int(m.group(6))
            }
            imap_conn_q = [
                { 'key':'service', 'value':m.group(2) + '-login' },
                { 'key':'sasl_username', 'value':m.group(3),
                  'ignorecase': True }
            ]
            log.debug(imap_conn_q)
            imap_conn = self.find_first(imap_conn_q)
            if imap_conn:
                imap_conn.update(v)
                return { 'imap_conn': imap_conn }
            # matched the line but no queued connection to pair it with
            return True

    def store(self, imap_conn):
        '''assign a disposition, then persist the completed record
        (unless its disposition is configured to be dropped) and remove
        it from the in-progress queue
        '''
        if 'disposition' not in imap_conn:
            if imap_conn.get('remote_auth_success') == 0 and \
               imap_conn.get('remote_auth_attempts') == 0:
                imap_conn.update({
                    'disposition': 'suspected_scanner',
                })

            elif imap_conn.get('remote_auth_success') == 0 and \
                 imap_conn.get('remote_auth_attempts', 0) > 0:
                imap_conn.update({
                    'disposition': 'failed_login_attempt',
                })

            elif imap_conn.get('connection_security') != 'TLS' and \
                 imap_conn.get('remote_ip') != '127.0.0.1':
                imap_conn.update({
                    'disposition': 'insecure'
                })

            else:
                imap_conn.update({
                    'disposition': 'ok',
                })

        drop = self.test_drop_disposition(imap_conn['disposition'])
        if not drop:
            log.debug('store: %s', imap_conn)
            try:
                self.record_store.store('imap_mail', imap_conn)
            except Exception as e:
                log.exception(e)
        # dequeue whether stored or dropped - the record is complete
        self.remove_connection(imap_conn)

    def log_match(self, match_str, match_result, line):
        '''debug/diagnostic logging of a match attempt'''
        if match_result is True:
            log.info('%s [unmatched]: %s', match_str, line)
        elif match_result:
            if match_result.get('deferred', False):
                log.debug('%s [deferred]: %s', match_str, line)
            elif 'imap_conn' in match_result:
                log.debug('%s: %s: %s', match_str, line, match_result['imap_conn'])
            else:
                log.error('no imap_conn in match_result: %s', match_result)
        else:
            log.debug('%s: %s', match_str, line)

    def test_end_of_rec(self, match_result):
        '''True if the match produced a complete record'''
        if not match_result or match_result is True or match_result.get('deferred', False):
            return False
        return self.end_of_rec(match_result['imap_conn'])

    def end_of_rec(self, imap_conn):
        '''a client must be disconnected for the record to be "complete"
        '''
        if 'disconnect_time' not in imap_conn:
            return False
        return True

    def handle(self, line):
        '''overrides ReadLineHandler method

        This function is called by the main log reading thread in
        TailFile. All additional log reading is blocked until this
        function completes.

        The storage engine (`record_store`, a SqliteEventStore
        instance) does not block, so this function will return before
        the record is saved to disk.

        IMPORTANT:

        The data structures and methods in this class are not thread
        safe. It is not okay to call any of them when the instance is
        registered with TailFile.
        '''
        if not self.capture_enabled:
            return

        self.update_inprogress_count()
        log.debug('imap recs in progress: %s', len(self.recs))

        match = self.match_connect_success(line)
        if match:
            self.log_match('connect', match, line)
            return

        match = self.match_connect_fail(line)
        if match:
            self.log_match('connect_fail', match, line)
            if self.test_end_of_rec(match):
                # we're done - not queued and disconnected ... save it
                self.store(match['imap_conn'])
            return

        match = self.match_disconnect(line)
        if match:
            self.log_match('disconnect', match, line)
            if self.test_end_of_rec(match):
                # we're done - not queued and disconnected ... save it
                self.store(match['imap_conn'])
            return

        self.log_match('IGNORED', None, line)

View File

@ -10,6 +10,7 @@ from db.EventStore import EventStore
from util.DictQuery import DictQuery from util.DictQuery import DictQuery
from util.safe import (safe_int, safe_append, safe_del) from util.safe import (safe_int, safe_append, safe_del)
from .PostfixLogParser import PostfixLogParser from .PostfixLogParser import PostfixLogParser
from .CommonHandler import CommonHandler
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -17,7 +18,7 @@ log = logging.getLogger(__name__)
STATE_CACHE_OWNER_ID = 1 STATE_CACHE_OWNER_ID = 1
class InboundMailLogHandler(ReadLineHandler): class PostfixLogHandler(CommonHandler):
''' '''
''' '''
def __init__(self, record_store, def __init__(self, record_store,
@ -26,9 +27,20 @@ class InboundMailLogHandler(ReadLineHandler):
capture_enabled = True, capture_enabled = True,
drop_disposition = None drop_disposition = None
): ):
''' EventStore instance for persisting "records" ''' super(PostfixLogHandler, self).__init__(
self.record_store = record_store STATE_CACHE_OWNER_ID,
self.set_capture_enabled(capture_enabled) record_store,
date_regexp,
date_parser_fn,
capture_enabled,
drop_disposition
)
# maximum size of the in-progress record queue
self.current_inprogress_recs = len(self.recs)
self.max_inprogress_recs = 100
# A "record" is composed by parsing all the syslog output from # A "record" is composed by parsing all the syslog output from
# the activity generated by the MTA (postfix) from a single # the activity generated by the MTA (postfix) from a single
@ -66,31 +78,6 @@ class InboundMailLogHandler(ReadLineHandler):
# thread-safe. # thread-safe.
# #
# our in-progress record queue is a simple list
self.recs = self.get_cached_state(clear=True)
# maximum size of the in-progress record queue
self.current_inprogress_recs = len(self.recs)
self.max_inprogress_recs = 100
# records that have these dispositions will be dropped (not
# recorded in the record store
self.drop_disposition_lock = threading.Lock()
self.drop_disposition = {
'failed_login_attempt': False,
'suspected_scanner': False,
'reject': False
}
self.update_drop_disposition(drop_disposition)
# regular expression that matches a syslog date (always anchored)
self.date_regexp = date_regexp
if date_regexp.startswith('^'):
self.date_regexp = date_regexp[1:]
# function that parses the syslog date
self.date_parser_fn = date_parser_fn
# 1. 1a. postfix/smtpd[13698]: connect from host.tld[1.2.3.4] # 1. 1a. postfix/smtpd[13698]: connect from host.tld[1.2.3.4]
# 1=date # 1=date
@ -233,70 +220,9 @@ class InboundMailLogHandler(ReadLineHandler):
def set_capture_enabled(self, capture_enabled):
''' thread-safe '''
self.capture_enabled = capture_enabled
def update_drop_disposition(self, drop_disposition):
''' thread-safe '''
with self.drop_disposition_lock:
self.drop_disposition.update(drop_disposition)
def get_inprogress_count(self): def get_inprogress_count(self):
''' thread-safe ''' ''' thread-safe '''
return self.current_inprogress_recs return self.current_inprogress_recs
def datetime_as_str(self, d):
# iso-8601 time friendly to sqlite3
timestamp = d.isoformat(sep=' ', timespec='seconds')
# strip the utc offset from the iso format (ie. remove "+00:00")
idx = timestamp.find('+00:00')
if idx>0:
timestamp = timestamp[0:idx]
return timestamp
def parse_date(self, str):
# we're expecting UTC times from date_parser()
d = self.date_parser_fn(str)
return self.datetime_as_str(d)
def get_cached_state(self, clear=True):
conn = None
try:
# obtain the cached records from the record store
conn = self.record_store.connect()
recs = self.record_store.read_rec(conn, 'state', {
"owner_id": STATE_CACHE_OWNER_ID,
"clear": clear
})
log.info('read %s incomplete records from cache', len(recs))
# eliminate stale records - "stale" should be longer than
# the "give-up" time for postfix (4-5 days)
stale = datetime.timedelta(days=7)
cutoff = self.datetime_as_str(
datetime.datetime.now(datetime.timezone.utc) - stale
)
newlist = [ rec for rec in recs if rec['connect_time'] >= cutoff ]
if len(newlist) < len(recs):
log.warning('dropping %s stale incomplete records',
len(recs) - len(newlist))
return newlist
finally:
if conn: self.record_store.close(conn)
def save_state(self):
log.info('saving state to cache: %s records', len(self.recs))
conn = None
try:
conn = self.record_store.connect()
self.record_store.write_rec(conn, 'state', {
'owner_id': STATE_CACHE_OWNER_ID,
'state': self.recs
})
finally:
if conn: self.record_store.close(conn)
def add_new_connection(self, mta_conn): def add_new_connection(self, mta_conn):
@ -304,7 +230,7 @@ class InboundMailLogHandler(ReadLineHandler):
threshold = self.max_inprogress_recs + ( len(self.recs) * 0.05 ) threshold = self.max_inprogress_recs + ( len(self.recs) * 0.05 )
if len(self.recs) > threshold: if len(self.recs) > threshold:
backoff = len(self.recs) - self.max_inprogress_recs + int( self.max_inprogress_recs * 0.10 ) backoff = len(self.recs) - self.max_inprogress_recs + int( self.max_inprogress_recs * 0.10 )
log.warning('dropping %s old records', backoff) log.warning('dropping %s old mta records', backoff)
self.recs = self.recs[min(len(self.recs),backoff):] self.recs = self.recs[min(len(self.recs),backoff):]
self.recs.append(mta_conn) self.recs.append(mta_conn)
@ -1314,14 +1240,12 @@ class InboundMailLogHandler(ReadLineHandler):
'disposition': 'ok', 'disposition': 'ok',
}) })
drop = False drop = self.test_drop_disposition(mta_conn['disposition'])
with self.drop_disposition_lock:
drop = self.drop_disposition.get(mta_conn['disposition'], False)
if not drop: if not drop:
log.debug('store: %s', mta_conn) log.debug('store: %s', mta_conn)
try: try:
self.record_store.store('inbound_mail', mta_conn) self.record_store.store('mta_mail', mta_conn)
except Exception as e: except Exception as e:
log.exception(e) log.exception(e)
@ -1389,9 +1313,9 @@ class InboundMailLogHandler(ReadLineHandler):
if not self.capture_enabled: if not self.capture_enabled:
return return
self.current_inprogress_recs = len(self.recs) self.update_inprogress_count()
log.debug('recs in progress: %s, dds_by_tid=%s', log.debug('mta recs in progress: %s, dds_by_tid=%s',
len(self.recs), len(self.recs),
len(self.dds_by_tid) len(self.dds_by_tid)
) )
@ -1474,11 +1398,3 @@ class InboundMailLogHandler(ReadLineHandler):
self.log_match('IGNORED', None, line) self.log_match('IGNORED', None, line)
def end_of_callbacks(self, thread):
'''overrides ReadLineHandler method
save incomplete records so we can pick up where we left off
'''
self.save_state()