Fixed UP015 (redundant-open-modes): Unnecessary open mode parameters
This commit is contained in:
parent
0ee64f2fe8
commit
cb922ec286
|
@ -22,7 +22,7 @@ class AuthService:
|
||||||
def init_system_api_key(self):
|
def init_system_api_key(self):
|
||||||
"""Write an API key to a local file so local processes can use the API"""
|
"""Write an API key to a local file so local processes can use the API"""
|
||||||
|
|
||||||
with open(self.key_path, 'r') as file:
|
with open(self.key_path) as file:
|
||||||
self.key = file.read()
|
self.key = file.read()
|
||||||
|
|
||||||
def authenticate(self, request, env, login_only=False, logout=False):
|
def authenticate(self, request, env, login_only=False, logout=False):
|
||||||
|
|
|
@ -578,7 +578,7 @@ def get_backup_config(env, for_save=False, for_ui=False):
|
||||||
|
|
||||||
# Merge in anything written to custom.yaml.
|
# Merge in anything written to custom.yaml.
|
||||||
try:
|
try:
|
||||||
with open(os.path.join(backup_root, 'custom.yaml'), 'r') as f:
|
with open(os.path.join(backup_root, 'custom.yaml')) as f:
|
||||||
custom_config = rtyaml.load(f)
|
custom_config = rtyaml.load(f)
|
||||||
if not isinstance(custom_config, dict): raise ValueError() # caught below
|
if not isinstance(custom_config, dict): raise ValueError() # caught below
|
||||||
config.update(custom_config)
|
config.update(custom_config)
|
||||||
|
@ -604,7 +604,7 @@ def get_backup_config(env, for_save=False, for_ui=False):
|
||||||
config["target"] = "file://" + config["file_target_directory"]
|
config["target"] = "file://" + config["file_target_directory"]
|
||||||
ssh_pub_key = os.path.join('/root', '.ssh', 'id_rsa_miab.pub')
|
ssh_pub_key = os.path.join('/root', '.ssh', 'id_rsa_miab.pub')
|
||||||
if os.path.exists(ssh_pub_key):
|
if os.path.exists(ssh_pub_key):
|
||||||
with open(ssh_pub_key, 'r') as f:
|
with open(ssh_pub_key) as f:
|
||||||
config["ssh_pub_key"] = f.read()
|
config["ssh_pub_key"] = f.read()
|
||||||
|
|
||||||
return config
|
return config
|
||||||
|
|
|
@ -47,7 +47,7 @@ def read_password():
|
||||||
return first
|
return first
|
||||||
|
|
||||||
def setup_key_auth(mgmt_uri):
|
def setup_key_auth(mgmt_uri):
|
||||||
with open('/var/lib/mailinabox/api.key', 'r') as f:
|
with open('/var/lib/mailinabox/api.key') as f:
|
||||||
key = f.read().strip()
|
key = f.read().strip()
|
||||||
|
|
||||||
auth_handler = urllib.request.HTTPBasicAuthHandler()
|
auth_handler = urllib.request.HTTPBasicAuthHandler()
|
||||||
|
|
|
@ -455,7 +455,7 @@ def build_sshfp_records():
|
||||||
# specify that port to sshkeyscan.
|
# specify that port to sshkeyscan.
|
||||||
|
|
||||||
port = 22
|
port = 22
|
||||||
with open('/etc/ssh/sshd_config', 'r') as f:
|
with open('/etc/ssh/sshd_config') as f:
|
||||||
for line in f:
|
for line in f:
|
||||||
s = line.rstrip().split()
|
s = line.rstrip().split()
|
||||||
if len(s) == 2 and s[0] == 'Port':
|
if len(s) == 2 and s[0] == 'Port':
|
||||||
|
@ -606,7 +606,7 @@ def get_dns_zonefile(zone, env):
|
||||||
raise ValueError("%s is not a domain name that corresponds to a zone." % zone)
|
raise ValueError("%s is not a domain name that corresponds to a zone." % zone)
|
||||||
|
|
||||||
nsd_zonefile = "/etc/nsd/zones/" + fn
|
nsd_zonefile = "/etc/nsd/zones/" + fn
|
||||||
with open(nsd_zonefile, "r") as f:
|
with open(nsd_zonefile) as f:
|
||||||
return f.read()
|
return f.read()
|
||||||
|
|
||||||
########################################################################
|
########################################################################
|
||||||
|
@ -676,7 +676,7 @@ def hash_dnssec_keys(domain, env):
|
||||||
oldkeyfn = os.path.join(env['STORAGE_ROOT'], 'dns/dnssec', keyfn + ".private")
|
oldkeyfn = os.path.join(env['STORAGE_ROOT'], 'dns/dnssec', keyfn + ".private")
|
||||||
keydata.append(keytype)
|
keydata.append(keytype)
|
||||||
keydata.append(keyfn)
|
keydata.append(keyfn)
|
||||||
with open(oldkeyfn, "r") as fr:
|
with open(oldkeyfn) as fr:
|
||||||
keydata.append( fr.read() )
|
keydata.append( fr.read() )
|
||||||
keydata = "".join(keydata).encode("utf8")
|
keydata = "".join(keydata).encode("utf8")
|
||||||
return hashlib.sha1(keydata).hexdigest()
|
return hashlib.sha1(keydata).hexdigest()
|
||||||
|
@ -704,7 +704,7 @@ def sign_zone(domain, zonefile, env):
|
||||||
# Use os.umask and open().write() to securely create a copy that only
|
# Use os.umask and open().write() to securely create a copy that only
|
||||||
# we (root) can read.
|
# we (root) can read.
|
||||||
oldkeyfn = os.path.join(env['STORAGE_ROOT'], 'dns/dnssec', keyfn + ext)
|
oldkeyfn = os.path.join(env['STORAGE_ROOT'], 'dns/dnssec', keyfn + ext)
|
||||||
with open(oldkeyfn, "r") as fr:
|
with open(oldkeyfn) as fr:
|
||||||
keydata = fr.read()
|
keydata = fr.read()
|
||||||
keydata = keydata.replace("_domain_", domain)
|
keydata = keydata.replace("_domain_", domain)
|
||||||
prev_umask = os.umask(0o77) # ensure written file is not world-readable
|
prev_umask = os.umask(0o77) # ensure written file is not world-readable
|
||||||
|
@ -815,7 +815,7 @@ def write_opendkim_tables(domains, env):
|
||||||
|
|
||||||
def get_custom_dns_config(env, only_real_records=False):
|
def get_custom_dns_config(env, only_real_records=False):
|
||||||
try:
|
try:
|
||||||
with open(os.path.join(env['STORAGE_ROOT'], 'dns/custom.yaml'), 'r') as f:
|
with open(os.path.join(env['STORAGE_ROOT'], 'dns/custom.yaml')) as f:
|
||||||
custom_dns = rtyaml.load(f)
|
custom_dns = rtyaml.load(f)
|
||||||
if not isinstance(custom_dns, dict): raise ValueError() # caught below
|
if not isinstance(custom_dns, dict): raise ValueError() # caught below
|
||||||
except:
|
except:
|
||||||
|
|
|
@ -212,7 +212,7 @@ def check_ssh_password(env, output):
|
||||||
# the configuration file.
|
# the configuration file.
|
||||||
if not os.path.exists("/etc/ssh/sshd_config"):
|
if not os.path.exists("/etc/ssh/sshd_config"):
|
||||||
return
|
return
|
||||||
with open("/etc/ssh/sshd_config", "r") as f:
|
with open("/etc/ssh/sshd_config") as f:
|
||||||
sshd = f.read()
|
sshd = f.read()
|
||||||
if re.search("\nPasswordAuthentication\s+yes", sshd) \
|
if re.search("\nPasswordAuthentication\s+yes", sshd) \
|
||||||
or not re.search("\nPasswordAuthentication\s+no", sshd):
|
or not re.search("\nPasswordAuthentication\s+no", sshd):
|
||||||
|
@ -600,7 +600,7 @@ def check_dnssec(domain, env, output, dns_zonefiles, is_checking_primary=False):
|
||||||
# record that we suggest using is for the KSK (and that's how the DS records were generated).
|
# record that we suggest using is for the KSK (and that's how the DS records were generated).
|
||||||
# We'll also give the nice name for the key algorithm.
|
# We'll also give the nice name for the key algorithm.
|
||||||
dnssec_keys = load_env_vars_from_file(os.path.join(env['STORAGE_ROOT'], 'dns/dnssec/%s.conf' % alg_name_map[ds_alg]))
|
dnssec_keys = load_env_vars_from_file(os.path.join(env['STORAGE_ROOT'], 'dns/dnssec/%s.conf' % alg_name_map[ds_alg]))
|
||||||
with open(os.path.join(env['STORAGE_ROOT'], 'dns/dnssec/' + dnssec_keys['KSK'] + '.key'), 'r') as f:
|
with open(os.path.join(env['STORAGE_ROOT'], 'dns/dnssec/' + dnssec_keys['KSK'] + '.key')) as f:
|
||||||
dnsssec_pubkey = f.read().split("\t")[3].split(" ")[3]
|
dnsssec_pubkey = f.read().split("\t")[3].split(" ")[3]
|
||||||
|
|
||||||
expected_ds_records[ (ds_keytag, ds_alg, ds_digalg, ds_digest) ] = {
|
expected_ds_records[ (ds_keytag, ds_alg, ds_digalg, ds_digest) ] = {
|
||||||
|
@ -963,7 +963,7 @@ def run_and_output_changes(env, pool):
|
||||||
# Load previously saved status checks.
|
# Load previously saved status checks.
|
||||||
cache_fn = "/var/cache/mailinabox/status_checks.json"
|
cache_fn = "/var/cache/mailinabox/status_checks.json"
|
||||||
if os.path.exists(cache_fn):
|
if os.path.exists(cache_fn):
|
||||||
with open(cache_fn, 'r') as f:
|
with open(cache_fn) as f:
|
||||||
try:
|
try:
|
||||||
prev = json.load(f)
|
prev = json.load(f)
|
||||||
except json.JSONDecodeError:
|
except json.JSONDecodeError:
|
||||||
|
|
|
@ -14,7 +14,7 @@ def load_env_vars_from_file(fn):
|
||||||
# Load settings from a KEY=VALUE file.
|
# Load settings from a KEY=VALUE file.
|
||||||
import collections
|
import collections
|
||||||
env = collections.OrderedDict()
|
env = collections.OrderedDict()
|
||||||
with open(fn, 'r') as f:
|
with open(fn) as f:
|
||||||
for line in f:
|
for line in f:
|
||||||
env.setdefault(*line.strip().split("=", 1))
|
env.setdefault(*line.strip().split("=", 1))
|
||||||
return env
|
return env
|
||||||
|
@ -36,7 +36,7 @@ def load_settings(env):
|
||||||
import rtyaml
|
import rtyaml
|
||||||
fn = os.path.join(env['STORAGE_ROOT'], 'settings.yaml')
|
fn = os.path.join(env['STORAGE_ROOT'], 'settings.yaml')
|
||||||
try:
|
try:
|
||||||
with open(fn, "r") as f:
|
with open(fn) as f:
|
||||||
config = rtyaml.load(f)
|
config = rtyaml.load(f)
|
||||||
if not isinstance(config, dict): raise ValueError() # caught below
|
if not isinstance(config, dict): raise ValueError() # caught below
|
||||||
return config
|
return config
|
||||||
|
|
|
@ -63,7 +63,7 @@ def get_web_domains_with_root_overrides(env):
|
||||||
root_overrides = { }
|
root_overrides = { }
|
||||||
nginx_conf_custom_fn = os.path.join(env["STORAGE_ROOT"], "www/custom.yaml")
|
nginx_conf_custom_fn = os.path.join(env["STORAGE_ROOT"], "www/custom.yaml")
|
||||||
if os.path.exists(nginx_conf_custom_fn):
|
if os.path.exists(nginx_conf_custom_fn):
|
||||||
with open(nginx_conf_custom_fn, 'r') as f:
|
with open(nginx_conf_custom_fn) as f:
|
||||||
custom_settings = rtyaml.load(f)
|
custom_settings = rtyaml.load(f)
|
||||||
for domain, settings in custom_settings.items():
|
for domain, settings in custom_settings.items():
|
||||||
for type, value in [('redirect', settings.get('redirects', {}).get('/')),
|
for type, value in [('redirect', settings.get('redirects', {}).get('/')),
|
||||||
|
@ -78,7 +78,7 @@ def do_web_update(env):
|
||||||
|
|
||||||
# Helper for reading config files and templates
|
# Helper for reading config files and templates
|
||||||
def read_conf(conf_fn):
|
def read_conf(conf_fn):
|
||||||
with open(os.path.join(os.path.dirname(__file__), "../conf", conf_fn), "r") as f:
|
with open(os.path.join(os.path.dirname(__file__), "../conf", conf_fn)) as f:
|
||||||
return f.read()
|
return f.read()
|
||||||
|
|
||||||
# Build an nginx configuration file.
|
# Build an nginx configuration file.
|
||||||
|
@ -156,7 +156,7 @@ def make_domain_config(domain, templates, ssl_certificates, env):
|
||||||
hsts = "yes"
|
hsts = "yes"
|
||||||
nginx_conf_custom_fn = os.path.join(env["STORAGE_ROOT"], "www/custom.yaml")
|
nginx_conf_custom_fn = os.path.join(env["STORAGE_ROOT"], "www/custom.yaml")
|
||||||
if os.path.exists(nginx_conf_custom_fn):
|
if os.path.exists(nginx_conf_custom_fn):
|
||||||
with open(nginx_conf_custom_fn, 'r') as f:
|
with open(nginx_conf_custom_fn) as f:
|
||||||
yaml = rtyaml.load(f)
|
yaml = rtyaml.load(f)
|
||||||
if domain in yaml:
|
if domain in yaml:
|
||||||
yaml = yaml[domain]
|
yaml = yaml[domain]
|
||||||
|
|
|
@ -76,7 +76,7 @@ for setting in settings:
|
||||||
|
|
||||||
found = set()
|
found = set()
|
||||||
buf = ""
|
buf = ""
|
||||||
with open(filename, "r") as f:
|
with open(filename) as f:
|
||||||
input_lines = list(f)
|
input_lines = list(f)
|
||||||
|
|
||||||
while len(input_lines) > 0:
|
while len(input_lines) > 0:
|
||||||
|
|
|
@ -38,7 +38,7 @@ for date, ip in accesses:
|
||||||
# Since logs are rotated, store the statistics permanently in a JSON file.
|
# Since logs are rotated, store the statistics permanently in a JSON file.
|
||||||
# Load in the stats from an existing file.
|
# Load in the stats from an existing file.
|
||||||
if os.path.exists(outfn):
|
if os.path.exists(outfn):
|
||||||
with open(outfn, "r") as f:
|
with open(outfn) as f:
|
||||||
existing_data = json.load(f)
|
existing_data = json.load(f)
|
||||||
for date, count in existing_data:
|
for date, count in existing_data:
|
||||||
if date not in by_date:
|
if date not in by_date:
|
||||||
|
|
Loading…
Reference in New Issue