diff --git a/management/backup.py b/management/backup.py index 6ec7e3af..a8e2444f 100755 --- a/management/backup.py +++ b/management/backup.py @@ -236,7 +236,7 @@ def get_duplicity_additional_args(env): f"--ssh-options='-i /root/.ssh/id_rsa_miab -p {port}'", f"--rsync-options='-e \"/usr/bin/ssh -oStrictHostKeyChecking=no -oBatchMode=yes -p {port} -i /root/.ssh/id_rsa_miab\"'", ] - elif get_target_type(config) == 's3': + if get_target_type(config) == 's3': # See note about hostname in get_duplicity_target_url. # The region name, which is required by some non-AWS endpoints, # is saved inside the username portion of the URL. @@ -447,7 +447,7 @@ def list_target_files(config): if target.scheme == "file": return [(fn, os.path.getsize(os.path.join(target.path, fn))) for fn in os.listdir(target.path)] - elif target.scheme == "rsync": + if target.scheme == "rsync": rsync_fn_size_re = re.compile(r'.* ([^ ]*) [^ ]* [^ ]* (.*)') rsync_target = '{host}:{path}' @@ -485,21 +485,20 @@ def list_target_files(config): if match: ret.append( (match.groups()[1], int(match.groups()[0].replace(',',''))) ) return ret + if 'Permission denied (publickey).' in listing: + reason = "Invalid user or check you correctly copied the SSH key." + elif 'No such file or directory' in listing: + reason = f"Provided path {target_path} is invalid." + elif 'Network is unreachable' in listing: + reason = f"The IP address {target.hostname} is unreachable." + elif 'Could not resolve hostname' in listing: + reason = f"The hostname {target.hostname} cannot be resolved." else: - if 'Permission denied (publickey).' in listing: - reason = "Invalid user or check you correctly copied the SSH key." - elif 'No such file or directory' in listing: - reason = f"Provided path {target_path} is invalid." - elif 'Network is unreachable' in listing: - reason = f"The IP address {target.hostname} is unreachable." - elif 'Could not resolve hostname' in listing: - reason = f"The hostname {target.hostname} cannot be resolved." - else: - reason = ("Unknown error." - "Please check running 'management/backup.py --verify'" - "from mailinabox sources to debug the issue.") - msg = f"Connection to rsync host failed: {reason}" - raise ValueError(msg) + reason = ("Unknown error." + "Please check running 'management/backup.py --verify'" + "from mailinabox sources to debug the issue.") + msg = f"Connection to rsync host failed: {reason}" + raise ValueError(msg) elif target.scheme == "s3": import boto3.s3 diff --git a/management/daemon.py b/management/daemon.py index 198b6f92..dd1cc89a 100755 --- a/management/daemon.py +++ b/management/daemon.py @@ -92,12 +92,11 @@ def authorized_personnel_only(viewfunc): if request.headers.get('Accept') in {None, "", "*/*"}: # Return plain text output. return Response(error+"\n", status=status, mimetype='text/plain', headers=headers) - else: - # Return JSON output. - return Response(json.dumps({ - "status": "error", - "reason": error, - })+"\n", status=status, mimetype='application/json', headers=headers) + # Return JSON output. + return Response(json.dumps({ + "status": "error", + "reason": error, + })+"\n", status=status, mimetype='application/json', headers=headers) return newview @@ -147,13 +146,12 @@ def login(): "status": "missing-totp-token", "reason": str(e), }) - else: - # Log the failed login - log_failed_login(request) - return json_response({ - "status": "invalid", - "reason": str(e), - }) + # Log the failed login + log_failed_login(request) + return json_response({ + "status": "invalid", + "reason": str(e), + }) # Return a new session for the user. resp = { @@ -185,8 +183,7 @@ def logout(): def mail_users(): if request.args.get("format", "") == "json": return json_response(get_mail_users_ex(env, with_archived=True)) - else: - return "".join(x+"\n" for x in get_mail_users(env)) + return "".join(x+"\n" for x in get_mail_users(env)) @app.route('/mail/users/add', methods=['POST']) @authorized_personnel_only @@ -233,8 +230,7 @@ def mail_user_privs_remove(): def mail_aliases(): if request.args.get("format", "") == "json": return json_response(get_mail_aliases_ex(env)) - else: - return "".join(address+"\t"+receivers+"\t"+(senders or "")+"\n" for address, receivers, senders, auto in get_mail_aliases(env)) + return "".join(address+"\t"+receivers+"\t"+(senders or "")+"\n" for address, receivers, senders, auto in get_mail_aliases(env)) @app.route('/mail/aliases/add', methods=['POST']) @authorized_personnel_only @@ -354,7 +350,7 @@ def dns_set_record(qname, rtype="A"): # Get the existing records matching the qname and rtype. return dns_get_records(qname, rtype) - elif request.method in {"POST", "PUT"}: + if request.method in {"POST", "PUT"}: # There is a default value for A/AAAA records. if rtype in {"A", "AAAA"} and value == "": value = request.environ.get("HTTP_X_FORWARDED_FOR") # normally REMOTE_ADDR but we're behind nginx as a reverse proxy @@ -512,8 +508,8 @@ def totp_post_disable(): return (str(e), 400) if result: # success return "OK" - else: # error - return ("Invalid user or MFA id.", 400) + # error + return ("Invalid user or MFA id.", 400) # WEB @@ -597,8 +593,7 @@ def needs_reboot(): from status_checks import is_reboot_needed_due_to_package_installation if is_reboot_needed_due_to_package_installation(): return json_response(True) - else: - return json_response(False) + return json_response(False) @app.route('/system/reboot', methods=["POST"]) @authorized_personnel_only @@ -607,8 +602,7 @@ def do_reboot(): from status_checks import is_reboot_needed_due_to_package_installation if is_reboot_needed_due_to_package_installation(): return utils.shell("check_output", ["/sbin/shutdown", "-r", "now"], capture_stderr=True) - else: - return "No reboot is required, so it is not allowed." + return "No reboot is required, so it is not allowed." @app.route('/system/backup/status') diff --git a/management/dns_update.py b/management/dns_update.py index 8c931871..1cc8a0da 100755 --- a/management/dns_update.py +++ b/management/dns_update.py @@ -124,8 +124,7 @@ def do_dns_update(env, force=False): if len(updated_domains) == 0: # if nothing was updated (except maybe OpenDKIM's files), don't show any output return "" - else: - return "updated DNS: " + ",".join(updated_domains) + "\n" + return "updated DNS: " + ",".join(updated_domains) + "\n" ######################################################################## diff --git a/management/mail_log.py b/management/mail_log.py index 8192ca3f..7d1cede2 100755 --- a/management/mail_log.py +++ b/management/mail_log.py @@ -355,7 +355,7 @@ def scan_mail_log_line(line, collector): if date > END_DATE: # Don't process, and halt return False - elif date < START_DATE: + if date < START_DATE: # Don't process, but continue return True @@ -634,8 +634,7 @@ def print_time_table(labels, data, do_print=True): if do_print: print("\n".join(lines)) return None - else: - return lines + return lines def print_user_table(users, data=None, sub_data=None, activity=None, latest=None, earliest=None, diff --git a/management/mailconfig.py b/management/mailconfig.py index 9fdd1d33..4f155706 100755 --- a/management/mailconfig.py +++ b/management/mailconfig.py @@ -93,8 +93,7 @@ def open_database(env, with_connection=False): conn = sqlite3.connect(env["STORAGE_ROOT"] + "/mail/users.sqlite") if not with_connection: return conn.cursor() - else: - return conn, conn.cursor() + return conn, conn.cursor() def get_mail_users(env): # Returns a flat, sorted list of all user accounts. @@ -271,11 +270,11 @@ def add_mail_user(email, pw, privs, env): # validate email if email.strip() == "": return ("No email address provided.", 400) - elif not validate_email(email): + if not validate_email(email): return ("Invalid email address.", 400) - elif not validate_email(email, mode='user'): + if not validate_email(email, mode='user'): return ("User account email addresses may only use the lowercase ASCII letters a-z, the digits 0-9, underscore (_), hyphen (-), and period (.).", 400) - elif is_dcv_address(email) and len(get_mail_users(env)) > 0: + if is_dcv_address(email) and len(get_mail_users(env)) > 0: # Make domain control validation hijacking a little harder to mess up by preventing the usual # addresses used for DCV from being user accounts. Except let it be the first account because # during box setup the user won't know the rules. @@ -483,9 +482,8 @@ def add_mail_alias(address, forwards_to, permitted_senders, env, update_if_exist except sqlite3.IntegrityError: if not update_if_exists: return ("Alias already exists ({}).".format(address), 400) - else: - c.execute("UPDATE aliases SET destination = ?, permitted_senders = ? WHERE source = ?", (forwards_to, permitted_senders, address)) - return_status = "alias updated" + c.execute("UPDATE aliases SET destination = ?, permitted_senders = ? WHERE source = ?", (forwards_to, permitted_senders, address)) + return_status = "alias updated" conn.commit() diff --git a/management/ssl_certificates.py b/management/ssl_certificates.py index af262ba4..effb6ace 100755 --- a/management/ssl_certificates.py +++ b/management/ssl_certificates.py @@ -160,14 +160,13 @@ def get_domain_ssl_files(domain, ssl_certificates, env, allow_missing_cert=False wildcard_domain = re.sub(r"^[^\.]+", "*", domain) if domain in ssl_certificates: return ssl_certificates[domain] - elif wildcard_domain in ssl_certificates: + if wildcard_domain in ssl_certificates: return ssl_certificates[wildcard_domain] - elif not allow_missing_cert: + if not allow_missing_cert: # No valid certificate is available for this domain! Return default files. return system_certificate - else: - # No valid certificate is available for this domain. - return None + # No valid certificate is available for this domain. + return None # PROVISIONING CERTIFICATES FROM LETSENCRYPT @@ -590,34 +589,33 @@ def check_certificate(domain, ssl_certificate, ssl_private_key, warn_if_expiring # Certificate is self-signed. Probably we detected this above. return ("SELF-SIGNED", None) - elif retcode != 0: + if retcode != 0: if "unable to get local issuer certificate" in verifyoutput: return ("The certificate is missing an intermediate chain or the intermediate chain is incorrect or incomplete. ({})".format(verifyoutput), None) # There is some unknown problem. Return the `openssl verify` raw output. return ("There is a problem with the certificate.", verifyoutput.strip()) + # `openssl verify` returned a zero exit status so the cert is currently + # good. + + # But is it expiring soon? + cert_expiration_date = cert.not_valid_after + ndays = (cert_expiration_date-now).days + if not rounded_time or ndays <= 10: + # Yikes better renew soon! + expiry_info = "The certificate expires in %d days on %s." % (ndays, cert_expiration_date.date().isoformat()) else: - # `openssl verify` returned a zero exit status so the cert is currently - # good. + # We'll renew it with Lets Encrypt. + expiry_info = "The certificate expires on {}.".format(cert_expiration_date.date().isoformat()) - # But is it expiring soon? - cert_expiration_date = cert.not_valid_after - ndays = (cert_expiration_date-now).days - if not rounded_time or ndays <= 10: - # Yikes better renew soon! - expiry_info = "The certificate expires in %d days on %s." % (ndays, cert_expiration_date.date().isoformat()) - else: - # We'll renew it with Lets Encrypt. - expiry_info = "The certificate expires on {}.".format(cert_expiration_date.date().isoformat()) + if warn_if_expiring_soon and ndays <= warn_if_expiring_soon: + # Warn on day 10 to give 4 days for us to automatically renew the + # certificate, which occurs on day 14. + return ("The certificate is expiring soon: " + expiry_info, None) - if warn_if_expiring_soon and ndays <= warn_if_expiring_soon: - # Warn on day 10 to give 4 days for us to automatically renew the - # certificate, which occurs on day 14. - return ("The certificate is expiring soon: " + expiry_info, None) - - # Return the special OK code. - return ("OK", expiry_info) + # Return the special OK code. + return ("OK", expiry_info) def load_cert_chain(pemfile): # A certificate .pem file may contain a chain of certificates. @@ -671,8 +669,7 @@ def get_certificate_domains(cert): def idna_decode_dns_name(dns_name): if dns_name.startswith("*."): return "*." + idna.encode(dns_name[2:]).decode('ascii') - else: - return idna.encode(dns_name).decode('ascii') + return idna.encode(dns_name).decode('ascii') try: sans = cert.extensions.get_extension_for_oid(OID_SUBJECT_ALTERNATIVE_NAME).value.get_values_for_type(DNSName) diff --git a/management/status_checks.py b/management/status_checks.py index 5ea9c18e..a282be88 100755 --- a/management/status_checks.py +++ b/management/status_checks.py @@ -653,11 +653,11 @@ def check_dnssec(domain, env, output, dns_zonefiles, is_checking_primary=False): if {r[1] for r in matched_ds} == { '13' } and {r[2] for r in matched_ds} <= { '2', '4' }: # all are alg 13 and digest type 2 or 4 output.print_ok("DNSSEC 'DS' record is set correctly at registrar.") return - elif len([r for r in matched_ds if r[1] == '13' and r[2] in { '2', '4' }]) > 0: # some but not all are alg 13 + if len([r for r in matched_ds if r[1] == '13' and r[2] in { '2', '4' }]) > 0: # some but not all are alg 13 output.print_ok("DNSSEC 'DS' record is set correctly at registrar. (Records using algorithm other than ECDSAP256SHA256 and digest types other than SHA-256/384 should be removed.)") return - else: # no record uses alg 13 - output.print_warning("""DNSSEC 'DS' record set at registrar is valid but should be updated to ECDSAP256SHA256 and SHA-256 (see below). + # no record uses alg 13 + output.print_warning("""DNSSEC 'DS' record set at registrar is valid but should be updated to ECDSAP256SHA256 and SHA-256 (see below). IMPORTANT: Do not delete existing DNSSEC 'DS' records for this domain until confirmation that the new DNSSEC 'DS' record for this domain is valid.""") else: diff --git a/management/utils.py b/management/utils.py index 1dbbeb7e..1a5b2195 100644 --- a/management/utils.py +++ b/management/utils.py @@ -135,8 +135,7 @@ def shell(method, cmd_args, env=None, capture_stderr=False, return_bytes=False, if not return_bytes and isinstance(ret, bytes): ret = ret.decode("utf8") if not trap: return ret - else: - return code, ret + return code, ret def create_syslog_handler(): import logging.handlers diff --git a/management/web_update.py b/management/web_update.py index b14fdfaf..825158b3 100644 --- a/management/web_update.py +++ b/management/web_update.py @@ -256,10 +256,9 @@ def get_web_domains_info(env): cert_status, cert_status_details = check_certificate(domain, tls_cert["certificate"], tls_cert["private-key"]) if cert_status == "OK": return ("success", "Signed & valid. " + cert_status_details) - elif cert_status == "SELF-SIGNED": + if cert_status == "SELF-SIGNED": return ("warning", "Self-signed. Get a signed certificate to stop warnings.") - else: - return ("danger", "Certificate has a problem: " + cert_status) + return ("danger", "Certificate has a problem: " + cert_status) return [ {