From 49124cc9caee228cf5595381203b667e82c8750f Mon Sep 17 00:00:00 2001 From: Teal Dulcet Date: Fri, 22 Dec 2023 07:10:25 -0800 Subject: [PATCH] Fixed PLR6201 (literal-membership): Use a `set` literal when testing for membership --- management/auth.py | 2 +- management/backup.py | 2 +- management/cli.py | 6 +++--- management/daemon.py | 6 +++--- management/dns_update.py | 10 +++++----- management/mail_log.py | 12 ++++++------ management/mailconfig.py | 2 +- management/ssl_certificates.py | 2 +- management/status_checks.py | 18 +++++++++--------- management/web_update.py | 2 +- tools/editconf.py | 2 +- 11 files changed, 32 insertions(+), 32 deletions(-) diff --git a/management/auth.py b/management/auth.py index 653d0485..2d5b8a36 100644 --- a/management/auth.py +++ b/management/auth.py @@ -48,7 +48,7 @@ class AuthService: return username, password username, password = parse_http_authorization_basic(request.headers.get('Authorization', '')) - if username in (None, ""): + if username in {None, ""}: raise ValueError("Authorization header invalid.") if username.strip() == "" and password.strip() == "": diff --git a/management/backup.py b/management/backup.py index 6ec4d28a..bb6dff9a 100755 --- a/management/backup.py +++ b/management/backup.py @@ -556,7 +556,7 @@ def backup_set_custom(env, target, target_user, target_pass, min_age): # Validate. try: - if config["target"] not in ("off", "local"): + if config["target"] not in {"off", "local"}: # these aren't supported by the following function, which expects a full url in the target key, # which is what is there except when loading the config prior to saving list_target_files(config) diff --git a/management/cli.py b/management/cli.py index 5e2e331c..115ed9f2 100755 --- a/management/cli.py +++ b/management/cli.py @@ -91,7 +91,7 @@ elif sys.argv[1] == "user" and len(sys.argv) == 2: print("*", end='') print() -elif sys.argv[1] == "user" and sys.argv[2] in ("add", "password"): +elif sys.argv[1] == "user" and sys.argv[2] in {"add", "password"}: if len(sys.argv) < 5: if len(sys.argv) < 4: email = input("email: ") @@ -109,7 +109,7 @@ elif sys.argv[1] == "user" and sys.argv[2] in ("add", "password"): elif sys.argv[1] == "user" and sys.argv[2] == "remove" and len(sys.argv) == 4: print(mgmt("/mail/users/remove", { "email": sys.argv[3] })) -elif sys.argv[1] == "user" and sys.argv[2] in ("make-admin", "remove-admin") and len(sys.argv) == 4: +elif sys.argv[1] == "user" and sys.argv[2] in {"make-admin", "remove-admin"} and len(sys.argv) == 4: if sys.argv[2] == "make-admin": action = "add" else: @@ -132,7 +132,7 @@ elif sys.argv[1] == "user" and len(sys.argv) == 5 and sys.argv[2:4] == ["mfa", " for mfa in status["enabled_mfa"]: W.writerow([mfa["id"], mfa["type"], mfa["label"]]) -elif sys.argv[1] == "user" and len(sys.argv) in (5, 6) and sys.argv[2:4] == ["mfa", "disable"]: +elif sys.argv[1] == "user" and len(sys.argv) in {5, 6} and sys.argv[2:4] == ["mfa", "disable"]: # Disable MFA (all or a particular device) for a user. print(mgmt("/mfa/disable", { "user": sys.argv[4], "mfa-id": sys.argv[5] if len(sys.argv) == 6 else None })) diff --git a/management/daemon.py b/management/daemon.py index 5081aea3..1a35fba7 100755 --- a/management/daemon.py +++ b/management/daemon.py @@ -90,7 +90,7 @@ def authorized_personnel_only(viewfunc): status = 403 headers = None - if request.headers.get('Accept') in (None, "", "*/*"): + if request.headers.get('Accept') in {None, "", "*/*"}: # Return plain text output. return Response(error+"\n", status=status, mimetype='text/plain', headers=headers) else: @@ -355,9 +355,9 @@ def dns_set_record(qname, rtype="A"): # Get the existing records matching the qname and rtype. return dns_get_records(qname, rtype) - elif request.method in ("POST", "PUT"): + elif request.method in {"POST", "PUT"}: # There is a default value for A/AAAA records. - if rtype in ("A", "AAAA") and value == "": + if rtype in {"A", "AAAA"} and value == "": value = request.environ.get("HTTP_X_FORWARDED_FOR") # normally REMOTE_ADDR but we're behind nginx as a reverse proxy # Cannot add empty records. diff --git a/management/dns_update.py b/management/dns_update.py index 0869e77d..23aee2a5 100755 --- a/management/dns_update.py +++ b/management/dns_update.py @@ -364,7 +364,7 @@ def build_zone(domain, domain_properties, additional_records, env, is_zone=True) # non-mail domain and also may include qnames from custom DNS records. # Do this once at the end of generating a zone. if is_zone: - qnames_with_a = set(qname for (qname, rtype, value, explanation) in records if rtype in ("A", "AAAA")) + qnames_with_a = set(qname for (qname, rtype, value, explanation) in records if rtype in {"A", "AAAA"}) qnames_with_mx = set(qname for (qname, rtype, value, explanation) in records if rtype == "MX") for qname in qnames_with_a - qnames_with_mx: # Mark this domain as not sending mail with hard-fail SPF and DMARC records. @@ -921,12 +921,12 @@ def set_custom_dns_record(qname, rtype, value, action, env): if not re.search(DOMAIN_RE, qname): raise ValueError("Invalid name.") - if rtype in ("A", "AAAA"): + if rtype in {"A", "AAAA"}: if value != "local": # "local" is a special flag for us v = ipaddress.ip_address(value) # raises a ValueError if there's a problem if rtype == "A" and not isinstance(v, ipaddress.IPv4Address): raise ValueError("That's an IPv6 address.") if rtype == "AAAA" and not isinstance(v, ipaddress.IPv6Address): raise ValueError("That's an IPv4 address.") - elif rtype in ("CNAME", "NS"): + elif rtype in {"CNAME", "NS"}: if rtype == "NS" and qname == zone: raise ValueError("NS records can only be set for subdomains.") @@ -936,7 +936,7 @@ def set_custom_dns_record(qname, rtype, value, action, env): if not re.search(DOMAIN_RE, value): raise ValueError("Invalid value.") - elif rtype in ("CNAME", "TXT", "SRV", "MX", "SSHFP", "CAA"): + elif rtype in {"CNAME", "TXT", "SRV", "MX", "SSHFP", "CAA"}: # anything goes pass else: @@ -979,7 +979,7 @@ def set_custom_dns_record(qname, rtype, value, action, env): # Preserve this record. newconfig.append((_qname, _rtype, _value)) - if action in ("add", "set") and needs_add and value is not None: + if action in {"add", "set"} and needs_add and value is not None: newconfig.append((qname, rtype, value)) made_change = True diff --git a/management/mail_log.py b/management/mail_log.py index 1a963966..38ae1ec6 100755 --- a/management/mail_log.py +++ b/management/mail_log.py @@ -376,9 +376,9 @@ def scan_mail_log_line(line, collector): elif service == "postfix/smtpd": if SCAN_BLOCKED: scan_postfix_smtpd_line(date, log, collector) - elif service in ("postfix/qmgr", "postfix/pickup", "postfix/cleanup", "postfix/scache", + elif service in {"postfix/qmgr", "postfix/pickup", "postfix/cleanup", "postfix/scache", "spampd", "postfix/anvil", "postfix/master", "opendkim", "postfix/lmtp", - "postfix/tlsmgr", "anvil"): + "postfix/tlsmgr", "anvil"}: # nothing to look at return True else: @@ -500,7 +500,7 @@ def add_login(user, date, protocol_name, host, collector): data["totals_by_protocol"][protocol_name] += 1 data["totals_by_protocol_and_host"][(protocol_name, host)] += 1 - if host not in ("127.0.0.1", "::1") or True: + if host not in {"127.0.0.1", "::1"} or True: data["activity-by-hour"][protocol_name][date.hour] += 1 collector["logins"][user] = data @@ -684,7 +684,7 @@ def print_user_table(users, data=None, sub_data=None, activity=None, latest=None data_accum[col] += d[row] try: - if None not in [latest, earliest]: + if None not in {latest, earliest}: vert_pos = len(line) e = earliest[row] l = latest[row] @@ -740,7 +740,7 @@ def print_user_table(users, data=None, sub_data=None, activity=None, latest=None else: header += l.rjust(max(5, len(l) + 1, col_widths[col])) - if None not in (latest, earliest): + if None not in {latest, earliest}: header += " │ timespan " lines.insert(0, header.rstrip()) @@ -765,7 +765,7 @@ def print_user_table(users, data=None, sub_data=None, activity=None, latest=None footer += temp.format(data_accum[row]) try: - if None not in [latest, earliest]: + if None not in {latest, earliest}: max_l = max(latest) min_e = min(earliest) timespan = relativedelta(max_l, min_e) diff --git a/management/mailconfig.py b/management/mailconfig.py index 4068b393..3bfc8dc0 100755 --- a/management/mailconfig.py +++ b/management/mailconfig.py @@ -588,7 +588,7 @@ def kick(env, mail_result=None): # They are now stored in the auto_aliases table. for address, forwards_to, permitted_senders, auto in get_mail_aliases(env): user, domain = address.split("@") - if user in ("postmaster", "admin", "abuse") \ + if user in {"postmaster", "admin", "abuse"} \ and address not in required_aliases \ and forwards_to == get_system_administrator(env) \ and not auto: diff --git a/management/ssl_certificates.py b/management/ssl_certificates.py index f3e83a15..77a411b6 100755 --- a/management/ssl_certificates.py +++ b/management/ssl_certificates.py @@ -637,7 +637,7 @@ def load_pem(pem): if pem_type is None: raise ValueError("File is not a valid PEM-formatted file.") pem_type = pem_type.group(1) - if pem_type in (b"RSA PRIVATE KEY", b"PRIVATE KEY"): + if pem_type in {b"RSA PRIVATE KEY", b"PRIVATE KEY"}: return serialization.load_pem_private_key(pem, password=None, backend=default_backend()) if pem_type == b"CERTIFICATE": return load_pem_x509_certificate(pem, default_backend()) diff --git a/management/status_checks.py b/management/status_checks.py index 81038300..7cdfffc6 100755 --- a/management/status_checks.py +++ b/management/status_checks.py @@ -151,7 +151,7 @@ def check_service(i, service, env): output.print_error("%s is not running (port %d)." % (service['name'], service['port'])) # Why is nginx not running? - if not running and service["port"] in (80, 443): + if not running and service["port"] in {80, 443}: output.print_line(shell('check_output', ['nginx', '-t'], capture_stderr=True, trap=True)[1].strip()) else: @@ -340,7 +340,7 @@ def run_domain_checks(rounded_time, env, output, pool, domains_to_check=None): domains_to_check = [ d for d in domains_to_check if not ( - d.split(".", 1)[0] in ("www", "autoconfig", "autodiscover", "mta-sts") + d.split(".", 1)[0] in {"www", "autoconfig", "autodiscover", "mta-sts"} and len(d.split(".", 1)) == 2 and d.split(".", 1)[1] in domains_to_check ) @@ -467,7 +467,7 @@ def check_primary_hostname_dns(domain, env, output, dns_domains, dns_zonefiles): # a DNS zone if it is a subdomain of another domain we have a zone for. existing_rdns_v4 = query_dns(dns.reversename.from_address(env['PUBLIC_IP']), "PTR") existing_rdns_v6 = query_dns(dns.reversename.from_address(env['PUBLIC_IPV6']), "PTR") if env.get("PUBLIC_IPV6") else None - if existing_rdns_v4 == domain and existing_rdns_v6 in (None, domain): + if existing_rdns_v4 == domain and existing_rdns_v6 in {None, domain}: output.print_ok("Reverse DNS is set correctly at ISP. [%s ↦ %s]" % (my_ips, env['PRIMARY_HOSTNAME'])) elif existing_rdns_v4 == existing_rdns_v6 or existing_rdns_v6 is None: output.print_error("""Your box's reverse DNS is currently %s, but it should be %s. Your ISP or cloud provider will have instructions @@ -636,7 +636,7 @@ def check_dnssec(domain, env, output, dns_zonefiles, is_checking_primary=False): if set(r[1] for r in matched_ds) == { '13' } and set(r[2] for r in matched_ds) <= { '2', '4' }: # all are alg 13 and digest type 2 or 4 output.print_ok("DNSSEC 'DS' record is set correctly at registrar.") return - elif len([r for r in matched_ds if r[1] == '13' and r[2] in ( '2', '4' )]) > 0: # some but not all are alg 13 + elif len([r for r in matched_ds if r[1] == '13' and r[2] in { '2', '4' }]) > 0: # some but not all are alg 13 output.print_ok("DNSSEC 'DS' record is set correctly at registrar. (Records using algorithm other than ECDSAP256SHA256 and digest types other than SHA-256/384 should be removed.)") return else: # no record uses alg 13 @@ -825,7 +825,7 @@ def query_dns(qname, rtype, nxdomain='[Not Set]', at=None, as_list=False): # be expressed in equivalent string forms. Canonicalize the form before # returning them. The caller should normalize any IP addresses the result # of this method is compared with. - if rtype in ("A", "AAAA"): + if rtype in {"A", "AAAA"}: response = [normalize_ip(str(r)) for r in response] if as_list: @@ -841,7 +841,7 @@ def check_ssl_cert(domain, rounded_time, ssl_certificates, env, output): # Check that TLS certificate is signed. # Skip the check if the A record is not pointed here. - if query_dns(domain, "A", None) not in (env['PUBLIC_IP'], None): return + if query_dns(domain, "A", None) not in {env['PUBLIC_IP'], None}: return # Where is the certificate file stored? tls_cert = get_domain_ssl_files(domain, ssl_certificates, env, allow_missing_cert=True) @@ -1002,14 +1002,14 @@ def run_and_output_changes(env, pool): out.add_heading(category + " -- Previously:") elif op == "delete": out.add_heading(category + " -- Removed") - if op in ("replace", "delete"): + if op in {"replace", "delete"}: BufferedOutput(with_lines=prev_lines[i1:i2]).playback(out) if op == "replace": out.add_heading(category + " -- Currently:") elif op == "insert": out.add_heading(category + " -- Added") - if op in ("replace", "insert"): + if op in {"replace", "insert"}: BufferedOutput(with_lines=cur_lines[j1:j2]).playback(out) for category, prev_lines in prev_status.items(): @@ -1095,7 +1095,7 @@ class BufferedOutput: def __init__(self, with_lines=None): self.buf = [] if not with_lines else with_lines def __getattr__(self, attr): - if attr not in ("add_heading", "print_ok", "print_error", "print_warning", "print_block", "print_line"): + if attr not in {"add_heading", "print_ok", "print_error", "print_warning", "print_block", "print_line"}: raise AttributeError # Return a function that just records the call & arguments to our buffer. def w(*args, **kwargs): diff --git a/management/web_update.py b/management/web_update.py index 46a6f2a0..14ad1379 100644 --- a/management/web_update.py +++ b/management/web_update.py @@ -53,7 +53,7 @@ def get_domains_with_a_records(env): domains = set() dns = get_custom_dns_config(env) for domain, rtype, value in dns: - if rtype == "CNAME" or (rtype in ("A", "AAAA") and value not in ("local", env['PUBLIC_IP'])): + if rtype == "CNAME" or (rtype in {"A", "AAAA"} and value not in {"local", env['PUBLIC_IP']}): domains.add(domain) return domains diff --git a/tools/editconf.py b/tools/editconf.py index ea4d9020..9e397846 100755 --- a/tools/editconf.py +++ b/tools/editconf.py @@ -84,7 +84,7 @@ while len(input_lines) > 0: # If this configuration file uses folded lines, append any folded lines # into our input buffer. - if folded_lines and line[0] not in (comment_char, " ", ""): + if folded_lines and line[0] not in {comment_char, " ", ""}: while len(input_lines) > 0 and input_lines[0][0] in " \t": line += input_lines.pop(0)