mirror of
				https://github.com/mail-in-a-box/mailinabox.git
				synced 2025-11-03 19:30:54 +00:00 
			
		
		
		
	Fixed RET505 (superfluous-else-return)
This commit is contained in:
		
							parent
							
								
									d34a2059df
								
							
						
					
					
						commit
						f4cef66d93
					
				@ -236,7 +236,7 @@ def get_duplicity_additional_args(env):
 | 
			
		||||
			f"--ssh-options='-i /root/.ssh/id_rsa_miab -p {port}'",
 | 
			
		||||
			f"--rsync-options='-e \"/usr/bin/ssh -oStrictHostKeyChecking=no -oBatchMode=yes -p {port} -i /root/.ssh/id_rsa_miab\"'",
 | 
			
		||||
		]
 | 
			
		||||
	elif get_target_type(config) == 's3':
 | 
			
		||||
	if get_target_type(config) == 's3':
 | 
			
		||||
		# See note about hostname in get_duplicity_target_url.
 | 
			
		||||
		# The region name, which is required by some non-AWS endpoints,
 | 
			
		||||
		# is saved inside the username portion of the URL.
 | 
			
		||||
@ -447,7 +447,7 @@ def list_target_files(config):
 | 
			
		||||
	if target.scheme == "file":
 | 
			
		||||
		return [(fn, os.path.getsize(os.path.join(target.path, fn))) for fn in os.listdir(target.path)]
 | 
			
		||||
 | 
			
		||||
	elif target.scheme == "rsync":
 | 
			
		||||
	if target.scheme == "rsync":
 | 
			
		||||
		rsync_fn_size_re = re.compile(r'.*    ([^ ]*) [^ ]* [^ ]* (.*)')
 | 
			
		||||
		rsync_target = '{host}:{path}'
 | 
			
		||||
 | 
			
		||||
@ -485,21 +485,20 @@ def list_target_files(config):
 | 
			
		||||
				if match:
 | 
			
		||||
					ret.append( (match.groups()[1], int(match.groups()[0].replace(',',''))) )
 | 
			
		||||
			return ret
 | 
			
		||||
		if 'Permission denied (publickey).' in listing:
 | 
			
		||||
			reason = "Invalid user or check you correctly copied the SSH key."
 | 
			
		||||
		elif 'No such file or directory' in listing:
 | 
			
		||||
			reason = f"Provided path {target_path} is invalid."
 | 
			
		||||
		elif 'Network is unreachable' in listing:
 | 
			
		||||
			reason = f"The IP address {target.hostname} is unreachable."
 | 
			
		||||
		elif 'Could not resolve hostname' in listing:
 | 
			
		||||
			reason = f"The hostname {target.hostname} cannot be resolved."
 | 
			
		||||
		else:
 | 
			
		||||
			if 'Permission denied (publickey).' in listing:
 | 
			
		||||
				reason = "Invalid user or check you correctly copied the SSH key."
 | 
			
		||||
			elif 'No such file or directory' in listing:
 | 
			
		||||
				reason = f"Provided path {target_path} is invalid."
 | 
			
		||||
			elif 'Network is unreachable' in listing:
 | 
			
		||||
				reason = f"The IP address {target.hostname} is unreachable."
 | 
			
		||||
			elif 'Could not resolve hostname' in listing:
 | 
			
		||||
				reason = f"The hostname {target.hostname} cannot be resolved."
 | 
			
		||||
			else:
 | 
			
		||||
				reason = ("Unknown error."
 | 
			
		||||
						"Please check running 'management/backup.py --verify'"
 | 
			
		||||
						"from mailinabox sources to debug the issue.")
 | 
			
		||||
			msg = f"Connection to rsync host failed: {reason}"
 | 
			
		||||
			raise ValueError(msg)
 | 
			
		||||
			reason = ("Unknown error."
 | 
			
		||||
					"Please check running 'management/backup.py --verify'"
 | 
			
		||||
					"from mailinabox sources to debug the issue.")
 | 
			
		||||
		msg = f"Connection to rsync host failed: {reason}"
 | 
			
		||||
		raise ValueError(msg)
 | 
			
		||||
 | 
			
		||||
	elif target.scheme == "s3":
 | 
			
		||||
		import boto3.s3
 | 
			
		||||
 | 
			
		||||
@ -92,12 +92,11 @@ def authorized_personnel_only(viewfunc):
 | 
			
		||||
		if request.headers.get('Accept') in {None, "", "*/*"}:
 | 
			
		||||
			# Return plain text output.
 | 
			
		||||
			return Response(error+"\n", status=status, mimetype='text/plain', headers=headers)
 | 
			
		||||
		else:
 | 
			
		||||
			# Return JSON output.
 | 
			
		||||
			return Response(json.dumps({
 | 
			
		||||
				"status": "error",
 | 
			
		||||
				"reason": error,
 | 
			
		||||
				})+"\n", status=status, mimetype='application/json', headers=headers)
 | 
			
		||||
		# Return JSON output.
 | 
			
		||||
		return Response(json.dumps({
 | 
			
		||||
			"status": "error",
 | 
			
		||||
			"reason": error,
 | 
			
		||||
			})+"\n", status=status, mimetype='application/json', headers=headers)
 | 
			
		||||
 | 
			
		||||
	return newview
 | 
			
		||||
 | 
			
		||||
@ -147,13 +146,12 @@ def login():
 | 
			
		||||
				"status": "missing-totp-token",
 | 
			
		||||
				"reason": str(e),
 | 
			
		||||
			})
 | 
			
		||||
		else:
 | 
			
		||||
			# Log the failed login
 | 
			
		||||
			log_failed_login(request)
 | 
			
		||||
			return json_response({
 | 
			
		||||
				"status": "invalid",
 | 
			
		||||
				"reason": str(e),
 | 
			
		||||
			})
 | 
			
		||||
		# Log the failed login
 | 
			
		||||
		log_failed_login(request)
 | 
			
		||||
		return json_response({
 | 
			
		||||
			"status": "invalid",
 | 
			
		||||
			"reason": str(e),
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
	# Return a new session for the user.
 | 
			
		||||
	resp = {
 | 
			
		||||
@ -185,8 +183,7 @@ def logout():
 | 
			
		||||
def mail_users():
 | 
			
		||||
	if request.args.get("format", "") == "json":
 | 
			
		||||
		return json_response(get_mail_users_ex(env, with_archived=True))
 | 
			
		||||
	else:
 | 
			
		||||
		return "".join(x+"\n" for x in get_mail_users(env))
 | 
			
		||||
	return "".join(x+"\n" for x in get_mail_users(env))
 | 
			
		||||
 | 
			
		||||
@app.route('/mail/users/add', methods=['POST'])
 | 
			
		||||
@authorized_personnel_only
 | 
			
		||||
@ -233,8 +230,7 @@ def mail_user_privs_remove():
 | 
			
		||||
def mail_aliases():
 | 
			
		||||
	if request.args.get("format", "") == "json":
 | 
			
		||||
		return json_response(get_mail_aliases_ex(env))
 | 
			
		||||
	else:
 | 
			
		||||
		return "".join(address+"\t"+receivers+"\t"+(senders or "")+"\n" for address, receivers, senders, auto in get_mail_aliases(env))
 | 
			
		||||
	return "".join(address+"\t"+receivers+"\t"+(senders or "")+"\n" for address, receivers, senders, auto in get_mail_aliases(env))
 | 
			
		||||
 | 
			
		||||
@app.route('/mail/aliases/add', methods=['POST'])
 | 
			
		||||
@authorized_personnel_only
 | 
			
		||||
@ -354,7 +350,7 @@ def dns_set_record(qname, rtype="A"):
 | 
			
		||||
			# Get the existing records matching the qname and rtype.
 | 
			
		||||
			return dns_get_records(qname, rtype)
 | 
			
		||||
 | 
			
		||||
		elif request.method in {"POST", "PUT"}:
 | 
			
		||||
		if request.method in {"POST", "PUT"}:
 | 
			
		||||
			# There is a default value for A/AAAA records.
 | 
			
		||||
			if rtype in {"A", "AAAA"} and value == "":
 | 
			
		||||
				value = request.environ.get("HTTP_X_FORWARDED_FOR") # normally REMOTE_ADDR but we're behind nginx as a reverse proxy
 | 
			
		||||
@ -512,8 +508,8 @@ def totp_post_disable():
 | 
			
		||||
		return (str(e), 400)
 | 
			
		||||
	if result: # success
 | 
			
		||||
		return "OK"
 | 
			
		||||
	else: # error
 | 
			
		||||
		return ("Invalid user or MFA id.", 400)
 | 
			
		||||
	# error
 | 
			
		||||
	return ("Invalid user or MFA id.", 400)
 | 
			
		||||
 | 
			
		||||
# WEB
 | 
			
		||||
 | 
			
		||||
@ -597,8 +593,7 @@ def needs_reboot():
 | 
			
		||||
	from status_checks import is_reboot_needed_due_to_package_installation
 | 
			
		||||
	if is_reboot_needed_due_to_package_installation():
 | 
			
		||||
		return json_response(True)
 | 
			
		||||
	else:
 | 
			
		||||
		return json_response(False)
 | 
			
		||||
	return json_response(False)
 | 
			
		||||
 | 
			
		||||
@app.route('/system/reboot', methods=["POST"])
 | 
			
		||||
@authorized_personnel_only
 | 
			
		||||
@ -607,8 +602,7 @@ def do_reboot():
 | 
			
		||||
	from status_checks import is_reboot_needed_due_to_package_installation
 | 
			
		||||
	if is_reboot_needed_due_to_package_installation():
 | 
			
		||||
		return utils.shell("check_output", ["/sbin/shutdown", "-r", "now"], capture_stderr=True)
 | 
			
		||||
	else:
 | 
			
		||||
		return "No reboot is required, so it is not allowed."
 | 
			
		||||
	return "No reboot is required, so it is not allowed."
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@app.route('/system/backup/status')
 | 
			
		||||
 | 
			
		||||
@ -124,8 +124,7 @@ def do_dns_update(env, force=False):
 | 
			
		||||
	if len(updated_domains) == 0:
 | 
			
		||||
		# if nothing was updated (except maybe OpenDKIM's files), don't show any output
 | 
			
		||||
		return ""
 | 
			
		||||
	else:
 | 
			
		||||
		return "updated DNS: " + ",".join(updated_domains) + "\n"
 | 
			
		||||
	return "updated DNS: " + ",".join(updated_domains) + "\n"
 | 
			
		||||
 | 
			
		||||
########################################################################
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -355,7 +355,7 @@ def scan_mail_log_line(line, collector):
 | 
			
		||||
    if date > END_DATE:
 | 
			
		||||
        # Don't process, and halt
 | 
			
		||||
        return False
 | 
			
		||||
    elif date < START_DATE:
 | 
			
		||||
    if date < START_DATE:
 | 
			
		||||
        # Don't process, but continue
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
@ -634,8 +634,7 @@ def print_time_table(labels, data, do_print=True):
 | 
			
		||||
    if do_print:
 | 
			
		||||
        print("\n".join(lines))
 | 
			
		||||
        return None
 | 
			
		||||
    else:
 | 
			
		||||
        return lines
 | 
			
		||||
    return lines
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def print_user_table(users, data=None, sub_data=None, activity=None, latest=None, earliest=None,
 | 
			
		||||
 | 
			
		||||
@ -93,8 +93,7 @@ def open_database(env, with_connection=False):
 | 
			
		||||
	conn = sqlite3.connect(env["STORAGE_ROOT"] + "/mail/users.sqlite")
 | 
			
		||||
	if not with_connection:
 | 
			
		||||
		return conn.cursor()
 | 
			
		||||
	else:
 | 
			
		||||
		return conn, conn.cursor()
 | 
			
		||||
	return conn, conn.cursor()
 | 
			
		||||
 | 
			
		||||
def get_mail_users(env):
 | 
			
		||||
	# Returns a flat, sorted list of all user accounts.
 | 
			
		||||
@ -271,11 +270,11 @@ def add_mail_user(email, pw, privs, env):
 | 
			
		||||
	# validate email
 | 
			
		||||
	if email.strip() == "":
 | 
			
		||||
		return ("No email address provided.", 400)
 | 
			
		||||
	elif not validate_email(email):
 | 
			
		||||
	if not validate_email(email):
 | 
			
		||||
		return ("Invalid email address.", 400)
 | 
			
		||||
	elif not validate_email(email, mode='user'):
 | 
			
		||||
	if not validate_email(email, mode='user'):
 | 
			
		||||
		return ("User account email addresses may only use the lowercase ASCII letters a-z, the digits 0-9, underscore (_), hyphen (-), and period (.).", 400)
 | 
			
		||||
	elif is_dcv_address(email) and len(get_mail_users(env)) > 0:
 | 
			
		||||
	if is_dcv_address(email) and len(get_mail_users(env)) > 0:
 | 
			
		||||
		# Make domain control validation hijacking a little harder to mess up by preventing the usual
 | 
			
		||||
		# addresses used for DCV from being user accounts. Except let it be the first account because
 | 
			
		||||
		# during box setup the user won't know the rules.
 | 
			
		||||
@ -483,9 +482,8 @@ def add_mail_alias(address, forwards_to, permitted_senders, env, update_if_exist
 | 
			
		||||
	except sqlite3.IntegrityError:
 | 
			
		||||
		if not update_if_exists:
 | 
			
		||||
			return ("Alias already exists ({}).".format(address), 400)
 | 
			
		||||
		else:
 | 
			
		||||
			c.execute("UPDATE aliases SET destination = ?, permitted_senders = ? WHERE source = ?", (forwards_to, permitted_senders, address))
 | 
			
		||||
			return_status = "alias updated"
 | 
			
		||||
		c.execute("UPDATE aliases SET destination = ?, permitted_senders = ? WHERE source = ?", (forwards_to, permitted_senders, address))
 | 
			
		||||
		return_status = "alias updated"
 | 
			
		||||
 | 
			
		||||
	conn.commit()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -160,14 +160,13 @@ def get_domain_ssl_files(domain, ssl_certificates, env, allow_missing_cert=False
 | 
			
		||||
	wildcard_domain = re.sub(r"^[^\.]+", "*", domain)
 | 
			
		||||
	if domain in ssl_certificates:
 | 
			
		||||
		return ssl_certificates[domain]
 | 
			
		||||
	elif wildcard_domain in ssl_certificates:
 | 
			
		||||
	if wildcard_domain in ssl_certificates:
 | 
			
		||||
		return ssl_certificates[wildcard_domain]
 | 
			
		||||
	elif not allow_missing_cert:
 | 
			
		||||
	if not allow_missing_cert:
 | 
			
		||||
		# No valid certificate is available for this domain! Return default files.
 | 
			
		||||
		return system_certificate
 | 
			
		||||
	else:
 | 
			
		||||
		# No valid certificate is available for this domain.
 | 
			
		||||
		return None
 | 
			
		||||
	# No valid certificate is available for this domain.
 | 
			
		||||
	return None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# PROVISIONING CERTIFICATES FROM LETSENCRYPT
 | 
			
		||||
@ -590,34 +589,33 @@ def check_certificate(domain, ssl_certificate, ssl_private_key, warn_if_expiring
 | 
			
		||||
		# Certificate is self-signed. Probably we detected this above.
 | 
			
		||||
		return ("SELF-SIGNED", None)
 | 
			
		||||
 | 
			
		||||
	elif retcode != 0:
 | 
			
		||||
	if retcode != 0:
 | 
			
		||||
		if "unable to get local issuer certificate" in verifyoutput:
 | 
			
		||||
			return ("The certificate is missing an intermediate chain or the intermediate chain is incorrect or incomplete. ({})".format(verifyoutput), None)
 | 
			
		||||
 | 
			
		||||
		# There is some unknown problem. Return the `openssl verify` raw output.
 | 
			
		||||
		return ("There is a problem with the certificate.", verifyoutput.strip())
 | 
			
		||||
 | 
			
		||||
	# `openssl verify` returned a zero exit status so the cert is currently
 | 
			
		||||
	# good.
 | 
			
		||||
 | 
			
		||||
	# But is it expiring soon?
 | 
			
		||||
	cert_expiration_date = cert.not_valid_after
 | 
			
		||||
	ndays = (cert_expiration_date-now).days
 | 
			
		||||
	if not rounded_time or ndays <= 10:
 | 
			
		||||
		# Yikes better renew soon!
 | 
			
		||||
		expiry_info = "The certificate expires in %d days on %s." % (ndays, cert_expiration_date.date().isoformat())
 | 
			
		||||
	else:
 | 
			
		||||
		# `openssl verify` returned a zero exit status so the cert is currently
 | 
			
		||||
		# good.
 | 
			
		||||
		# We'll renew it with Lets Encrypt.
 | 
			
		||||
		expiry_info = "The certificate expires on {}.".format(cert_expiration_date.date().isoformat())
 | 
			
		||||
 | 
			
		||||
		# But is it expiring soon?
 | 
			
		||||
		cert_expiration_date = cert.not_valid_after
 | 
			
		||||
		ndays = (cert_expiration_date-now).days
 | 
			
		||||
		if not rounded_time or ndays <= 10:
 | 
			
		||||
			# Yikes better renew soon!
 | 
			
		||||
			expiry_info = "The certificate expires in %d days on %s." % (ndays, cert_expiration_date.date().isoformat())
 | 
			
		||||
		else:
 | 
			
		||||
			# We'll renew it with Lets Encrypt.
 | 
			
		||||
			expiry_info = "The certificate expires on {}.".format(cert_expiration_date.date().isoformat())
 | 
			
		||||
	if warn_if_expiring_soon and ndays <= warn_if_expiring_soon:
 | 
			
		||||
		# Warn on day 10 to give 4 days for us to automatically renew the
 | 
			
		||||
		# certificate, which occurs on day 14.
 | 
			
		||||
		return ("The certificate is expiring soon: " + expiry_info, None)
 | 
			
		||||
 | 
			
		||||
		if warn_if_expiring_soon and ndays <= warn_if_expiring_soon:
 | 
			
		||||
			# Warn on day 10 to give 4 days for us to automatically renew the
 | 
			
		||||
			# certificate, which occurs on day 14.
 | 
			
		||||
			return ("The certificate is expiring soon: " + expiry_info, None)
 | 
			
		||||
 | 
			
		||||
		# Return the special OK code.
 | 
			
		||||
		return ("OK", expiry_info)
 | 
			
		||||
	# Return the special OK code.
 | 
			
		||||
	return ("OK", expiry_info)
 | 
			
		||||
 | 
			
		||||
def load_cert_chain(pemfile):
 | 
			
		||||
	# A certificate .pem file may contain a chain of certificates.
 | 
			
		||||
@ -671,8 +669,7 @@ def get_certificate_domains(cert):
 | 
			
		||||
	def idna_decode_dns_name(dns_name):
 | 
			
		||||
		if dns_name.startswith("*."):
 | 
			
		||||
			return "*." + idna.encode(dns_name[2:]).decode('ascii')
 | 
			
		||||
		else:
 | 
			
		||||
			return idna.encode(dns_name).decode('ascii')
 | 
			
		||||
		return idna.encode(dns_name).decode('ascii')
 | 
			
		||||
 | 
			
		||||
	try:
 | 
			
		||||
		sans = cert.extensions.get_extension_for_oid(OID_SUBJECT_ALTERNATIVE_NAME).value.get_values_for_type(DNSName)
 | 
			
		||||
 | 
			
		||||
@ -653,11 +653,11 @@ def check_dnssec(domain, env, output, dns_zonefiles, is_checking_primary=False):
 | 
			
		||||
			if {r[1] for r in matched_ds} == { '13' } and {r[2] for r in matched_ds} <= { '2', '4' }: # all are alg 13 and digest type 2 or 4
 | 
			
		||||
				output.print_ok("DNSSEC 'DS' record is set correctly at registrar.")
 | 
			
		||||
				return
 | 
			
		||||
			elif len([r for r in matched_ds if r[1] == '13' and r[2] in { '2', '4' }]) > 0: # some but not all are alg 13
 | 
			
		||||
			if len([r for r in matched_ds if r[1] == '13' and r[2] in { '2', '4' }]) > 0: # some but not all are alg 13
 | 
			
		||||
				output.print_ok("DNSSEC 'DS' record is set correctly at registrar. (Records using algorithm other than ECDSAP256SHA256 and digest types other than SHA-256/384 should be removed.)")
 | 
			
		||||
				return
 | 
			
		||||
			else: # no record uses alg 13
 | 
			
		||||
				output.print_warning("""DNSSEC 'DS' record set at registrar is valid but should be updated to ECDSAP256SHA256 and SHA-256 (see below).
 | 
			
		||||
			# no record uses alg 13
 | 
			
		||||
			output.print_warning("""DNSSEC 'DS' record set at registrar is valid but should be updated to ECDSAP256SHA256 and SHA-256 (see below).
 | 
			
		||||
				IMPORTANT: Do not delete existing DNSSEC 'DS' records for this domain until confirmation that the new DNSSEC 'DS' record
 | 
			
		||||
				for this domain is valid.""")
 | 
			
		||||
		else:
 | 
			
		||||
 | 
			
		||||
@ -135,8 +135,7 @@ def shell(method, cmd_args, env=None, capture_stderr=False, return_bytes=False,
 | 
			
		||||
    if not return_bytes and isinstance(ret, bytes): ret = ret.decode("utf8")
 | 
			
		||||
    if not trap:
 | 
			
		||||
        return ret
 | 
			
		||||
    else:
 | 
			
		||||
        return code, ret
 | 
			
		||||
    return code, ret
 | 
			
		||||
 | 
			
		||||
def create_syslog_handler():
 | 
			
		||||
    import logging.handlers
 | 
			
		||||
 | 
			
		||||
@ -256,10 +256,9 @@ def get_web_domains_info(env):
 | 
			
		||||
		cert_status, cert_status_details = check_certificate(domain, tls_cert["certificate"], tls_cert["private-key"])
 | 
			
		||||
		if cert_status == "OK":
 | 
			
		||||
			return ("success", "Signed & valid. " + cert_status_details)
 | 
			
		||||
		elif cert_status == "SELF-SIGNED":
 | 
			
		||||
		if cert_status == "SELF-SIGNED":
 | 
			
		||||
			return ("warning", "Self-signed. Get a signed certificate to stop warnings.")
 | 
			
		||||
		else:
 | 
			
		||||
			return ("danger", "Certificate has a problem: " + cert_status)
 | 
			
		||||
		return ("danger", "Certificate has a problem: " + cert_status)
 | 
			
		||||
 | 
			
		||||
	return [
 | 
			
		||||
		{
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user