import logging
import re
import datetime
import traceback
import ipaddress
import threading
from logs.ReadLineHandler import ReadLineHandler
import logs.DateParser
from db.EventStore import EventStore
from util.DictQuery import DictQuery
from util.safe import (safe_int, safe_append, safe_del)
from .PostfixLogParser import PostfixLogParser

log = logging.getLogger(__name__)


STATE_CACHE_OWNER_ID = 1


class InboundMailLogHandler(ReadLineHandler):
    '''
    '''
    def __init__(self, record_store,
                 date_regexp = logs.DateParser.rsyslog_traditional_regexp,
                 date_parser_fn = logs.DateParser.rsyslog_traditional,
                 capture_enabled = True,
                 drop_disposition = None
    ):
        '''Parse MTA (postfix) syslog output into "records" and persist them.

        record_store: EventStore instance for persisting "records"
        date_regexp: regexp matching the syslog date prefix (a leading
            '^' anchor, if present, is stripped and re-added as needed)
        date_parser_fn: function parsing a matched syslog date; expected
            to return a UTC datetime
        capture_enabled: initial capture on/off state
        drop_disposition: optional {disposition_name: bool} overrides;
            records whose disposition maps to True are dropped
        '''
        self.record_store = record_store
        self.set_capture_enabled(capture_enabled)

        # A "record" is composed by parsing all the syslog output from
        # the activity generated by the MTA (postfix) from a single
        # remote connection. Once a full history of the connection,
        # including delivery of any messages is complete, the record
        # is written to the record_store.
        #
        # `recs` is an array holding incomplete, in-progress
        # "records". This array has the following format:
        #
        # (For convenience, it's easier to refer to the table column
        # names found in SqliteEventStore for the dict member names that
        # are used here since they're all visible in one place.)
        #
        # [{
        #   ... fields of the mta_connection table ...
        #   mta_accept: [
        #      {
        #         ... fields of the mta_accept table ...
        #         mta_delivery: [
        #             {
        #                ... fields of the mta_delivery table ...
        #             },
        #             ...
        #         ],
        #      },
        #      ...
        #   ],
        # }]
        #
        # IMPORTANT:
        #
        # No methods in this class are safe to call by any thread
        # other than the caller of handle(), unless marked as
        # thread-safe.
        #

        # our in-progress record queue is a simple list, seeded from the
        # state cached by a previous run (see save_state())
        self.recs = self.get_cached_state(clear=True)

        # maximum size of the in-progress record queue
        self.current_inprogress_recs = len(self.recs)
        self.max_inprogress_recs = 100

        # records that have these dispositions will be dropped (not
        # recorded in the record store)
        self.drop_disposition_lock = threading.Lock()
        self.drop_disposition = {
            'failed_login_attempt': False,
            'suspected_scanner': False,
            'reject': False
        }
        # guard against the None default - update_drop_disposition()
        # would otherwise raise TypeError on dict.update(None)
        self.update_drop_disposition(drop_disposition or {})

        # regular expression that matches a syslog date (always anchored)
        self.date_regexp = date_regexp
        if date_regexp.startswith('^'):
            self.date_regexp = date_regexp[1:]

        # function that parses the syslog date
        self.date_parser_fn = date_parser_fn

        # NOTE: all patterns below use raw strings so that regex escapes
        # like \d and \[ are not (invalid) python string escapes

        # 1. 1a. postfix/smtpd[13698]: connect from host.tld[1.2.3.4]
        #    1=date
        #    2=service ("submission/smtpd" or "smtpd")
        #    3=service_tid
        #    4=remote_host
        #    5=remote_ip
        self.re_connect_from = re.compile('^' + self.date_regexp + r' [^ ]+ postfix/(submission/smtpd|smtpd)\[(\d+)\]: connect from ([^\[]+)\[([^\]]+)\]')

        # 1b. Dec  6 07:01:39 mail postfix/pickup[7853]: A684B1F787: uid=0 from=<root>
        #    1=date
        #    2=service ("pickup")
        #    3=service_tid
        #    4=postfix_msg_id
        self.re_local_pickup = re.compile('^' + self.date_regexp + r' [^ ]+ postfix/(pickup)\[(\d+)\]: ([A-F0-9]+): ')

        # 2. policyd-spf[13703]: prepend Received-SPF: Pass (mailfrom) identity=mailfrom; client-ip=1.2.3.4 helo=host.tld; envelope-from=alice@post.com; receiver=<UNKNOWN>
        #    1=spf_tid
        #    2=spf_result
        #    3=spf_reason "(mailfrom)"
        self.re_spf_1 = re.compile(r'policyd-spf\[(\d+)\]: prepend Received-SPF: ([^ ]+) (\([^\)]+\)){0,1}')

        # 2a. policyd-spf[26231]: 550 5.7.23 Message rejected due to: Receiver policy for SPF Softfail. Please see http://www.openspf.net/Why?s=mfrom;id=test@google.com;ip=1.2.3.4;r=<UNKNOWN>
        #   1=spf_tid
        #   2=spf_reason
        self.re_spf_2 = re.compile(r'policyd-spf\[(\d+)\]: (\d\d\d \d+\.\d+\.\d+ Message rejected [^;]+)')

        # 3. Dec  9 14:46:57 mail postgrey[879]: action=greylist, reason=new, client_name=host.tld, client_address=1.2.3.4/32, sender=alice@post.com, recipient=mia@myhost.com
        # 3a. Dec  6 18:31:28 mail postgrey[879]: 0E98D1F787: action=pass, reason=triplet found, client_name=host.tld, client_address=1.2.3.4/32, sender=alice@post.com, recipient=mia@myhost.com
        #   1=postgrey_tid
        #   2=postfix_msg_id (re-1 only)
        self.re_postgrey_1 = re.compile(r'postgrey\[(\d+)\]: ([A-F0-9]+):')
        self.re_postgrey_2 = re.compile(r'postgrey\[(\d+)\]:')

        # 4b. postfix/submission/smtpd[THREAD-ID]: NOQUEUE: reject: RCPT from unknown[1.2.3.4]: 550 5.7.23 <alice@somedomain.com>: Recipient address rejected: Message rejected due to: Receiver policy for SPF Softfail. Please see http://www.openspf.net/Why?s=mfrom;id=test@google.com;ip=10.0.2.15;r=<UNKNOWN>; from=<test@google.com> to=<alice@somedomain.com> proto=ESMTP helo=<qa2.abc.com>
        # 4c. postfix/smtpd[THREAD-ID]: "NOQUEUE: reject: ....: Recipient address rejected: Greylisted, seehttp://postgrey.../; ..."
        #   1=service ("submission/smtpd" or "smtpd")
        #   2=service_tid
        #   3=accept_status ("reject")
        self.re_postfix_noqueue = re.compile(r'postfix/(submission/smtpd|smtpd)\[(\d+)\]: NOQUEUE: ([^:]+): ')

        # 4. postfix/smtpd[THREAD-ID]: "POSTFIX-MSG-ID" (eg: "DD95A1F796"): client=DNS[IP]
        # 4a. postfix/submission/smtpd[THREAD-ID]: POSTFIX-MSG-ID: client=DNS[IP], sasl_method=LOGIN, sasl_username=mia@myhost.com
        #   1=service ("submission/smtpd" or "smtpd")
        #   2=service_tid
        #   3=postfix_msg_id
        self.re_postfix_msg_id = re.compile(r'postfix/(submission/smtpd|smtpd)\[(\d+)\]: ([A-F0-9]+):')

        # 5. Dec 10 06:48:48 mail postfix/cleanup[7435]: 031AF20076: message-id=<20201210114848.031AF20076@myhost.com>
        #   1=postfix_msg_id
        #   2=message_id
        self.re_postfix_message_id = re.compile(r'postfix/cleanup\[\d+\]: ([A-F0-9]+): message-id=(<[^>]*>)')

        # 6. opendkim: POSTFIX-MSG-ID: <result>
        # Dec  6 08:21:33 mail opendkim[6267]: DD95A1F796: s=pf2014 d=github.com SSL
        # SSL:
        #  source: https://sourceforge.net/p/opendkim/git/ci/master/tree/opendkim/opendkim.c
        #  function: dkimf_log_ssl_errors(), output can be:
        #     s=pf2014 d=github.com SSL <error-msg ...>
        #     SSL <error-msg ...>
        #  if <error-msg> is empty, no error

        #   1=postfix_msg_id
        #   2=verification detail
        #   3=error-msg
        self.re_opendkim_ssl = re.compile(r'opendkim\[\d+\]: ([A-F0-9]+):(.*) SSL ?(.*)')
        #   1=postfix_msg_id
        #   2=error-msg
        self.re_opendkim_error = re.compile(r'opendkim\[\d+\]: ([A-F0-9]+): (.*)')

        # 7. opendmarc: POSTFIX-MSG-ID: result: [pass/fail]
        # Dec  6 08:21:33 mail opendmarc[729]: DD95A1F796 ignoring Authentication-Results at 18 from mx.google.com
        # Dec  6 08:21:33 mail opendmarc[729]: DD95A1F796: github.com pass
        # Dec  6 13:46:30 mail opendmarc[729]: 0EA8F1FB12: domain.edu fail
        #    1=postfix_msg_id
        #    2=domain
        #    3="pass","none","fail"
        self.re_opendmarc_result = re.compile(r'opendmarc\[\d+\]: ([A-F0-9]+): ([^ ]+) ([^\s]+)')

        # 13. postfix/qmgr: POSTFIX-MSG-ID: "removed"
        # Dec 11 08:30:15 mail postfix/qmgr[9021]: C01F71F787: removed
        #    1=date
        #    2=postfix_msg_id
        self.re_queue_removed = re.compile('^' + self.date_regexp + r' [^ ]+ postfix/qmgr\[\d+\]: ([A-F0-9]+): removed')

        # 8. postfix/qmgr: POSTFIX-MSG-ID: from=user@tld, size=N, nrcpt=1 (queue active)
        #    1=date
        #    2=postfix_msg_id
        #    3=all comma-separated key-value pairs (must be split)
        self.re_queue_added = re.compile('^' + self.date_regexp + r' [^ ]+ postfix/qmgr\[\d+\]: ([A-F0-9]+): (?!removed)')

        # 11. spampd: "clean message <MESSAGE-ID>|(unknown) (SCORE/MAX-SCORE) from <FROM> for <user@tld> in 1.51s, N bytes"
        # 11(a) spampd: "identified spam <MESSAGE-ID>|(unknown) (5.12/5.00) from <FROM> for <user@tld> in 1.51s, N bytes"
        #    1=spam_tid
        #    2="clean message" | "identified spam" (other?)
        #    3=message_id ("(unknown)" if message id is "<>")
        #    4=spam_score
        #    5=envelope_from
        #    6=rcpt_to
        self.re_spampd = re.compile(r'spampd\[(\d+)\]: (clean message|identified spam) (<[^>]*>|\(unknown\)) \((-?\d+\.\d+)/\d+\.\d+\) from <([^>]+)> for <([^>]+)>')

        # 10. postfix/smtpd[THREAD-ID]: disconnect-from: [starttls=1,auth=0]
        # 10a. postfix/submission/smtpd[THREAD-ID]: disconnect-from: [starttls=1,auth=1]
        #    1=date
        #    2=service ("submission/smtpd" or "smtpd")
        #    3=service_tid
        #    4=remote_host
        #    5=remote_ip
        self.re_disconnect = re.compile('^' + self.date_regexp + r' [^ ]+ postfix/(submission/smtpd|smtpd)\[(\d+)\]: disconnect from ([^\[]+)\[([^\]]+)\]')

        # postfix/smtp[18333]: Trusted TLS connection established to mx01.mail.icloud.com[17.57.154.23]:25: TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)
        # postfix/smtp[566]: Untrusted TLS connection established to xyz.tld[1.2.3.4]:25: TLSv1.2 with cipher AES128-GCM-SHA256 (128/128 bits)
        # postfix/smtp[15125]: Verified TLS connection established to mx1.comcast.net[96.114.157.80]:25: TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)
        #    1=service ("smtp")
        #    2=service_tid
        #    3=delivery_connection ("trusted", "untrusted", "verified")
        #    4=tls details
        self.re_pre_delivery = re.compile(r'postfix/(smtp)\[(\d+)\]: ([^ ]+) (TLS connection established.*)')

        # 12. postfix/lmtp: POSTFIX-MSG-ID: to=<user@tld>, relay=127.0.0.1[127.0.0.1]:10025, delay=4.7, delays=1/0.01/0.01/3.7, dsn=2.0.0, status=sent (250 2.0.0 <user@domain.tld> YB5nM1eS01+lSgAAlWWVsw Saved)
        # 12a. postfix/lmtp: POSTFIX_MSG-ID: to=user@tld, status=bounced (host...said...550 5.1.1 <user@tld> User doesn't exist ....)
        # 12b. postfix/smtp[32052]: A493B1FAF1: to=<guy80@where.net>, relay=mx.where.net[64.65.66.104]:25, delay=1.2, delays=0.65/0.06/0.4/0.09, dsn=2.0.0, status=sent (250 2.0.0 OK 7E/38-26906-CDC5DCF5): None

        #    1=system ("lmtp" or "smtp")
        #    2=system_tid
        #    3=postfix_msg_id
        self.re_delivery = re.compile(r'postfix/(lmtp|smtp)\[(\d+)\]: ([A-F0-9]+): ')

        # 13. postfix/bounce: POSTFIX-MSG-ID: "sender non-delivery notification"

        # key=postfix_msg_id, value=reference to a record in `recs`
        self.index_postfix_msg_id = {}

        # deferred delivery settings indexed by delivery service tid
        # key=service_tid value={ settings:{} }
        self.dds_by_tid = {}



    def set_capture_enabled(self, capture_enabled):
        '''Enable or disable capture of new records.

        thread-safe (a single attribute assignment).
        '''
        self.capture_enabled = capture_enabled

    def update_drop_disposition(self, drop_disposition):
        '''Merge `drop_disposition` into the current drop settings.

        thread-safe. `drop_disposition` maps a disposition name (eg.
        'reject') to a bool; True means records with that disposition
        are dropped instead of being stored. A None or empty argument
        is a no-op - the constructor's default is None, which
        previously raised TypeError via dict.update(None).
        '''
        if not drop_disposition:
            return
        with self.drop_disposition_lock:
            self.drop_disposition.update(drop_disposition)

    def get_inprogress_count(self):
        '''Return the number of in-progress (incomplete) records.

        thread-safe. NOTE(review): `current_inprogress_recs` is
        initialized from the cached state in __init__; confirm it is
        kept in sync with `self.recs` elsewhere in the class.
        '''
        return self.current_inprogress_recs


    def datetime_as_str(self, d):
        '''Format datetime `d` as a sqlite3-friendly iso-8601 string
        ("YYYY-MM-DD HH:MM:SS"), stripping any "+00:00" utc offset.'''
        stamp = d.isoformat(sep=' ', timespec='seconds')
        # cut the string at the utc offset marker, if one is present
        head, found, _ = stamp.partition('+00:00')
        return head if found else stamp
        
    def parse_date(self, date_str):
        '''Parse syslog date `date_str` with the configured parser and
        return it as a sqlite3-friendly "YYYY-MM-DD HH:MM:SS" string.

        The configured `date_parser_fn` is expected to return a UTC
        datetime. (Parameter renamed from `str`, which shadowed the
        builtin; all in-class calls are positional.)
        '''
        d = self.date_parser_fn(date_str)
        return self.datetime_as_str(d)

    def get_cached_state(self, clear=True):
        '''Load in-progress records previously persisted by save_state(),
        discarding entries older than 7 days. When `clear` is True the
        cache entry is removed from the store after reading.'''
        conn = None
        try:
            conn = self.record_store.connect()
            cached = self.record_store.read_rec(conn, 'state', {
                "owner_id": STATE_CACHE_OWNER_ID,
                "clear": clear
            })
            log.info('read %s incomplete records from cache', len(cached))

            # drop stale entries - "stale" should be longer than the
            # postfix "give-up" window (4-5 days), so 7 days is safe
            horizon = self.datetime_as_str(
                datetime.datetime.now(datetime.timezone.utc) -
                datetime.timedelta(days=7)
            )
            fresh = [rec for rec in cached if rec['connect_time'] >= horizon]
            stale_count = len(cached) - len(fresh)
            if stale_count > 0:
                log.warning('dropping %s stale incomplete records',
                            stale_count)
            return fresh
        finally:
            if conn: self.record_store.close(conn)

    def save_state(self):
        '''Persist the in-progress record queue so it survives a
        restart (reloaded by get_cached_state()).'''
        log.info('saving state to cache: %s records', len(self.recs))
        db = None
        try:
            db = self.record_store.connect()
            self.record_store.write_rec(db, 'state', {
                'owner_id': STATE_CACHE_OWNER_ID,
                'state': self.recs
            })
        finally:
            if db:
                self.record_store.close(db)
        

    def add_new_connection(self, mta_conn):
        '''Queue a new in-progress mta_connection record and return it.

        If the queue has grown about 5% past `max_inprogress_recs`,
        the oldest entries are discarded first to bring it back down
        to roughly 10% under the limit.
        '''
        qlen = len(self.recs)
        overflow_mark = self.max_inprogress_recs + (qlen * 0.05)
        if qlen > overflow_mark:
            drop_count = qlen - self.max_inprogress_recs + int(self.max_inprogress_recs * 0.10)
            log.warning('dropping %s old records', drop_count)
            # oldest records are at the front of the list
            self.recs = self.recs[min(qlen, drop_count):]

        self.recs.append(mta_conn)
        return mta_conn

    def remove_connection(self, mta_conn):
        '''Remove an mta_connection record from the queue along with
        every index entry that references it.'''
        for accept_rec in mta_conn.get('mta_accept', []):
            # drop the postfix-message-id index entry for this accept
            safe_del(self.index_postfix_msg_id,
                     accept_rec.get('postfix_msg_id'))

            # drop deferred pre-delivery data collected for this accept
            log.debug(accept_rec.get('_delete_dds_tids'))
            for tid in accept_rec.get('_delete_dds_tids', {}):
                safe_del(self.dds_by_tid, tid)

        self.recs.remove(mta_conn)

    def failure_category(self, reason, default_value):
        '''Map a failure `reason` string to a coarse category name.

        Returns `default_value` when `reason` is empty or matches
        none of the known substrings. The first matching needle wins,
        so the order of the table matters.
        '''
        if not reason:
            return default_value
        categories = (
            ('Greylisted', 'greylisted'),
            ('SPF', 'spf'),
            ('Sender address', 'sender_rejected'),
            ('Relay', 'relay'),
            ('spamhaus', 'spamhaus'),
            ('Service unavailable', 'service_unavailable'),
        )
        for needle, category in categories:
            if needle in reason:
                return category
        log.debug('no failure_category for: "%s"', reason)
        return default_value
    
        
    def append_mta_accept(self, mta_conn, vals=None):
        '''Attach a new mta_accept record to `mta_conn` and return it.

        When `vals` is given and non-empty it becomes the record
        itself (no copy is made).
        '''
        accept_rec = vals if vals else {}
        safe_append(mta_conn, 'mta_accept', accept_rec)
        return accept_rec
    

    def find_by(self, mta_conn_q, mta_accept_q, auto_add=False, debug=False):
        '''find records using field-matching queries

        If mta_accept_q is None: return a list of mta_conn matching query
        `mta_conn_q`

        If `mta_accept_q` is given: return a list of
        mta_conn,mta_accept pairs matching queries `mta_conn_q` and
        `mta_accept_q` respectively.

        If there is no mta_accept record matching `mta_accept_q` and
        `mta_accept_q['autoset']` is true and there is an mta_accept record
        having no key as specified in mta_accept_q['key'], then
        automatically add one and return that mta_accept record.

        If `auto_add` is true, and there is no matching mta_accept
        record matching `mta_accept_q`, and no acceptable record
        exists in the case where `autoset` is true, then add a new
        mta_accept record with the key/value pair of `mta_accept_q` to
        the most recent mta_conn.

        '''
                    
        if debug:
            log.debug('mta_accept_q: %s', mta_accept_q)
            
        # find all candidate recs with matching mta_conn_q, ordered by most
        # recent last
        candidates = DictQuery.find(self.recs, mta_conn_q, reverse=False)
        if len(candidates)==0:
            if debug: log.debug('no candidates')
            return []
        
        elif not candidates[0]['exact']:
            # there were no exact matches. apply autosets to the best
            # match requiring the fewest number of autosets (index 0)
            if debug: log.debug('no exact candidates')
            DictQuery.autoset(candidates[0])
            candidates[0]['exact'] = True
            candidates = [ candidates[0] ]
            
        else:
            # there is at least one exact match - remove all non-exact
            # candidates
            candidates = [
                candidate for candidate in candidates if candidate['exact']
            ]
            
        # no accept-level query: return connection-only pairs
        if mta_accept_q is None:
            return [ (candidate['item'], None) for candidate in candidates ]
        elif type(mta_accept_q) is not list:
            mta_accept_q = [ mta_accept_q ]
            
        # count the query items that must match outright (neither
        # autoset-able nor optional) - used below to decide whether a
        # candidate with no mta_accept list at all is preferable
        count_of_nonoptional_accept_q_items = 0
        for q in mta_accept_q:
            if not q.get('autoset') and not q.get('optional'):
                count_of_nonoptional_accept_q_items += 1
        
        if debug:
            log.debug('candidates: qty=%s', len(candidates))
            
        # for each candidate, issue the mta_accept_q query and assign
        # a rank to each candidate. keep track of the best `mta_accept`
        # record match
        #
        # rank format is "AAAAA.IIIIIIII" (autoset-count, then candidate
        # index, both zero-padded) so a plain lexicographic sort prefers
        # fewer autosets first, then earlier candidates. 99998/99999 are
        # sentinel ranks for "no mta_accept list" / "no match at all".
        idx = 0
        while idx<len(candidates):
            candidate = candidates[idx]
            accept_candidates=DictQuery.find(candidate['item'].get('mta_accept'), mta_accept_q, reverse=True)
            if len(accept_candidates)>0:
                if accept_candidates[0]['exact']:
                    # exact match
                    if debug:
                        log.debug('exact match accept=%s',accept_candidates[0])
                    candidate['best_accept'] = accept_candidates[0]
                    candidate['best_rank'] = '00000.{0:08d}'.format(idx)

                else:
                    # inexact match - rank by number of autosets needed
                    candidate['best_accept'] = accept_candidates[0]
                    candidate['best_rank'] = '{0:05d}.{1:08d}'.format(
                        len(accept_candidates[0]['autoset_list']),
                        idx
                    )
            elif 'mta_accept' not in candidate['item'] and \
                 count_of_nonoptional_accept_q_items>0:
                # for auto-add: when no candidate has any mta_accept
                # matches, prefer a candidate that failed to meet the
                # query requirements because there was nothing to
                # query, versus adding another mta_accept record to an
                # existing list that didn't meet the query
                # requirements
                candidate['best_rank'] = '99998.{0:08d}'.format(idx)
            else:
                candidate['best_rank'] = '99999.{0:08d}'.format(idx)

            if debug:
                log.debug('candidate %s result: %s', idx, candidate)

            idx+=1

        # sort the candidates by least # autoset's required
        candidates.sort(key=lambda x: x['best_rank'])

        if 'best_accept' in candidates[0]:
            # at least one match was successful. if the best candidate
            # wasn't exact, apply autosets
            if not candidates[0]['best_accept']['exact']:
                DictQuery.autoset(candidates[0]['best_accept'])
                if debug:
                    log.debug('best match (auto-set) accept=%s',candidates[0]['best_accept']['item'])
                return [ (candidates[0]['item'], candidates[0]['best_accept']['item']) ]
            # otherwise return all exact matches
            else:
                rtn = []
                for candidate in candidates:
                    # candidates are sorted, so the exact matches (rank
                    # prefix "00000") form a contiguous prefix - stop at
                    # the first candidate that is missing or inexact
                    if not 'best_accept' in candidate:
                        break
                    best_accept = candidate['best_accept']
                    if not best_accept['exact']:
                        break
                    rtn.append( (candidate['item'], best_accept['item']) )
                if debug:
                    log.debug('best matches (exact)=%s', rtn)
                return rtn

        # if autoset is not possible, and there are no exact matches,
        # add a new accept record to the highest ranked candidate
        if auto_add:
            if debug: log.debug('auto-add new accept')
            mta_conn = candidates[0]['item']
            v = {}
            for q in mta_accept_q:
                v[q['key']] = q['value']
            mta_accept = self.append_mta_accept(mta_conn, v)
            return [ (mta_conn, mta_accept) ]

        if debug: log.debug("no matches")
        return []


    def find_first(self, *args, **kwargs):
        '''Return only the best match from find_by() as an
        (mta_conn, mta_accept) pair - find_by() returns its results
        ordered best-first. Returns (None, None) when nothing matched.
        '''
        matches = self.find_by(*args, **kwargs)
        if not matches:
            return (None, None)
        return matches[0]
    
    def index_by_postfix_msg_id(self, mta_conn, mta_accept):
        '''Register `mta_accept` (and its connection) in the
        constant-time postfix-message-id index. The id must not
        already be present.'''
        msg_id = mta_accept['postfix_msg_id']
        assert msg_id not in self.index_postfix_msg_id
        self.index_postfix_msg_id[msg_id] = {
            'mta_conn': mta_conn,
            'mta_accept': mta_accept
        }

    def find_by_postfix_msg_id(self, postfix_msg_id):
        '''Constant-time lookup of the (mta_conn, mta_accept) pair for
        a postfix message id.

        postfix message id's are unique and appear in many log lines,
        so a dedicated index is maintained for them. Returns
        (None, None) when the id is not indexed.
        '''
        entry = self.index_postfix_msg_id.get(postfix_msg_id)
        if entry is None:
            log.warning('postfix_msg_id "%s" not cached' % postfix_msg_id)
            return None, None
        return entry['mta_conn'], entry['mta_accept']

    def defer_delivery_settings_by_tid(self, service_tid, settings):
        '''Hold delivery settings keyed by the delivering process tid.

        Some outbound smtp log entries (eg. TLS connection details)
        arrive before the recipients they apply to are known. The
        settings are parked here until find_delivery() can assign them
        to the recipients handled by that smtp process.
        '''
        held = self.dds_by_tid.get(service_tid)
        if held is None:
            # first sighting of this tid: adopt the caller's dict as-is
            self.dds_by_tid[service_tid] = {
                "settings": settings
            }
        else:
            held['settings'].update(settings)
            
    def defer_delivery_settings_by_rcpt(self, mta_accept, rcpt_to, subsystem, settings):
        '''Hold recipient-oriented settings until the delivery record
        they belong to can be identified.

        Some recipient-oriented log entries, in particular from
        SpamAssassin and Postgrey, don't carry enough information to
        unambiguously select a connection, and can match multiple
        active connections. The matchup is deferred until
        find_delivery() resolves the ambiguity.
        '''
        dds = mta_accept.setdefault('_dds', {})
        pending = dds.setdefault(rcpt_to, { 'subsystems': [], 'settings': {} })
        pending['settings'].update(settings)
        pending['subsystems'].append(subsystem)
        
    def find_delivery(self, mta_accept, rcpt_to, service_tid=None, auto_add=False):
        '''Find (or, with auto_add, create) the mta_delivery record for
        `rcpt_to` within `mta_accept`. Recipient comparison is
        case-insensitive. When a record is created, any settings
        previously deferred for this recipient or for `service_tid`
        (see defer_delivery_settings_by_*) are merged into it.
        Returns the delivery record, or None when not found and
        auto_add is False.
        '''
        if 'mta_delivery' not in mta_accept:
            if auto_add:
                mta_accept['mta_delivery'] = []
            else:
                return None

        rcpt_to = rcpt_to.lower()
        for delivery in mta_accept['mta_delivery']:
            if 'rcpt_to' in delivery and rcpt_to == delivery['rcpt_to'].lower():
                return delivery

        if auto_add:
            delivery = {
                'rcpt_to': rcpt_to
            }
            # merge settings deferred for this recipient
            # NOTE(review): rcpt_to was lowercased above but _dds keys
            # keep the caller's original case - deferred settings for a
            # mixed-case recipient may be missed; verify callers
            if '_dds' in mta_accept and rcpt_to in mta_accept['_dds']:
                dds = mta_accept['_dds'][rcpt_to]
                for subsystem in dds['subsystems']:
                    self.add_subsystem(mta_accept, subsystem)
                delivery.update(dds['settings'])
                del mta_accept['_dds'][rcpt_to]

            # merge settings deferred for the delivering process tid
            if service_tid is not None and service_tid in self.dds_by_tid:
                dds = self.dds_by_tid[service_tid]
                delivery.update(dds['settings'])
                # save the tid for cleanup during remove_connection()
                if '_delete_dds_tids' not in mta_accept:
                    mta_accept['_delete_dds_tids'] = { }
                mta_accept['_delete_dds_tids'][service_tid] = True
                
            mta_accept['mta_delivery'].append(delivery)
            return delivery


    def add_subsystem(self, mta_accept, subsystem):
        '''Append `subsystem` to the accept record's comma-separated
        "subsystems" string. NOTE: duplicates are not filtered out.'''
        if 'subsystems' in mta_accept:
            mta_accept['subsystems'] += ',' + subsystem
        else:
            mta_accept['subsystems'] = subsystem
            

    def match_connect(self, line):
        '''Handle "postfix/smtpd: connect from host.tld[1.2.3.4]" lines
        by queueing a new mta_connection record. Returns a dict holding
        the new record, or None when the line does not match.'''
        match = self.re_connect_from.search(line)
        if not match:
            return None
        mta_conn = {
            # "YYYY-MM-DD HH:MM:SS"
            "connect_time": self.parse_date(match.group(1)),
            "service": "smtpd" if match.group(2) == "smtpd" else "submission",
            "service_tid": match.group(3),
            "remote_host": match.group(4),
            "remote_ip": match.group(5)
        }
        self.add_new_connection(mta_conn)
        return { 'mta_conn': mta_conn }

    def match_local_pickup(self, line):
        '''Handle "postfix/pickup: MSGID: uid=0 from=<root>" lines,
        which represent locally submitted mail. Creates a synthetic
        localhost connection plus its accept record (indexed by
        postfix message id). Returns a dict holding both records, or
        None when the line does not match.

        regexp groups: 1=date 2=service("pickup") 3=service_tid
        4=postfix_msg_id
        '''
        match = self.re_local_pickup.search(line)
        if not match:
            return None
        mta_conn = {
            "connect_time": self.parse_date(match.group(1)),
            # local pickup has no real session - connect==disconnect
            "disconnect_time": self.parse_date(match.group(1)),
            "service": match.group(2),
            "service_tid": match.group(3),
            "remote_host": "localhost",
            "remote_ip": "127.0.0.1"
        }
        mta_accept = {
            "postfix_msg_id": match.group(4),
        }
        self.append_mta_accept(mta_conn, mta_accept)
        self.add_new_connection(mta_conn)
        self.index_by_postfix_msg_id(mta_conn, mta_accept)
        return { 'mta_conn': mta_conn, 'mta_accept': mta_accept }
        

    def match_policyd_spf(self, line):
        '''Handle policyd-spf log lines and attach the SPF result to a
        matching accept record.

        Two formats are recognized: the "prepend Received-SPF" header
        line (re_spf_1) and the "NNN N.N.N Message rejected" line
        (re_spf_2). policyd-spf output carries no postfix message id,
        so the record is located by client ip and envelope sender.

        Returns a dict of the updated records, True when the line
        matched but no record could be updated, or None (implicitly)
        when the line did not match. (Cleanup: the pair-loop variable
        was named `str`, shadowing the builtin.)
        '''
        v = None
        client_ip = None
        envelope_from = None
        # 2. policyd-spf[13703]: prepend Received-SPF: Pass (mailfrom) identity=mailfrom; client-ip=1.2.3.4; helo=host.tld; envelope-from=alice@post.com; receiver=<UNKNOWN>
        m = self.re_spf_1.search(line)
        if m:
            v = {
                "spf_tid": m.group(1),
                "spf_result": m.group(2),
                "spf_reason": PostfixLogParser.strip_brackets(
                    m.group(3),
                    bracket_l='(',
                    bracket_r=')'
                )
            }
            # the remainder of the line is "key=value; key=value; ..."
            for chunk in line[m.end():].split(';'):
                pair = chunk.strip().split('=', 1)
                if len(pair) == 2:
                    if pair[0] == 'client-ip':
                        client_ip = pair[1]
                    elif pair[0] == 'envelope-from':
                        envelope_from = pair[1]

        else:
            # 2a. policyd-spf[26231]: 550 5.7.23 Message rejected due to: Receiver policy for SPF Softfail. Please see http://www.openspf.net/Why?s=mfrom;id=test@google.com;ip=1.2.3.4;r=<UNKNOWN>
            m = self.re_spf_2.search(line)
            if m:
                v = {
                    "spf_tid": m.group(1),
                    "spf_result": "reject",
                    "spf_reason": m.group(2),
                }
                for chunk in line[m.end():].split(';'):
                    # note: 'id' and 'ip' are part of the url, but we need
                    # to match records somehow...
                    pair = chunk.strip().split('=', 1)
                    if len(pair) == 2:
                        if pair[0] == 'ip':
                            client_ip = pair[1]
                        elif pair[0] == 'id':
                            envelope_from = pair[1]

        if v:
            mta_conn_q = [{
                'key': 'remote_ip', 'value': client_ip,
                'ignorecase': True
            }]
            mta_accept_q = [
                { 'key': 'envelope_from', 'value': envelope_from,
                  'ignorecase': True, 'autoset': True },
                # only claim an accept record that has no spf result yet
                { 'key': 'spf_result', 'value': None }
            ]
            mta_conn, mta_accept = self.find_first(
                mta_conn_q,
                mta_accept_q,
                auto_add=True
            )
            if mta_accept:
                mta_accept.update(v)
                self.add_subsystem(mta_accept, "policyd-spf")
                return { 'mta_conn': mta_conn, 'mta_accept': mta_accept }
            # matched a policyd-spf line but found no record to update
            return True
        

    def match_postgrey(self, line):
        '''match postgrey greylisting log lines and attach the postgrey
        result/reason/delay to the matching per-recipient delivery
        record

        Returns a dict of updated records on success, {'deferred':True}
        when the match was ambiguous and settings were deferred, True
        when the line matched but no record was found, or None when the
        line is not a postgrey line.

        '''
        # Dec  9 14:46:57 mail postgrey[879]: action=greylist, reason=new, client_name=host.tld, client_address=1.2.3.4/32, sender=alice@post.com, recipient=mia@myhost.com
        # 3. postgrey: "client_address=1.2.3.4/32, sender=alice@post.com" [action="pass|greylist", reason="client whitelist|triplet found|new", delay=x-seconds]
        # 3a. Dec  6 18:31:28 mail postgrey[879]: 0E98D1F787: action=pass, reason=triplet found, client_name=host.tld, client_address=1.2.3.4/32, sender=alice@post.com, recipient=mia@myhost.com
        #   1=postgrey_tid
        #   2=postfix_msg_id (re-1 only)

        v = None
        client_ip = None
        envelope_from = None
        rcpt_to = None
        postfix_msg_id = None
        m = self.re_postgrey_1.search(line)
        if not m: m = self.re_postgrey_2.search(line)
        
        if m:
            v = {
                'postgrey_tid': m.group(1)
            }
            try:
                postfix_msg_id = m.group(2)
            except IndexError:
                # re_postgrey_2 has no postfix-msg-id capture group
                pass

            # parse the trailing comma-separated "name=value" pairs
            # (was: `for str in pairs` - renamed to avoid shadowing the
            # builtin `str`)
            for item in line[m.end():].split(','):
                pair = item.strip().split('=', 1)
                if len(pair) != 2:
                    continue
                name, value = pair
                if name == 'action':
                    v['postgrey_result'] = value
                elif name == 'reason':
                    v['postgrey_reason'] = value
                elif name == 'sender':
                    envelope_from = value
                elif name == 'recipient':
                    rcpt_to = value
                elif name == 'client_address':
                    client_ip = value
                    # normalize the ipv6 address
                    #   postfix:  2607:f8b0:4864:20::32b
                    #   postgrey: 2607:F8B0:4864:20:0:0:0:32B/128
                    idx = client_ip.find('/')
                    if idx >= 0:
                        client_ip = client_ip[0:idx]
                    if ':' in client_ip:
                        # str() of an IPv6Address yields the compressed
                        # lowercase form used by postfix
                        client_ip = str(ipaddress.ip_address(client_ip))

                elif name == 'delay':
                    v['postgrey_delay'] = value

        if v:
            matches = []
            if postfix_msg_id:
                # best case: an exact match on the postfix queue id (3a)
                mta_conn, mta_accept = self.find_by_postfix_msg_id(
                    postfix_msg_id
                )
                if mta_accept:
                    matches.append( (mta_conn, mta_accept) )

            if len(matches)==0:
                # fall back to matching on remote ip plus sender
                mta_conn_q = [{
                    'key':'remote_ip', 'value':client_ip,
                    'ignorecase': True
                }]
                mta_accept_q = [
                    { 'key': 'envelope_from', 'value': envelope_from,
                      'ignorecase': True, 'autoset': True }
                ]
                
                matches = self.find_by(mta_conn_q, mta_accept_q, auto_add=True)
            
            if len(matches)>0:
                log.debug('MATCHES(%s): %s', len(matches), matches)
                # only auto-add a delivery record when the match is unique
                auto_add = ( len(matches)==1 )
                for mta_conn, mta_accept in matches:
                    mta_delivery = self.find_delivery(
                        mta_accept,
                        rcpt_to,
                        auto_add=auto_add
                    )
                    if mta_delivery:
                        mta_delivery.update(v)
                        log.debug('DELIVERY(postgrey): %s', mta_accept)
                        self.add_subsystem(mta_accept, "postgrey")
                        return {
                            'mta_conn': mta_conn,
                            'mta_accept': mta_accept,
                            'mta_delivery': mta_delivery
                        }

                # ambiguous: two or more active connections sending a
                # message with the exact same FROM address! defer
                # matching until another find_delivery(auto_add=True)
                # is called elsewhere (lmtp)
                for mta_conn, mta_accept in matches:
                    self.defer_delivery_settings_by_rcpt(
                        mta_accept,
                        rcpt_to,
                        'postgrey',
                        v
                    )
                return { 'deferred': True }

            return True


    def match_postfix_msg_id(self, line):
        '''match the smtpd lines that assign a postfix queue id to an
        in-progress connection: either a NOQUEUE rejection or a
        "POSTFIX-MSG-ID: client=..." acceptance line

        Returns a dict of the updated records, True when the line
        matched but no in-progress record could be found, or None when
        the line is not handled here.

        '''
        m = self.re_postfix_noqueue.search(line)
        if m:
            # 4b. postfix/submission/smtpd[THREAD-ID]: NOQUEUE: reject: RCPT from unknown[1.2.3.4]: 550 5.7.23 <alice@somedomain.com>: Recipient address rejected: Message rejected due to: Receiver policy for SPF Softfail. Please see http://www.openspf.net/Why?s=mfrom;id=test@google.com;ip=1.2.3.4;r=<UNKNOWN>; from=<test@google.com> to=<alice@somedomain.com> proto=ESMTP helo=<qa2.abc.com>
            # 4c. postfix/smtpd[THREAD-ID]: "NOQUEUE: reject: ....: Recipient address rejected: Greylisted, seehttp://postgrey.../; ..."
            #   1=service ("submission/smtpd" or "smtpd")
            #   2=service_tid
            #   3=accept_status ("reject")
            service_tid = m.group(2)
            reason = line[m.end():].rstrip()
            envelope_from = None
            idx = reason.find('; ')  # note the space
            if idx>=0:
                # the trailing "from=<..> to=<..> ..." detail carries the
                # envelope sender; the reject reason is everything before it
                for pair in PostfixLogParser.SplitList(reason[idx+2:], delim=' '):
                    if pair['name'] == 'from':
                        envelope_from = pair['value']
                        break
                reason=reason[0:idx]
                
            v = {
                'postfix_msg_id': 'NOQUEUE',
                'accept_status': m.group(3),
                'failure_info': reason,
                'failure_category':self.failure_category(reason,'postfix_other')
            }

            # match (or auto-create) the accept record that has no
            # postfix_msg_id assigned yet
            mta_conn_q = [{ 'key':'service_tid', 'value':service_tid }]
            mta_accept_q = [
                { 'key': 'envelope_from', 'value': envelope_from,
                  'ignorecase': True, 'autoset': True },
                { 'key': 'postfix_msg_id', 'value': None }
            ]
            mta_conn, mta_accept = self.find_first(
                mta_conn_q,
                mta_accept_q,
                auto_add=True
            )
            if mta_accept:
                mta_accept.update(v)
                return { 'mta_conn': mta_conn, 'mta_accept': mta_accept }
            return True
        
        m = self.re_postfix_msg_id.search(line)
        if m:
            # 4. postfix/smtpd[THREAD-ID]: "POSTFIX-MSG-ID" (eg: "DD95A1F796"): client=DNS[IP]
            # 4a. postfix/submission/smtpd[THREAD-ID]: POSTFIX-MSG-ID: client=DNS[IP], sasl_method=LOGIN, sasl_username=mia@myhost.com
            #   1=service ("submission/smtpd" or "smtpd")
            #   2=service_tid
            #   3=postfix_msg_id
            service_tid = m.group(2)
            postfix_msg_id = m.group(3)
            remote_host = None
            remote_ip = None
            v = {
            }
            for pair in PostfixLogParser.SplitList(line[m.end():]):
                if pair['name']=='sasl_method':
                    v['sasl_method'] = pair['value'].strip()
                elif pair['name']=='sasl_username':
                    v['sasl_username'] = pair['value'].strip()
                elif pair['name']=='client':
                    remote_host, remote_ip = \
                        PostfixLogParser.split_host(pair['value'])
                    
            mta_conn_q = [
                { 'key': 'service_tid', 'value': service_tid },
                { 'key': 'remote_ip', 'value': remote_ip }
            ]
            # NOTE(review): every other call site passes mta_accept_q as a
            # list of query dicts; presumably find_first also accepts a
            # bare dict - confirm against its implementation
            mta_accept_q = { 'key': 'postfix_msg_id', 'value': None }
            mta_conn, mta_accept = self.find_first(
                mta_conn_q,
                mta_accept_q,
                auto_add=True,
                debug=False
            )
            
            if mta_accept:
                # sasl/client details belong to the connection record
                mta_conn.update(v)
                mta_accept.update({
                    'postfix_msg_id': postfix_msg_id
                })
                # register in the msg-id index so later lines (cleanup,
                # qmgr, opendkim, lmtp, ...) can find this record quickly
                self.index_by_postfix_msg_id(
                    mta_conn,
                    mta_accept
                )
                return { 'mta_conn':mta_conn, 'mta_accept':mta_accept }
            return True
                

    def match_message_id(self, line):
        '''match postfix/cleanup lines that report a message's Message-ID
        header and record it on the accept record'''
        # 5. Dec 10 06:48:48 mail postfix/cleanup[7435]: 031AF20076: message-id=<20201210114848.031AF20076@myhost.com>
        match = self.re_postfix_message_id.search(line)
        if not match:
            return None

        mta_conn, mta_accept = self.find_by_postfix_msg_id(match.group(1))
        # auto-add ?
        if not mta_accept:
            return True

        mta_accept['message_id'] = match.group(2)
        return { 'mta_conn': mta_conn, 'mta_accept': mta_accept }



    def match_opendkim(self, line):
        '''record opendkim verification results on the matching accept
        record (subsystem "dkim")'''
        #   1=postfix_msg_id
        #   2=verification detail
        #   3=error-msg
        m = self.re_opendkim_ssl.search(line)
        if m:
            postfix_msg_id = m.group(1)
            error_text = m.group(3).strip()
            if error_text != '':
                v = {
                    'dkim_result': 'error',
                    'dkim_reason': error_text
                }
            else:
                # no error: group 2 holds the verification detail
                v = {
                    'dkim_result': 'pass',
                    'dkim_reason': m.group(2).strip()
                }
        else:
            #   1=postfix_msg_id
            #   2=error-msg
            m = self.re_opendkim_error.search(line)
            if not m:
                return None
            postfix_msg_id = m.group(1)
            v = {
                'dkim_result': 'error',
                'dkim_reason': m.group(2).strip()
            }

        # both line formats end up here with the same bookkeeping
        mta_conn, mta_accept = self.find_by_postfix_msg_id(postfix_msg_id)
        if not mta_accept:
            return True
        mta_accept.update(v)
        self.add_subsystem(mta_accept, "dkim")
        return { 'mta_conn': mta_conn, 'mta_accept': mta_accept }


    def match_opendmarc(self, line):
        '''record opendmarc pass/none/fail results on the matching accept
        record (subsystem "dmarc")'''
        #    1=postfix_msg_id
        #    2=domain
        #    3="pass","none","fail"
        m = self.re_opendmarc_result.search(line)
        if not m:
            return None

        mta_conn, mta_accept = self.find_by_postfix_msg_id(m.group(1))
        if not mta_accept:
            return True

        mta_accept.update({
            'dmarc_result': m.group(3),
            'dmarc_reason': m.group(2)
        })
        self.add_subsystem(mta_accept, "dmarc")
        return { 'mta_conn': mta_conn, 'mta_accept': mta_accept }



    def match_postfix_queue_removed(self, line):
        '''record the time postfix removed a message from its queue - one
        of the two events that can complete a record (see end_of_rec)'''
        # 13. postfix/qmgr: POSTFIX-MSG-ID: "removed"
        #    1=date
        #    2=postfix_msg_id
        m = self.re_queue_removed.search(line)
        if not m:
            return None

        mta_conn, mta_accept = self.find_by_postfix_msg_id(m.group(2))
        if not mta_accept:
            return True

        mta_accept['queue_remove_time'] = self.parse_date(m.group(1))
        return { 'mta_conn': mta_conn, 'mta_accept': mta_accept }


    def match_postfix_queue_added(self, line):
        '''record queue-entry details (time, size, nrcpt, sender) when
        postfix activates a message in its queue'''
        # 8. postfix/qmgr: POSTFIX-MSG-ID: from=user@tld, size=N, nrcpt=1 (queue active)
        #    1=date
        #    2=postfix_msg_id
        m = self.re_queue_added.search(line)
        if not m:
            return None

        v = {
            'queue_time': self.parse_date(m.group(1)),
            'accept_status': 'queued',
        }
        envelope_from = None
        for pair in PostfixLogParser.SplitList(line[m.end():]):
            name = pair['name']
            if name == 'size':
                v['message_size'] = safe_int(pair['value'])
            elif name == 'nrcpt':
                v['message_nrcpt'] = safe_int(pair['value'])
            elif name == 'from':
                envelope_from = pair['value']

        mta_conn, mta_accept = self.find_by_postfix_msg_id(m.group(2))
        if not mta_accept:
            return True

        # only take the sender when nothing has recorded one yet
        if envelope_from and 'envelope_from' not in mta_accept:
            v['envelope_from'] = envelope_from
        mta_accept.update(v)
        return { 'mta_conn': mta_conn, 'mta_accept': mta_accept }


    def match_spampd(self, line):
        '''attach spampd (spamassassin proxy) scan results to the matching
        per-recipient delivery record (subsystem "spam")

        spampd does not log a postfix queue id, so records are matched
        on message-id plus envelope sender, which may be ambiguous; an
        ambiguous match is deferred (see
        defer_delivery_settings_by_rcpt).

        '''
        # 11. spampd: "clean message <MESSAGE-ID>|(unknown) (SCORE/MAX-SCORE) from <FROM> for <user@tld> in 1.51s, N bytes"
        # 11(a) spampd: "identified spam <MESSAGE-ID>|(unknown) (5.12/5.00) from <FROM> for <user@tld> in 1.51s, N bytes"
        #    1=spam_tid
        #    2="clean message" | "identified spam" (other?)
        #    3=message_id ("(unknown)" if message id is "<>")
        #    4=spam_score
        #    5=envelope_from
        #    6=rcpt_to
        m = self.re_spampd.search(line)
        if m:
            message_id = m.group(3)
            from_email = m.group(5)
            rcpt_to = m.group(6)
            spam_result = m.group(2)

            if message_id == '(unknown)':
                message_id = '<>'
            
            # normalize the result phrase to a short token
            if spam_result == 'clean message':
                spam_result = 'clean'
            elif spam_result == 'identified spam':
                spam_result = 'spam'
                
            v = {
                'spam_tid': m.group(1),
                'spam_result': spam_result,
                'spam_score': m.group(4)
            }

            # '*' matches any connection; narrow by message-id + sender
            mta_accept_q = [
                { 'key':'message_id', 'value':message_id },
                { 'key':'envelope_from', 'value':from_email,
                  'ignorecase': True },
            ]
            matches = self.find_by('*', mta_accept_q, debug=False)

            if len(matches)==0 and message_id=='<>':
                # not sure why this happens - the message has a valid
                # message-id reported by postfix, but spampd doesn't
                # see it
                mta_accept_q = [
                    { 'key':'envelope_from', 'value':from_email,
                      'ignorecase': True },
                ]
                matches = self.find_by('*', mta_accept_q, debug=False)
                if len(matches)>1:
                    # ambiguous - can't match it
                    matches = []
                
            
            if len(matches)>0:
                #log.debug('MATCHES(%s): %s', len(matches), matches)
                # only auto-add a delivery record when the match is unique
                auto_add = ( len(matches)==1 )
                for mta_conn, mta_accept in matches:
                    mta_delivery = self.find_delivery(
                        mta_accept,
                        rcpt_to,
                        auto_add=auto_add
                    )
                    if mta_delivery:
                        mta_delivery.update(v)
                        log.debug('DELIVERY(spam): %s', mta_accept)
                        self.add_subsystem(mta_accept, "spam")
                        return {
                            'mta_conn': mta_conn,
                            'mta_accept': mta_accept,
                            'mta_delivery': mta_delivery
                        }
                    
                # ambiguous: two or more active connections sending a
                # message with the exact same message-id from the
                # exact same FROM address! defer matching until
                # another find_delivery(auto_add=True) is called
                # elsewhere (lmtp)
                for mta_conn, mta_accept in matches:
                    self.defer_delivery_settings_by_rcpt(
                        mta_accept,
                        rcpt_to,
                        'spam',
                        v
                    )
                return { 'deferred': True }
                
            return True
        


    def match_disconnect(self, line):
        '''match the postfix disconnect summary line and record the
        disconnect time plus auth/starttls counters on the connection
        record

        The caller (handle) checks end_of_rec() afterward, since a
        disconnect can complete a record.

        '''
        # disconnect from unknown[1.2.3.4] ehlo=1 auth=0/1 quit=1 commands=2/3
        #    1=date
        #    2=service ("submission/smptd" or "smtpd")
        #    3=service_tid
        #    4=remote_host
        #    5=remote_ip
        m = self.re_disconnect.search(line)
        if m:
            service_tid = m.group(3)
            remote_ip = m.group(5)
            v = {
                'disconnect_time': self.parse_date(m.group(1)),
                'remote_auth_success': 0,
                'remote_auth_attempts': 0,
                'remote_used_starttls': 0
            }
            # parse the trailing "name=value" summary pairs; "auth" is
            # either "successes/attempts" or a single count.
            # (was: `for str in pairs` - renamed to avoid shadowing the
            # builtin `str`)
            for item in line[m.end():].split(' '):
                pair = item.strip().split('=', 1)
                if len(pair) != 2:
                    continue
                name, value = pair
                if name == 'auth':
                    idx = value.find('/')
                    if idx >= 0:
                        v['remote_auth_success'] = safe_int(value[0:idx])
                        v['remote_auth_attempts'] = safe_int(value[idx+1:])
                    else:
                        v['remote_auth_success'] = safe_int(value)
                        v['remote_auth_attempts'] = safe_int(value)

                elif name == 'starttls':
                    v['remote_used_starttls'] = safe_int(value)

            mta_conn_q = [
                { 'key': 'service_tid', 'value': service_tid },
                { 'key': 'remote_ip', 'value': remote_ip }
            ]
            mta_conn, mta_accept = self.find_first(mta_conn_q, None)
            if mta_conn:
                mta_conn.update(v)
                return { 'mta_conn': mta_conn }
            return True
                        

    def match_pre_delivery(self, line):
        '''capture outbound TLS connection details logged before the
        delivery status line and stash them by thread id'''
        # postfix/smtp[18333]: Trusted TLS connection established to mx01.mail.icloud.com[17.57.154.23]:25: TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)
        # postfix/smtp[566]: Untrusted TLS connection established to host.tld[1.2.3.4]:25: TLSv1.2 with cipher AES128-GCM-SHA256 (128/128 bits)
        #    1=service ("smtp")
        #    2=service_tid
        #    3=delivery_connection ("Trusted", "Untrusted", "Verified")
        #    4=tls details
        m = self.re_pre_delivery.search(line)
        if not m:
            return None

        # no postfix msg-id is available yet: defer by thread id so
        # match_delivery() can claim these settings later
        settings = {
            'delivery_connection': m.group(3).lower(),
            'delivery_connection_info': m.group(3) + ' ' + m.group(4)
        }
        self.defer_delivery_settings_by_tid(m.group(2), settings)
        return { "deferred": True }
        

    def match_delivery(self, line):
        '''match lmtp/smtp delivery status lines and record the outcome on
        the per-recipient delivery record

        Returns a dict of the updated records, True when the line
        matched but no in-progress record or recipient detail was
        found, or None when the line is not handled here.

        '''
        # 12. postfix/lmtp: POSTFIX-MSG-ID: to=<user@tld>, relay=127.0.0.1[127.0.0.1]:10025, delay=4.7, delays=1/0.01/0.01/3.7, dsn=2.0.0, status=sent (250 2.0.0 <user@domain.tld> YB5nM1eS01+lSgAAlWWVsw Saved)
        # 12a. postfix/lmtp: POSTFIX_MSG-ID: to=user@tld, status=bounced (host...said...550 5.1.1 <user@tld> User doesn't exist ....)
        # 12b. postfix/smtp[32052]: A493B1FAF1: to=<alice@post.com>, relay=mx.post.com[1.2.3.4]:25, delay=1.2, delays=0.65/0.06/0.4/0.09, dsn=2.0.0, status=sent (250 2.0.0 OK 7E/38-26906-CDC5DCF5): None
        # 12c. postfix/smtp[21816]: BD1D31FB12: host mx2.comcast.net[2001:558:fe21:2a::6] refused to talk to me: 554 resimta-ch2-18v.sys.comcast.net resimta-ch2-18v.sys.comcast.net 2600:3c02::f03c:92ff:febb:192f found on one or more DNSBLs, see http://postmaster.comcast.net/smtp-error-codes.php#BL000001
        # 12d. postfix/lmtp[26439]: B306D1F77F: to=<user@local.com>, orig_to=<alias@local.com>, relay=127.0.0.1[127.0.0.1]:10025, delay=1.7, delays=0.53/0.01/0/1.1, dsn=2.0.0, status=sent (250 2.0.0 <user@local.com> 4BYfOjho/19oZQAAlWWVsw Saved)

        #    1=system ("lmtp" or "smtp")
        #    2=system_tid
        #    3=postfix_msg_id
        m = self.re_delivery.search(line)
        if m:
            service = m.group(1)
            service_tid = m.group(2)
            postfix_msg_id = m.group(3)
            mta_conn, mta_accept = self.find_by_postfix_msg_id(postfix_msg_id)
            if not mta_conn:
                return True

            if 'status=' not in line:
                # temporary error: postfix will keep trying
                # 12c
                reason = line[m.end():].strip()
                category = self.failure_category(reason, 'temporary_error')
                if 'failure_info' in mta_accept:
                    # accumulate repeated failures; flag the category as
                    # 'multiple' when they disagree
                    mta_accept['failure_info'] += '\n' + reason
                    if mta_accept['failure_category'] != category:
                        mta_accept['failure_category'] = 'multiple'
                else:
                    v = {
                        'failure_info': reason,
                        'failure_category': category
                    }
                    mta_accept.update(v)
                return { 'mta_conn': mta_conn, 'mta_accept': mta_accept }

            
            # 12, 12a, 12b, 12d
            detail = PostfixLogParser.SplitList(line[m.end():]).asDict()
            if 'to' not in detail:
                return True

            # NOTE(review): assumes find_delivery(auto_add=True) always
            # returns a record - confirm it cannot return None here
            mta_delivery = self.find_delivery(
                mta_accept,
                detail['to']['value'],
                service_tid=service_tid,
                auto_add=True
            )
            
            if 'orig_to' in detail:
                # sent to an alias, then delivered to a user
                # 'to' is the final user, 'orig_to' is the alias
                mta_delivery['orig_to'] = detail['orig_to']['value']
                mta_delivery_2 = self.find_delivery(
                    mta_accept,
                    detail['orig_to']['value'],
                    service_tid=service_tid,
                    auto_add=False
                )
                if mta_delivery_2:
                    # combine first record into second, then remove the first
                    mta_delivery_2.update(mta_delivery)
                    mta_accept['mta_delivery'].remove(mta_delivery)
                    mta_delivery = mta_delivery_2
                
            mta_delivery['service'] = service
            mta_delivery['service_tid'] = service_tid
            log.debug('DELIVERY(accept): %s', mta_accept)

            if 'status' in detail:
                result = detail['status']['value'].strip()
                comment = detail['status'].get('comment')
                mta_delivery['status'] = result
                if result == 'sent':
                    # delivery succeeded: clear any earlier failure category
                    safe_del(mta_delivery, 'failure_category')
                    
                else:
                    mta_delivery['failure_category'] = \
                        self.failure_category(comment, service + "_other")
                            
                if comment:
                    # keep every status comment, newline-separated
                    if mta_delivery.get('delivery_info'):
                        mta_delivery['delivery_info'] += ("\n" + comment)
                    else:
                        mta_delivery['delivery_info'] = comment
                            
            if 'delay' in detail:
                mta_delivery['delay'] = detail['delay']['value'].strip()

            if 'relay' in detail:
                mta_delivery['relay'] = detail['relay']['value'].strip()
                    
            self.add_subsystem(mta_accept, service)
            return { 'mta_conn':mta_conn, 'mta_accept':mta_accept, 'mta_delivery':mta_delivery }


    def store(self, mta_conn):
        '''classify the completed connection with a "disposition", hand it
        to the record store (unless that disposition is configured to be
        dropped), then remove it from the in-progress list

        '''
        def is_all_rejects(accept_list):
            # True only when there is at least one accept record and
            # every one of them was rejected
            if not accept_list:
                return False
            return all(
                accept.get('accept_status') == 'reject'  # or accept.get('failure_category') == 'greylisted'
                for accept in accept_list
            )

        if 'disposition' not in mta_conn:
            if 'queue_time' not in mta_conn and \
               mta_conn.get('remote_auth_success') == 0 and \
               mta_conn.get('remote_auth_attempts', 0) > 0:
                # tried to log in, never succeeded, queued nothing
                mta_conn['disposition'] = 'failed_login_attempt'

            elif 'mta_accept' not in mta_conn and \
                 mta_conn.get('remote_auth_success') == 0 and \
                 mta_conn.get('remote_auth_attempts') == 0:
                # connected and left without doing anything
                mta_conn['disposition'] = 'suspected_scanner'

            elif is_all_rejects(mta_conn.get('mta_accept')):
                mta_conn['disposition'] = 'reject'

            elif mta_conn.get('remote_used_starttls', 0) == 0 and \
                 mta_conn.get('remote_ip') != '127.0.0.1':
                # non-local sender that never negotiated TLS
                mta_conn['disposition'] = 'insecure'

            else:
                mta_conn['disposition'] = 'ok'

        with self.drop_disposition_lock:
            drop = self.drop_disposition.get(mta_conn['disposition'], False)

        if not drop:
            log.debug('store: %s', mta_conn)
            try:
                self.record_store.store('inbound_mail', mta_conn)
            except Exception as e:
                # storage failures must not kill the log reading thread
                log.exception(e)

        self.remove_connection(mta_conn)

        
    def log_match(self, match_str, match_result, line):
        '''log the outcome of a match attempt

        match_result is one of: True (line recognized but no record
        found), a dict (successful or deferred match), or falsy (line
        not handled / ignored).

        '''
        if match_result is True:
            log.info('%s [unmatched]: %s', match_str, line)

        elif match_result:
            if match_result.get('deferred', False):
                log.debug('%s [deferred]: %s', match_str, line)

            elif 'mta_conn' in match_result:
                log.debug('%s: %s: %s', match_str, line, match_result['mta_conn'])
            else:
                # fix: the original format string had no placeholder, so
                # match_result was never rendered
                log.error('no mta_conn in match_result: %s', match_result)
        else:
            log.debug('%s: %s', match_str, line)
                

    def test_end_of_rec(self, match_result):
        if not match_result or match_result is True or match_result.get('deferred', False):
            return False
        return self.end_of_rec(match_result['mta_conn'])
    
    def end_of_rec(self, mta_conn):
        '''return True when the record is "complete": the client has
        disconnected and every accepted message has been removed from
        the postfix queue

        '''
        if 'disconnect_time' not in mta_conn:
            return False

        def still_queued(accept):
            # NOQUEUE entries were never queued, so they cannot hold the
            # record open
            return 'postfix_msg_id' in accept and \
                accept['postfix_msg_id'] != 'NOQUEUE' and \
                'queue_remove_time' not in accept

        return not any(
            still_queued(accept)
            for accept in mta_conn.get('mta_accept', [])
        )
    

    def handle(self, line):
        '''overrides ReadLineHandler method

        Called by the main log reading thread in TailFile for every new
        log line. All further log reading is blocked until this method
        returns.

        The storage engine (`record_store`, a SqliteEventStore
        instance) does not block, so this function may return before
        the record reaches disk.

        IMPORTANT:

        The data structures and methods in this class are not thread
        safe. It is not okay to call any of them when the instance is
        registered with TailFile.

        '''
        if not self.capture_enabled:
            return

        self.current_inprogress_recs = len(self.recs)

        log.debug('recs in progress: %s, dds_by_tid=%s',
                  len(self.recs),
                  len(self.dds_by_tid)
        )

        # matchers are tried in order; the first one returning a truthy
        # value consumes the line. the two flagged with may_complete=True
        # (disconnect, queue removal) can finish a record, which is then
        # stored.
        matchers = [
            ( 'connect',        self.match_connect,               False ),
            ( 'local_pickup',   self.match_local_pickup,          False ),
            ( 'policyd_spf',    self.match_policyd_spf,           False ),
            ( 'postgrey',       self.match_postgrey,              False ),
            ( 'postfix_msg_id', self.match_postfix_msg_id,        False ),
            ( 'message_id',     self.match_message_id,            False ),
            ( 'opendkim',       self.match_opendkim,              False ),
            ( 'opendmarc',      self.match_opendmarc,             False ),
            ( 'queue added',    self.match_postfix_queue_added,   False ),
            ( 'spam',           self.match_spampd,                False ),
            ( 'disconnect',     self.match_disconnect,            True  ),
            ( 'pre-delivery',   self.match_pre_delivery,          False ),
            ( 'delivery',       self.match_delivery,              False ),
            ( 'queue removed',  self.match_postfix_queue_removed, True  ),
        ]

        for label, matcher, may_complete in matchers:
            match = matcher(line)
            if match:
                self.log_match(label, match, line)
                if may_complete and self.test_end_of_rec(match):
                    # we're done - not queued and disconnected ... save it
                    self.store(match['mta_conn'])
                return

        self.log_match('IGNORED', None, line)


    def end_of_callbacks(self, thread):
        '''overrides ReadLineHandler method

        Called when log watching ends. Save the incomplete, in-progress
        records so a future run can pick up where we left off.

        '''
        self.save_state()