diff --git a/management/reporting/capture/.gitignore b/management/reporting/capture/.gitignore index fcd595d6..2bfa6a4d 100644 --- a/management/reporting/capture/.gitignore +++ b/management/reporting/capture/.gitignore @@ -1,2 +1 @@ tests/ -run.sh diff --git a/management/reporting/capture/capture.py b/management/reporting/capture/capture.py index 47b56e0f..0a96152a 100755 --- a/management/reporting/capture/capture.py +++ b/management/reporting/capture/capture.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 from logs.TailFile import TailFile -from mail.InboundMailLogHandler import InboundMailLogHandler +from mail.PostfixLogHandler import PostfixLogHandler +from mail.DovecotLogHandler import DovecotLogHandler from logs.ReadPositionStoreInFile import ReadPositionStoreInFile from db.SqliteConnFactory import SqliteConnFactory from db.SqliteEventStore import SqliteEventStore @@ -50,6 +51,7 @@ options = { 'daemon': True, 'log_level': logging.WARNING, 'log_file': "/var/log/mail.log", + 'stop_at_eof': False, 'pos_file': "/var/lib/mailinabox/capture-pos.json", 'sqlite_file': os.path.join(CAPTURE_STORAGE_ROOT, 'capture.sqlite'), 'working_dir': "/var/run/mailinabox", @@ -122,6 +124,10 @@ def process_cmdline(options): elif arg=='-logfile' and have_next: argi+=1 options['log_file'] = sys.argv[argi] + + elif arg=='-stopateof': + argi+=1 + options['stop_at_eof'] = True elif arg=='-posfile' and have_next: argi+=1 @@ -220,14 +226,21 @@ try: ) mail_tail = TailFile( options['log_file'], - position_store + position_store, + options['stop_at_eof'] ) - inbound_mail_handler = InboundMailLogHandler( + postfix_log_handler = PostfixLogHandler( event_store, capture_enabled = options['config'].get('capture',True), drop_disposition = options['config'].get('drop_disposition') ) - mail_tail.add_handler(inbound_mail_handler) + mail_tail.add_handler(postfix_log_handler) + dovecot_log_handler = DovecotLogHandler( + event_store, + capture_enabled = options['config'].get('capture',True), + drop_disposition = options['config'].get('drop_disposition') + ) + mail_tail.add_handler(dovecot_log_handler) pruner = Pruner( db_conn_factory, policy=options['config']['prune_policy'] @@ -245,17 +258,6 @@ def terminate(sig, stack): log.warning("shutting down due to SIGTERM") log.debug("stopping mail_tail") mail_tail.stop() - log.debug("stopping pruner") - pruner.stop() - log.debug("stopping position_store") - position_store.stop() - log.debug("stopping event_store") - event_store.stop() - try: - os.remove(options['_runtime_config_file']) - except Exception: - pass - log.info("stopped") # reload settings handler def reload(sig, stack): @@ -266,8 +268,10 @@ def reload(sig, stack): options['config']['default_config'] = False options['_config_file'] = config_default_file - log.info('%s records are in-progress', - inbound_mail_handler.get_inprogress_count()) + log.info('%s mta records are in-progress', + postfix_log_handler.get_inprogress_count()) + log.info('%s imap records are in-progress', + dovecot_log_handler.get_inprogress_count()) if options['_config_file']: log.info('reloading %s', options['_config_file']) @@ -276,10 +280,16 @@ def reload(sig, stack): pruner.set_policy( newconfig['prune_policy'] ) - inbound_mail_handler.set_capture_enabled( + postfix_log_handler.set_capture_enabled( newconfig.get('capture', True) ) - inbound_mail_handler.update_drop_disposition( + postfix_log_handler.update_drop_disposition( + newconfig.get('drop_disposition', {}) + ) + dovecot_log_handler.set_capture_enabled( + newconfig.get('capture', True) + ) + dovecot_log_handler.update_drop_disposition( newconfig.get('drop_disposition', {}) ) write_config(options['_runtime_config_file'], newconfig) @@ -298,3 +308,15 @@ signal.signal(signal.SIGHUP, reload) mail_tail.start() mail_tail.join() +# gracefully close other threads +log.debug("stopping pruner") +pruner.stop() +log.debug("stopping position_store") +position_store.stop() +log.debug("stopping event_store") +event_store.stop() +try: + os.remove(options['_runtime_config_file']) +except Exception: + pass +log.info("stopped") diff --git a/management/reporting/capture/db/EventStore.py b/management/reporting/capture/db/EventStore.py index f97e8056..5c3c275c 100644 --- a/management/reporting/capture/db/EventStore.py +++ b/management/reporting/capture/db/EventStore.py @@ -74,7 +74,6 @@ class EventStore(Prunable): self.t.join() def __del__(self): - log.debug('EventStore __del__') self.interrupt.set() self.have_event.set() diff --git a/management/reporting/capture/db/SqliteEventStore.py b/management/reporting/capture/db/SqliteEventStore.py index 4457b810..f6e782e8 100644 --- a/management/reporting/capture/db/SqliteEventStore.py +++ b/management/reporting/capture/db/SqliteEventStore.py @@ -71,6 +71,23 @@ mta_delivery_fields = [ 'failure_category', ] +imap_conn_fields = [ + 'service', + 'service_tid', + 'connect_time', + 'disconnect_time', + 'disconnect_reason', + 'remote_host', + 'remote_ip', + 'sasl_method', + 'sasl_username', + 'remote_auth_success', + 'remote_auth_attempts', + 'connection_security', + 'in_bytes', + 'out_bytes', + 'disposition' +] db_info_create_table_stmt = "CREATE TABLE IF NOT EXISTS db_info(id INTEGER PRIMARY KEY AUTOINCREMENT, key TEXT NOT NULL, value TEXT NOT NULL)" @@ -162,7 +179,33 @@ schema_updates = [ [ "ALTER TABLE mta_delivery ADD COLUMN orig_to TEXT COLLATE NOCASE", "UPDATE db_info SET value='1' WHERE key='schema_version'" - ] + ], + + # update 2 + [ + "CREATE TABLE imap_connection(\ + imap_conn_id INTEGER PRIMARY KEY AUTOINCREMENT,\ + service TEXT NOT NULL, /* 'imap', 'imap-login', 'pop3-login' */\ + service_tid TEXT,\ + connect_time TEXT NOT NULL,\ + disconnect_time TEXT,\ + disconnect_reason TEXT,\ + remote_host TEXT COLLATE NOCASE,\ + remote_ip TEXT COLLATE NOCASE,\ + sasl_method TEXT, /* eg. 'PLAIN' */\ + sasl_username TEXT COLLATE NOCASE,\ + remote_auth_success INTEGER, /* count of successes */\ + remote_auth_attempts INTEGER, /* count of attempts */\ + connection_security TEXT, /* eg 'TLS' */\ + in_bytes INTEGER,\ + out_bytes INTEGER,\ + disposition TEXT /* 'ok','failed_login_attempt',etc */\ + )", + + "CREATE INDEX idx_imap_connection_connect_time ON imap_connection(connect_time, sasl_username COLLATE NOCASE)", + + "UPDATE db_info SET value='2' WHERE key='schema_version'" + ], ] @@ -209,9 +252,12 @@ class SqliteEventStore(EventStore): def write_rec(self, conn, type, rec): - if type=='inbound_mail': - #log.debug('wrote inbound_mail record') - self.write_inbound_mail(conn, rec) + if type=='mta_mail': + self.write_mta_mail(conn, rec) + + elif type=='imap_mail': + self.write_imap_mail(conn, rec) + elif type=='state': ''' rec: { owner_id: int, @@ -246,7 +292,7 @@ class SqliteEventStore(EventStore): return values - def write_inbound_mail(self, conn, rec): + def write_mta_mail(self, conn, rec): c = None try: c = conn.cursor() @@ -282,6 +328,28 @@ class SqliteEventStore(EventStore): + def write_imap_mail(self, conn, rec): + c = None + try: + c = conn.cursor() + + # imap_connection + insert = self._insert('imap_connection', imap_conn_fields) + values = self._values(imap_conn_fields, rec) + #log.debug('INSERT: %s VALUES: %s REC=%s', insert, values, rec) + c.execute(insert, values) + conn_id = c.lastrowid + + conn.commit() + + except sqlite3.Error as e: + conn.rollback() + raise e + + finally: + if c: c.close(); c=None + + def write_state(self, conn, rec): c = None try: @@ -354,7 +422,9 @@ class SqliteEventStore(EventStore): JOIN mta_connection ON mta_connection.mta_conn_id = mta_accept.mta_conn_id\ WHERE connect_time < ?)', - 'DELETE FROM mta_connection WHERE connect_time < ?' + 'DELETE FROM mta_connection WHERE connect_time < ?', + + 'DELETE FROM imap_connection WHERE connect_time < ?', ] counts = [] diff --git a/management/reporting/capture/logs/ReadPositionStoreInFile.py b/management/reporting/capture/logs/ReadPositionStoreInFile.py index 15f4c786..51eabee8 100644 --- a/management/reporting/capture/logs/ReadPositionStoreInFile.py +++ b/management/reporting/capture/logs/ReadPositionStoreInFile.py @@ -29,7 +29,6 @@ class ReadPositionStoreInFile(ReadPositionStore): self.t.start() def __del__(self): - log.debug('ReadPositionStoreInFile __del__') self.interrupt.set() def stop(self): diff --git a/management/reporting/capture/logs/TailFile.py b/management/reporting/capture/logs/TailFile.py index 6882c51c..9ac614ae 100644 --- a/management/reporting/capture/logs/TailFile.py +++ b/management/reporting/capture/logs/TailFile.py @@ -14,12 +14,13 @@ of ReadLineHandler. ''' class TailFile(threading.Thread): - def __init__(self, log_file, store=None): + def __init__(self, log_file, store=None, stop_at_eof=False): ''' log_file - the log file to monitor store - a ReadPositionStore instance ''' self.log_file = log_file self.store = store + self.stop_at_eof = stop_at_eof self.fp = None self.inode = None @@ -31,7 +32,6 @@ class TailFile(threading.Thread): super(TailFile, self).__init__(name=name, daemon=True) def stop(self, do_join=True): - log.debug('TailFile stopping') self.interrupt.set() # close must be called to unblock the thread fp.readline() call self._close() @@ -72,15 +72,11 @@ class TailFile(threading.Thread): def _issue_callbacks(self, line): for cb in self.callbacks: - if isinstance(cb, ReadLineHandler): - cb.handle(line) - else: - cb(line) + cb.handle(line) def _notify_end_of_callbacks(self): for cb in self.callbacks: - if isinstance(cb, ReadLineHandler): - cb.end_of_callbacks(self) + cb.end_of_callbacks(self) def _restore_read_position(self): if self.fp is None: @@ -122,6 +118,9 @@ class TailFile(threading.Thread): line = self.fp.readline() # blocking if line=='': log.debug('got EOF') + if self.stop_at_eof: + self.interrupt.set() + # EOF - check if file was rotated if self._is_rotated(): log.debug('rotated') @@ -144,6 +143,7 @@ class TailFile(threading.Thread): self._issue_callbacks(line) except Exception as e: + log.error('exception processing line: %s', line) log.exception(e) if self.interrupt.wait(1) is not True: if self._is_rotated(): diff --git a/management/reporting/capture/mail/CommonHandler.py b/management/reporting/capture/mail/CommonHandler.py new file mode 100644 index 00000000..314b7ed5 --- /dev/null +++ b/management/reporting/capture/mail/CommonHandler.py @@ -0,0 +1,128 @@ +import logging +import re +import datetime +import traceback +import ipaddress +import threading +from logs.ReadLineHandler import ReadLineHandler +import logs.DateParser +from db.EventStore import EventStore +from util.DictQuery import DictQuery +from util.safe import (safe_int, safe_append, safe_del) + +log = logging.getLogger(__name__) + + +class CommonHandler(ReadLineHandler): + ''' + ''' + def __init__(self, state_cache_owner_id, record_store, + date_regexp = logs.DateParser.rsyslog_traditional_regexp, + date_parser_fn = logs.DateParser.rsyslog_traditional, + capture_enabled = True, + drop_disposition = None + ): + self.state_cache_owner_id = state_cache_owner_id + + ''' EventStore instance for persisting "records" ''' + self.record_store = record_store + self.set_capture_enabled(capture_enabled) + + # our in-progress record queue is a simple list + self.recs = self.get_cached_state(clear=True) + self.current_inprogress_recs = len(self.recs) + + # records that have these dispositions will be dropped (not + # recorded in the record store + self.drop_disposition_lock = threading.Lock() + self.drop_disposition = { + 'failed_login_attempt': False, + 'suspected_scanner': False, + 'reject': False + } + self.update_drop_disposition(drop_disposition) + + # regular expression that matches a syslog date (always anchored) + self.date_regexp = date_regexp + if date_regexp.startswith('^'): + self.date_regexp = date_regexp[1:] + + # function that parses the syslog date + self.date_parser_fn = date_parser_fn + + + + def get_inprogress_count(self): + ''' thread-safe ''' + return self.current_inprogress_recs + + def update_inprogress_count(self): + self.current_inprogress_recs = len(self.recs) + + def set_capture_enabled(self, capture_enabled): + ''' thread-safe ''' + self.capture_enabled = capture_enabled + + def update_drop_disposition(self, drop_disposition): + ''' thread-safe ''' + with self.drop_disposition_lock: + self.drop_disposition.update(drop_disposition) + + def test_drop_disposition(self, disposition): + with self.drop_disposition_lock: + return self.drop_disposition.get(disposition, False) + + def datetime_as_str(self, d): + # iso-8601 time friendly to sqlite3 + timestamp = d.isoformat(sep=' ', timespec='seconds') + # strip the utc offset from the iso format (ie. remove "+00:00") + idx = timestamp.find('+00:00') + if idx>0: + timestamp = timestamp[0:idx] + return timestamp + + def parse_date(self, str): + # we're expecting UTC times from date_parser() + d = self.date_parser_fn(str) + return self.datetime_as_str(d) + + def get_cached_state(self, clear=True): + conn = None + try: + # obtain the cached records from the record store + conn = self.record_store.connect() + recs = self.record_store.read_rec(conn, 'state', { + "owner_id": self.state_cache_owner_id, + "clear": clear + }) + log.info('read %s incomplete records from cache %s', len(recs), self.state_cache_owner_id) + + # eliminate stale records - "stale" should be longer than + # the "give-up" time for postfix (4-5 days) + stale = datetime.timedelta(days=7) + cutoff = self.datetime_as_str( + datetime.datetime.now(datetime.timezone.utc) - stale + ) + newlist = [ rec for rec in recs if rec['connect_time'] >= cutoff ] + if len(newlist) < len(recs): + log.warning('dropping %s stale incomplete records', + len(recs) - len(newlist)) + return newlist + finally: + if conn: self.record_store.close(conn) + + def save_state(self): + log.info('saving state to cache %s: %s records', self.state_cache_owner_id, len(self.recs)) + self.record_store.store('state', { + 'owner_id': self.state_cache_owner_id, + 'state': self.recs + }) + + def end_of_callbacks(self, thread): + '''overrides ReadLineHandler method + + save incomplete records so we can pick up where we left off + + ''' + self.update_inprogress_count() + self.save_state() diff --git a/management/reporting/capture/mail/DovecotLogHandler.py b/management/reporting/capture/mail/DovecotLogHandler.py new file mode 100644 index 00000000..a5360f9d --- /dev/null +++ b/management/reporting/capture/mail/DovecotLogHandler.py @@ -0,0 +1,534 @@ +import logging +import re +import datetime +import ipaddress +import threading +from logs.ReadLineHandler import ReadLineHandler +import logs.DateParser +from db.EventStore import EventStore +from util.DictQuery import DictQuery +from util.safe import (safe_int, safe_append, safe_del) +from .CommonHandler import CommonHandler + +log = logging.getLogger(__name__) + + +STATE_CACHE_OWNER_ID = 2 + +class DovecotLogHandler(CommonHandler): + ''' + ''' + def __init__(self, record_store, + date_regexp = logs.DateParser.rsyslog_traditional_regexp, + date_parser_fn = logs.DateParser.rsyslog_traditional, + capture_enabled = True, + drop_disposition = None + ): + super(DovecotLogHandler, self).__init__( + STATE_CACHE_OWNER_ID, + record_store, + date_regexp, + date_parser_fn, + capture_enabled, + drop_disposition + ) + + + # A "record" is composed by parsing all the syslog output from + # the activity generated by dovecot (imap, pop3) from a single + # remote connection. Once a full history of the connection, + # the record is written to the record_store. + # + # `recs` is an array holding incomplete, in-progress + # "records". This array has the following format: + # + # (For convenience, it's easier to refer to the table column + # names found in SqliteEventStore for the dict member names that + # are used here since they're all visible in one place.) + # + # [{ + # ... fields of the imap_connection table ... + # }] + # + # IMPORTANT: + # + # No methods in this class are safe to call by any thread + # other than the caller of handle(), unless marked as + # thread-safe. + # + + # maximum size of the in-progress record queue (should be the + # same or greater than the maximum simultaneous dovecot/imap + # connections allowed, which is dovecot settings + # `process_limit` times `client_limit`, which defaults to 100 + # * 1000) + self.max_inprogress_recs = 100 * 1000 + + + # 1a. imap-login: Info: Login: user=, method=PLAIN, rip=146.168.130.9, lip=192.155.92.185, mpid=5866, TLS, session= + # 1=date + # 2=service ("imap-login" or "pop3-login") + # 3=sasl_username ("user@domain.com") + # 4=sasl_method ("PLAIN") + # 5=remote_ip ("146.168.130.9") + # 6=local_ip + # 7=service_tid ("5866") + # 8=connection_security ("TLS") + self.re_connect_success = re.compile('^' + self.date_regexp + ' (imap-login|pop3-login): Info: Login: user=<([^>]*)>, method=([^,]*), rip=([^,]+), lip=([^,]+), mpid=([^,]+), ([^,]+)') + + # 1a. imap-login: Info: Disconnected (auth failed, 1 attempts in 4 secs): user=, method=PLAIN, rip=152.67.63.172, lip=192.155.92.185, TLS: Disconnected, session= + # 1=date + # 2=service ("imap-login" or "pop3-login") + # 3=dissconnect_reason + # 4=remote_auth_attempts + # 5=sasl_username + # 6=sasl_method + # 7=remote_ip + # 8=local_ip + # 9=connection_security + self.re_connect_fail_1 = re.compile('^' + self.date_regexp + ' (imap-login|pop3-login): Info: (?:Disconnected|Aborted login) \(([^,]+), (\d+) attempts[^\)]*\): user=<([^>]*)>, method=([^,]+), rip=([^,]+), lip=([^,]+), ([^,]+)') + + # 2a. pop3-login: Info: Disconnected (no auth attempts in 2 secs): user=<>, rip=196.52.43.85, lip=192.155.92.185, TLS handshaking: SSL_accept() failed: error:14209102:SSL routines:tls_early_post_process_client_hello:unsupported protocol, session= + # 2b. imap-login: Info: Disconnected (no auth attempts in 2 secs): user=<>, rip=92.118.160.61, lip=192.155.92.185, TLS handshaking: SSL_accept() failed: error:14209102:SSL routines:tls_early_post_process_client_hello:unsupported protocol, session= + # 1=date + # 2=service ("imap-login" or "pop3-login") + # 3=disconnect_reason + # 4=sasl_username ("") + # 5=remote_ip ("146.168.130.9") + # 6=local_ip + # 7=connection_security + self.re_connect_fail_2 = re.compile('^' + self.date_regexp + ' (imap-login|pop3-login): Info: (?:Disconnected|Aborted login) \(([^\)]*)\): user=<([^>]*)>, rip=([^,]+), lip=([^,]+), ([^,]+)') + + #3a. imap-login: Info: Disconnected (client didn't finish SASL auth, waited 0 secs): user=<>, method=PLAIN, rip=107.107.63.148, lip=192.155.92.185, TLS: SSL_read() syscall failed: Connection reset by peer, session= + # 1=date + # 2=service ("imap-login" or "pop3-login") + # 3=disconnect_reason + # 4=sasl_username ("") + # 5=sasl_method + # 6=remote_ip ("146.168.130.9") + # 7=local_ip + # 8=connection_security + self.re_connect_fail_3 = re.compile('^' + self.date_regexp + ' (imap-login|pop3-login): Info: (?:Disconnected|Aborted login) \(([^\)]*)\): user=<([^>]*)>, method=([^,]+), rip=([^,]+), lip=([^,]+), ([^,]+)') + + # 4a. pop3-login: Info: Disconnected: Too many bad commands (no auth attempts in 0 secs): user=<>, rip=83.97.20.35, lip=192.155.92.185, TLS, session= + # 1=date + # 2=service ("imap-login" or "pop3-login") + # 3=disconnect_reason + # 4=sasl_username ("") + # 5=remote_ip ("146.168.130.9") + # 6=local_ip + # 7=connection_security + self.re_connect_fail_4 = re.compile('^' + self.date_regexp + ' (imap-login|pop3-login): Info: Disconnected: ([^\(]+) \(no auth attempts [^\)]+\): user=<([^>]*)>, rip=([^,]+), lip=([^,]+), ([^,]+)') + + # 5a. imap-login: Info: Disconnected: Too many bad commands (auth failed, 1 attempts in 4 secs): user=, method=PLAIN, rip=152.67.63.172, lip=192.155.92.185, TLS: Disconnected, session= + # 1=date + # 2=service ("imap-login" or "pop3-login") + # 3=disconnect_reason + # 4=remote_auth_attempts + # 5=sasl_username + # 6=sasl_method + # 7=remote_ip + # 8=local_ip + # 9=connection_security + self.re_connect_fail_5 = re.compile('^' + self.date_regexp + ' (imap-login|pop3-login): Info: (?:Disconnected|Aborted login): ([^\(]+) \(auth failed, (\d+) attempts [^\)]+\): user=<([^>]*)>, method=([^,]+), rip=([^,]+), lip=([^,]+), ([^,]+)') + + + # 1a. imap(jzw@just1w.com): Info: Logged out in=29 out=496 + # + # 1b. imap(jzw@just1w.com): Info: Connection closed (IDLE running for 0.001 + waiting input for 5.949 secs, 2 B in + 10 B out, state=wait-input) in=477 out=6171 + # 1c. imap(jzw@just1w.com): Info: Connection closed (UID STORE finished 0.225 secs ago) in=8099 out=21714 + + # 1d. imap(jzw@just1w.com): Info: Connection closed (LIST finished 115.637 secs ago) in=629 out=11130 + # + # 1e. imap(jzw@just1w.com): Info: Connection closed (APPEND finished 0.606 secs ago) in=11792 out=10697 + # + # 1f. imap(jzw@just1w.com): Info: Disconnected for inactivity in=1518 out=2962 + # + # 1g. imap(keith@just1w.com): Info: Server shutting down. in=720 out=7287 + # 1=date + # 2=service ("imap" or "pop3") + # 3=sasl_username + # 4=disconnect_reason ("Disconnected for inactivity") + # 5=in_bytes + # 6=out_bytes + self.re_disconnect = re.compile('^' + self.date_regexp + ' (imap|pop3)\(([^\)]*)\): Info: ((?:Logged out|Connection closed|Disconnected|Server shutting down).*) in=(\d+) out=(\d+)') + + + + def add_new_connection(self, imap_conn): + ''' queue an imap_connection record ''' + threshold = self.max_inprogress_recs + ( len(self.recs) * 0.05 ) + if len(self.recs) > threshold: + backoff = len(self.recs) - self.max_inprogress_recs + int( self.max_inprogress_recs * 0.10 ) + log.warning('dropping %s old imap records', backoff) + self.recs = self.recs[min(len(self.recs),backoff):] + + self.recs.append(imap_conn) + return imap_conn + + def remove_connection(self, imap_conn): + ''' remove a imap_connection record from queue ''' + self.recs.remove(imap_conn) + + def find_by(self, imap_conn_q, debug=False): + '''find records using field-matching queries + + return a list of imap_conn matching query `imap_conn_q` + ''' + + if debug: + log.debug('imap_accept_q: %s', imap_accept_q) + + # find all candidate recs with matching imap_conn_q, ordered by most + # recent last + candidates = DictQuery.find(self.recs, imap_conn_q, reverse=False) + if len(candidates)==0: + if debug: log.debug('no candidates') + return [] + + elif not candidates[0]['exact']: + # there were no exact matches. apply autosets to the best + # match requiring the fewest number of autosets (index 0) + if debug: log.debug('no exact candidates') + DictQuery.autoset(candidates[0]) + candidates[0]['exact'] = True + candidates = [ candidates[0] ] + + else: + # there is at least one exact match - remove all non-exact + # candidates + candidates = [ + candidate for candidate in candidates if candidate['exact'] + ] + + return [ candidate['item'] for candidate in candidates ] + + + def find_first(self, *args, **kwargs): + '''find the "best" result and return it - find_by() returns the list + ordered, with the first being the "best" + + ''' + r = self.find_by(*args, **kwargs) + if len(r)==0: + return None + return r[0] + + def match_connect_success(self, line): + # 1=date + # 2=service ("imap-login" or "pop3-login") + # 3=sasl_username ("user@domain.com") + # 4=sasl_method ("PLAIN") + # 5=remote_ip ("146.168.130.9") + # 6=local_ip + # 7=service_tid ("5866") + # 8=connection_security ("TLS") + m = self.re_connect_success.search(line) + if m: + imap_conn = { + "connect_time": self.parse_date(m.group(1)), # "YYYY-MM-DD HH:MM:SS" + "service": m.group(2), + "sasl_username": m.group(3), + "sasl_method": m.group(4), + "remote_host": "unknown", + "remote_ip": m.group(5), + "service_tid": m.group(7), + "connection_security": m.group(8), + "remote_auth_success": 1, + "remote_auth_attempts": 1 + } + self.add_new_connection(imap_conn) + return { 'imap_conn': imap_conn } + + def match_connect_fail(self, line): + m = self.re_connect_fail_1.search(line) + if m: + # 1a. imap-login: Info: Disconnected (auth failed, 1 attempts in 4 secs): user=, method=PLAIN, rip=152.67.63.172, lip=192.155.92.185, TLS: Disconnected, session= + # 1=date + # 2=service ("imap-login" or "pop3-login") + # 3=dissconnect_reason + # 4=remote_auth_attempts + # 5=sasl_username + # 6=sasl_method + # 7=remote_ip + # 8=local_ip + # 9=connection_security + d = self.parse_date(m.group(1)) # "YYYY-MM-DD HH:MM:SS" + imap_conn = { + "connect_time": d, + "disconnect_time": d, + "disconnect_reason": m.group(3), + "service": m.group(2), + "sasl_username": m.group(5), + "sasl_method": m.group(6), + "remote_host": "unknown", + "remote_ip": m.group(7), + "connection_security": m.group(9), + "remote_auth_success": 0, + "remote_auth_attempts": int(m.group(4)) + } + self.add_new_connection(imap_conn) + return { 'imap_conn': imap_conn } + + m = self.re_connect_fail_2.search(line) + if m: + # 2a. pop3-login: Info: Disconnected (no auth attempts in 2 secs): user=<>, rip=196.52.43.85, lip=192.155.92.185, TLS handshaking: SSL_accept() failed: error:14209102:SSL routines:tls_early_post_process_client_hello:unsupported protocol, session= + # 1=date + # 2=service ("imap-login" or "pop3-login") + # 3=disconnect_reason + # 4=sasl_username ("") + # 5=remote_ip ("146.168.130.9") + # 6=local_ip + # 7=connection_security + d = self.parse_date(m.group(1)) # "YYYY-MM-DD HH:MM:SS" + imap_conn = { + "connect_time": d, + "disconnect_time": d, + "disconnect_reason": m.group(3), + "service": m.group(2), + "sasl_username": m.group(4), + "remote_host": "unknown", + "remote_ip": m.group(5), + "connection_security": m.group(7), + "remote_auth_success": 0, + "remote_auth_attempts": 0 + } + self.add_new_connection(imap_conn) + return { 'imap_conn': imap_conn } + + m = self.re_connect_fail_3.search(line) + if m: + #3a. imap-login: Info: Disconnected (client didn't finish SASL auth, waited 0 secs): user=<>, method=PLAIN, rip=107.107.63.148, lip=192.155.92.185, TLS: SSL_read() syscall failed: Connection reset by peer, session= + # 1=date + # 2=service ("imap-login" or "pop3-login") + # 3=disconnect_reason + # 4=sasl_username ("") + # 5=sasl_method + # 6=remote_ip ("146.168.130.9") + # 7=local_ip + # 8=connection_security + d = self.parse_date(m.group(1)) # "YYYY-MM-DD HH:MM:SS" + imap_conn = { + "connect_time": d, + "disconnect_time": d, + "disconnect_reason": m.group(3), + "service": m.group(2), + "sasl_username": m.group(4), + "sasl_method": m.group(5), + "remote_host": "unknown", + "remote_ip": m.group(6), + "connection_security": m.group(8), + "remote_auth_success": 0, + "remote_auth_attempts": 0 + } + self.add_new_connection(imap_conn) + return { 'imap_conn': imap_conn } + + m = self.re_connect_fail_4.search(line) + if m: + # 4a. pop3-login: Info: Disconnected: Too many bad commands (no auth attempts in 0 secs): user=<>, rip=83.97.20.35, lip=192.155.92.185, TLS, session= + # 1=date + # 2=service ("imap-login" or "pop3-login") + # 3=disconnect_reason + # 4=sasl_username ("") + # 5=remote_ip ("146.168.130.9") + # 6=local_ip + # 7=connection_security + d = self.parse_date(m.group(1)) # "YYYY-MM-DD HH:MM:SS" + imap_conn = { + "connect_time": d, + "disconnect_time": d, + "disconnect_reason": m.group(3), + "service": m.group(2), + "sasl_username": m.group(4), + "remote_host": "unknown", + "remote_ip": m.group(5), + "connection_security": m.group(6), + "remote_auth_success": 0, + "remote_auth_attempts": 0 + } + self.add_new_connection(imap_conn) + return { 'imap_conn': imap_conn } + + m = self.re_connect_fail_5.search(line) + if m: + # 5a. imap-login: Info: Disconnected: Too many bad commands (auth failed, 1 attempts in 4 secs): user=, method=PLAIN, rip=152.67.63.172, lip=192.155.92.185, TLS: Disconnected, session= + # 1=date + # 2=service ("imap-login" or "pop3-login") + # 3=disconnect_reason + # 4=remote_auth_attempts + # 5=sasl_username + # 6=sasl_method + # 7=remote_ip + # 8=local_ip + # 9=connection_security + d = self.parse_date(m.group(1)) # "YYYY-MM-DD HH:MM:SS" + imap_conn = { + "connect_time": d, + "disconnect_time": d, + "disconnect_reason": m.group(3), + "service": m.group(2), + "sasl_username": m.group(5), + "sasl_method": m.group(6), + "remote_host": "unknown", + "remote_ip": m.group(7), + "connection_security": m.group(9), + "remote_auth_success": 0, + "remote_auth_attempts": int(m.group(4)) + } + self.add_new_connection(imap_conn) + return { 'imap_conn': imap_conn } + + + def match_disconnect(self, line): + # 1=date + # 2=service ("imap" or "pop3") + # 3=sasl_username + # 4=disconnect_reason ("Logged out") + # 5=in_bytes + # 6=out_bytes + # + # NOTE: there is no way to match up the disconnect with the + # actual connection because Dovecot does not log a service_tid + # or an ip address or anything else that could be used to + # match the two up. We'll just assign the disconnect to the + # oldest connection for the user. + m = self.re_disconnect.search(line) + if m: + v = { + "service": m.group(2), + "disconnect_time": self.parse_date(m.group(1)), + "disconnect_reason": m.group(4), + "in_bytes": int(m.group(5)), + "out_bytes": int(m.group(6)) + } + imap_conn_q = [ + { 'key':'service', 'value':m.group(2) + '-login' }, + { 'key':'sasl_username', 'value':m.group(3), + 'ignorecase': True } + ] + log.debug(imap_conn_q) + + imap_conn = self.find_first(imap_conn_q) + if imap_conn: + imap_conn.update(v) + return { 'imap_conn': imap_conn } + return True + + + def store(self, imap_conn): + if 'disposition' not in imap_conn: + if imap_conn.get('remote_auth_success') == 0 and \ + imap_conn.get('remote_auth_attempts') == 0: + imap_conn.update({ + 'disposition': 'suspected_scanner', + }) + + elif imap_conn.get('remote_auth_success') == 0 and \ + imap_conn.get('remote_auth_attempts', 0) > 0: + imap_conn.update({ + 'disposition': 'failed_login_attempt', + }) + + elif imap_conn.get('connection_security') != 'TLS' and \ + imap_conn.get('remote_ip') != '127.0.0.1': + imap_conn.update({ + 'disposition': 'insecure' + }) + + else: + imap_conn.update({ + 'disposition': 'ok', + }) + + drop = self.test_drop_disposition(imap_conn['disposition']) + + if not drop: + log.debug('store: %s', imap_conn) + try: + self.record_store.store('imap_mail', imap_conn) + except Exception as e: + log.exception(e) + + self.remove_connection(imap_conn) + + + def log_match(self, match_str, match_result, line): + if match_result is True: + log.info('%s [unmatched]: %s', match_str, line) + + elif match_result: + if match_result.get('deferred', False): + log.debug('%s [deferred]: %s', match_str, line) + + elif 'imap_conn' in match_result: + log.debug('%s: %s: %s', match_str, line, match_result['imap_conn']) + else: + log.error('no imap_conn in match_result: ', match_result) + else: + log.debug('%s: %s', match_str, line) + + + def test_end_of_rec(self, match_result): + if not match_result or match_result is True or match_result.get('deferred', False): + return False + return self.end_of_rec(match_result['imap_conn']) + + def end_of_rec(self, imap_conn): + '''a client must be disconnected for the record to be "complete" + + ''' + if 'disconnect_time' not in imap_conn: + return False + + return True + + + def handle(self, line): + '''overrides ReadLineHandler method + + This function is called by the main log reading thread in + TailFile. All additional log reading is blocked until this + function completes. + + The storage engine (`record_store`, a SqliteEventStore + instance) does not block, so this function will return before + the record is saved to disk. + + IMPORTANT: + + The data structures and methods in this class are not thread + safe. It is not okay to call any of them when the instance is + registered with TailFile. + + ''' + if not self.capture_enabled: + return + + self.update_inprogress_count() + + log.debug('imap recs in progress: %s', len(self.recs)) + + match = self.match_connect_success(line) + if match: + self.log_match('connect', match, line) + return + + match = self.match_connect_fail(line) + if match: + self.log_match('connect_fail', match, line) + if self.test_end_of_rec(match): + # we're done - not queued and disconnected ... save it + self.store(match['imap_conn']) + return + + match = self.match_disconnect(line) + if match: + self.log_match('disconnect', match, line) + if self.test_end_of_rec(match): + # we're done - not queued and disconnected ... save it + self.store(match['imap_conn']) + return + + self.log_match('IGNORED', None, line) + diff --git a/management/reporting/capture/mail/InboundMailLogHandler.py b/management/reporting/capture/mail/PostfixLogHandler.py similarity index 93% rename from management/reporting/capture/mail/InboundMailLogHandler.py rename to management/reporting/capture/mail/PostfixLogHandler.py index 97cf6092..72c382cf 100644 --- a/management/reporting/capture/mail/InboundMailLogHandler.py +++ b/management/reporting/capture/mail/PostfixLogHandler.py @@ -10,6 +10,7 @@ from db.EventStore import EventStore from util.DictQuery import DictQuery from util.safe import (safe_int, safe_append, safe_del) from .PostfixLogParser import PostfixLogParser +from .CommonHandler import CommonHandler log = logging.getLogger(__name__) @@ -17,7 +18,7 @@ log = logging.getLogger(__name__) STATE_CACHE_OWNER_ID = 1 -class InboundMailLogHandler(ReadLineHandler): +class PostfixLogHandler(CommonHandler): ''' ''' def __init__(self, record_store, @@ -26,9 +27,20 @@ class InboundMailLogHandler(ReadLineHandler): capture_enabled = True, drop_disposition = None ): - ''' EventStore instance for persisting "records" ''' - self.record_store = record_store - self.set_capture_enabled(capture_enabled) + super(PostfixLogHandler, self).__init__( + STATE_CACHE_OWNER_ID, + record_store, + date_regexp, + date_parser_fn, + capture_enabled, + drop_disposition + ) + + + # maximum size of the in-progress record queue + self.current_inprogress_recs = len(self.recs) + self.max_inprogress_recs = 100 + # A "record" is composed by parsing all the syslog output from # the activity generated by the MTA (postfix) from a single @@ -66,31 +78,6 @@ class InboundMailLogHandler(ReadLineHandler): # thread-safe. # - # our in-progress record queue is a simple list - self.recs = self.get_cached_state(clear=True) - - # maximum size of the in-progress record queue - self.current_inprogress_recs = len(self.recs) - self.max_inprogress_recs = 100 - - # records that have these dispositions will be dropped (not - # recorded in the record store - self.drop_disposition_lock = threading.Lock() - self.drop_disposition = { - 'failed_login_attempt': False, - 'suspected_scanner': False, - 'reject': False - } - self.update_drop_disposition(drop_disposition) - - # regular expression that matches a syslog date (always anchored) - self.date_regexp = date_regexp - if date_regexp.startswith('^'): - self.date_regexp = date_regexp[1:] - - # function that parses the syslog date - self.date_parser_fn = date_parser_fn - # 1. 1a. postfix/smtpd[13698]: connect from host.tld[1.2.3.4] # 1=date @@ -233,70 +220,9 @@ class InboundMailLogHandler(ReadLineHandler): - def set_capture_enabled(self, capture_enabled): - ''' thread-safe ''' - self.capture_enabled = capture_enabled - - def update_drop_disposition(self, drop_disposition): - ''' thread-safe ''' - with self.drop_disposition_lock: - self.drop_disposition.update(drop_disposition) - def get_inprogress_count(self): ''' thread-safe ''' return self.current_inprogress_recs - - - def datetime_as_str(self, d): - # iso-8601 time friendly to sqlite3 - timestamp = d.isoformat(sep=' ', timespec='seconds') - # strip the utc offset from the iso format (ie. remove "+00:00") - idx = timestamp.find('+00:00') - if idx>0: - timestamp = timestamp[0:idx] - return timestamp - - def parse_date(self, str): - # we're expecting UTC times from date_parser() - d = self.date_parser_fn(str) - return self.datetime_as_str(d) - - def get_cached_state(self, clear=True): - conn = None - try: - # obtain the cached records from the record store - conn = self.record_store.connect() - recs = self.record_store.read_rec(conn, 'state', { - "owner_id": STATE_CACHE_OWNER_ID, - "clear": clear - }) - log.info('read %s incomplete records from cache', len(recs)) - - # eliminate stale records - "stale" should be longer than - # the "give-up" time for postfix (4-5 days) - stale = datetime.timedelta(days=7) - cutoff = self.datetime_as_str( - datetime.datetime.now(datetime.timezone.utc) - stale - ) - newlist = [ rec for rec in recs if rec['connect_time'] >= cutoff ] - if len(newlist) < len(recs): - log.warning('dropping %s stale incomplete records', - len(recs) - len(newlist)) - return newlist - finally: - if conn: self.record_store.close(conn) - - def save_state(self): - log.info('saving state to cache: %s records', len(self.recs)) - conn = None - try: - conn = self.record_store.connect() - self.record_store.write_rec(conn, 'state', { - 'owner_id': STATE_CACHE_OWNER_ID, - 'state': self.recs - }) - finally: - if conn: self.record_store.close(conn) def add_new_connection(self, mta_conn): @@ -304,7 +230,7 @@ class InboundMailLogHandler(ReadLineHandler): threshold = self.max_inprogress_recs + ( len(self.recs) * 0.05 ) if len(self.recs) > threshold: backoff = len(self.recs) - self.max_inprogress_recs + int( self.max_inprogress_recs * 0.10 ) - log.warning('dropping %s old records', backoff) + log.warning('dropping %s old mta records', backoff) self.recs = self.recs[min(len(self.recs),backoff):] self.recs.append(mta_conn) @@ -1314,14 +1240,12 @@ class InboundMailLogHandler(ReadLineHandler): 'disposition': 'ok', }) - drop = False - with self.drop_disposition_lock: - drop = self.drop_disposition.get(mta_conn['disposition'], False) - + drop = self.test_drop_disposition(mta_conn['disposition']) + if not drop: log.debug('store: %s', mta_conn) try: - self.record_store.store('inbound_mail', mta_conn) + self.record_store.store('mta_mail', mta_conn) except Exception as e: log.exception(e) @@ -1389,9 +1313,9 @@ class InboundMailLogHandler(ReadLineHandler): if not self.capture_enabled: return - self.current_inprogress_recs = len(self.recs) + self.update_inprogress_count() - log.debug('recs in progress: %s, dds_by_tid=%s', + log.debug('mta recs in progress: %s, dds_by_tid=%s', len(self.recs), len(self.dds_by_tid) ) @@ -1474,11 +1398,3 @@ class InboundMailLogHandler(ReadLineHandler): self.log_match('IGNORED', None, line) - def end_of_callbacks(self, thread): - '''overrides ReadLineHandler method - - save incomplete records so we can pick up where we left off - - ''' - self.save_state() -