mirror of
				https://github.com/mail-in-a-box/mailinabox.git
				synced 2025-10-31 19:00:54 +00:00 
			
		
		
		
	Fixed PLW1514 (unspecified-encoding): open in text mode without explicit encoding argument
				
					
				
			This commit is contained in:
		
							parent
							
								
									a02b59d4e4
								
							
						
					
					
						commit
						0e9193651d
					
				| @ -22,7 +22,7 @@ class AuthService: | ||||
| 	def init_system_api_key(self): | ||||
| 		"""Write an API key to a local file so local processes can use the API""" | ||||
| 
 | ||||
| 		with open(self.key_path) as file: | ||||
| 		with open(self.key_path, encoding='utf-8') as file: | ||||
| 			self.key = file.read() | ||||
| 
 | ||||
| 	def authenticate(self, request, env, login_only=False, logout=False): | ||||
|  | ||||
| @ -185,7 +185,7 @@ def get_passphrase(env): | ||||
| 	# only needs to be 43 base64-characters to match AES256's key | ||||
| 	# length of 32 bytes. | ||||
| 	backup_root = os.path.join(env["STORAGE_ROOT"], 'backup') | ||||
| 	with open(os.path.join(backup_root, 'secret_key.txt')) as f: | ||||
| 	with open(os.path.join(backup_root, 'secret_key.txt'), encoding="utf-8") as f: | ||||
| 		passphrase = f.readline().strip() | ||||
| 	if len(passphrase) < 43: raise Exception("secret_key.txt's first line is too short!") | ||||
| 
 | ||||
| @ -580,7 +580,7 @@ def get_backup_config(env, for_save=False, for_ui=False): | ||||
| 
 | ||||
| 	# Merge in anything written to custom.yaml. | ||||
| 	try: | ||||
| 		with open(os.path.join(backup_root, 'custom.yaml')) as f: | ||||
| 		with open(os.path.join(backup_root, 'custom.yaml'), encoding="utf-8") as f: | ||||
| 			custom_config = rtyaml.load(f) | ||||
| 		if not isinstance(custom_config, dict): raise ValueError # caught below | ||||
| 		config.update(custom_config) | ||||
| @ -606,14 +606,14 @@ def get_backup_config(env, for_save=False, for_ui=False): | ||||
| 		config["target"] = "file://" + config["file_target_directory"] | ||||
| 	ssh_pub_key = os.path.join('/root', '.ssh', 'id_rsa_miab.pub') | ||||
| 	if os.path.exists(ssh_pub_key): | ||||
| 		with open(ssh_pub_key) as f: | ||||
| 		with open(ssh_pub_key, encoding="utf-8") as f: | ||||
| 			config["ssh_pub_key"] = f.read() | ||||
| 
 | ||||
| 	return config | ||||
| 
 | ||||
| def write_backup_config(env, newconfig): | ||||
| 	backup_root = os.path.join(env["STORAGE_ROOT"], 'backup') | ||||
| 	with open(os.path.join(backup_root, 'custom.yaml'), "w") as f: | ||||
| 	with open(os.path.join(backup_root, 'custom.yaml'), "w", encoding="utf-8") as f: | ||||
| 		f.write(rtyaml.dump(newconfig)) | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|  | ||||
| @ -47,7 +47,7 @@ def read_password(): | ||||
|     return first | ||||
| 
 | ||||
| def setup_key_auth(mgmt_uri): | ||||
| 	with open('/var/lib/mailinabox/api.key') as f: | ||||
| 	with open('/var/lib/mailinabox/api.key', encoding='utf-8') as f: | ||||
| 		key = f.read().strip() | ||||
| 
 | ||||
| 	auth_handler = urllib.request.HTTPBasicAuthHandler() | ||||
|  | ||||
| @ -36,7 +36,7 @@ except OSError: | ||||
| 
 | ||||
| # for generating CSRs we need a list of country codes | ||||
| csr_country_codes = [] | ||||
| with open(os.path.join(os.path.dirname(me), "csr_country_codes.tsv")) as f: | ||||
| with open(os.path.join(os.path.dirname(me), "csr_country_codes.tsv"), encoding="utf-8") as f: | ||||
| 	for line in f: | ||||
| 		if line.strip() == "" or line.startswith("#"): continue | ||||
| 		code, name = line.strip().split("\t")[0:2] | ||||
|  | ||||
| @ -295,7 +295,7 @@ def build_zone(domain, domain_properties, additional_records, env, is_zone=True) | ||||
| 		# Append the DKIM TXT record to the zone as generated by OpenDKIM. | ||||
| 		# Skip if the user has set a DKIM record already. | ||||
| 		opendkim_record_file = os.path.join(env['STORAGE_ROOT'], 'mail/dkim/mail.txt') | ||||
| 		with open(opendkim_record_file) as orf: | ||||
| 		with open(opendkim_record_file, encoding="utf-8") as orf: | ||||
| 			m = re.match(r'(\S+)\s+IN\s+TXT\s+\( ((?:"[^"]+"\s+)+)\)', orf.read(), re.S) | ||||
| 			val = "".join(re.findall(r'"([^"]+)"', m.group(2))) | ||||
| 			if not has_rec(m.group(1), "TXT", prefix="v=DKIM1; "): | ||||
| @ -452,7 +452,7 @@ def build_sshfp_records(): | ||||
| 	# specify that port to sshkeyscan. | ||||
| 
 | ||||
| 	port = 22 | ||||
| 	with open('/etc/ssh/sshd_config') as f: | ||||
| 	with open('/etc/ssh/sshd_config', encoding="utf-8") as f: | ||||
| 		for line in f: | ||||
| 			s = line.rstrip().split() | ||||
| 			if len(s) == 2 and s[0] == 'Port': | ||||
| @ -547,7 +547,7 @@ $TTL 86400          ; default time to live | ||||
| 		# We've signed the domain. Check if we are close to the expiration | ||||
| 		# time of the signature. If so, we'll force a bump of the serial | ||||
| 		# number so we can re-sign it. | ||||
| 		with open(zonefile + ".signed") as f: | ||||
| 		with open(zonefile + ".signed", encoding="utf-8") as f: | ||||
| 			signed_zone = f.read() | ||||
| 		expiration_times = re.findall(r"\sRRSIG\s+SOA\s+\d+\s+\d+\s\d+\s+(\d{14})", signed_zone) | ||||
| 		if len(expiration_times) == 0: | ||||
| @ -566,7 +566,7 @@ $TTL 86400          ; default time to live | ||||
| 	if os.path.exists(zonefile): | ||||
| 		# If the zone already exists, is different, and has a later serial number, | ||||
| 		# increment the number. | ||||
| 		with open(zonefile) as f: | ||||
| 		with open(zonefile, encoding="utf-8") as f: | ||||
| 			existing_zone = f.read() | ||||
| 			m = re.search(r"(\d+)\s*;\s*serial number", existing_zone) | ||||
| 			if m: | ||||
| @ -590,7 +590,7 @@ $TTL 86400          ; default time to live | ||||
| 	zone = zone.replace("__SERIAL__", serial) | ||||
| 
 | ||||
| 	# Write the zone file. | ||||
| 	with open(zonefile, "w") as f: | ||||
| 	with open(zonefile, "w", encoding="utf-8") as f: | ||||
| 		f.write(zone) | ||||
| 
 | ||||
| 	return True # file is updated | ||||
| @ -603,7 +603,7 @@ def get_dns_zonefile(zone, env): | ||||
| 		raise ValueError("%s is not a domain name that corresponds to a zone." % zone) | ||||
| 
 | ||||
| 	nsd_zonefile = "/etc/nsd/zones/" + fn | ||||
| 	with open(nsd_zonefile) as f: | ||||
| 	with open(nsd_zonefile, encoding="utf-8") as f: | ||||
| 		return f.read() | ||||
| 
 | ||||
| ######################################################################## | ||||
| @ -631,13 +631,13 @@ zone: | ||||
| 	# Check if the file is changing. If it isn't changing, | ||||
| 	# return False to flag that no change was made. | ||||
| 	if os.path.exists(nsd_conf_file): | ||||
| 		with open(nsd_conf_file) as f: | ||||
| 		with open(nsd_conf_file, encoding="utf-8") as f: | ||||
| 			if f.read() == nsdconf: | ||||
| 				return False | ||||
| 
 | ||||
| 	# Write out new contents and return True to signal that | ||||
| 	# configuration changed. | ||||
| 	with open(nsd_conf_file, "w") as f: | ||||
| 	with open(nsd_conf_file, "w", encoding="utf-8") as f: | ||||
| 		f.write(nsdconf) | ||||
| 	return True | ||||
| 
 | ||||
| @ -672,7 +672,7 @@ def hash_dnssec_keys(domain, env): | ||||
| 	for keytype, keyfn in sorted(find_dnssec_signing_keys(domain, env)): | ||||
| 		oldkeyfn = os.path.join(env['STORAGE_ROOT'], 'dns/dnssec', keyfn + ".private") | ||||
| 		keydata.extend((keytype, keyfn)) | ||||
| 		with open(oldkeyfn) as fr: | ||||
| 		with open(oldkeyfn, encoding="utf-8") as fr: | ||||
| 			keydata.append( fr.read() ) | ||||
| 	keydata = "".join(keydata).encode("utf8") | ||||
| 	return hashlib.sha1(keydata).hexdigest() | ||||
| @ -700,12 +700,12 @@ def sign_zone(domain, zonefile, env): | ||||
| 			# Use os.umask and open().write() to securely create a copy that only | ||||
| 			# we (root) can read. | ||||
| 			oldkeyfn = os.path.join(env['STORAGE_ROOT'], 'dns/dnssec', keyfn + ext) | ||||
| 			with open(oldkeyfn) as fr: | ||||
| 			with open(oldkeyfn, encoding="utf-8") as fr: | ||||
| 				keydata = fr.read() | ||||
| 			keydata = keydata.replace("_domain_", domain) | ||||
| 			prev_umask = os.umask(0o77) # ensure written file is not world-readable | ||||
| 			try: | ||||
| 				with open(newkeyfn + ext, "w") as fw: | ||||
| 				with open(newkeyfn + ext, "w", encoding="utf-8") as fw: | ||||
| 					fw.write(keydata) | ||||
| 			finally: | ||||
| 				os.umask(prev_umask) # other files we write should be world-readable | ||||
| @ -739,7 +739,7 @@ def sign_zone(domain, zonefile, env): | ||||
| 	# be used, so we'll pre-generate all for each key. One DS record per line. Only one | ||||
| 	# needs to actually be deployed at the registrar. We'll select the preferred one | ||||
| 	# in the status checks. | ||||
| 	with open("/etc/nsd/zones/" + zonefile + ".ds", "w") as f: | ||||
| 	with open("/etc/nsd/zones/" + zonefile + ".ds", "w", encoding="utf-8") as f: | ||||
| 		for key in ksk_keys: | ||||
| 			for digest_type in ('1', '2', '4'): | ||||
| 				rr_ds = shell('check_output', ["/usr/bin/ldns-key2ds", | ||||
| @ -794,12 +794,12 @@ def write_opendkim_tables(domains, env): | ||||
| 	for filename, content in config.items(): | ||||
| 		# Don't write the file if it doesn't need an update. | ||||
| 		if os.path.exists("/etc/opendkim/" + filename): | ||||
| 			with open("/etc/opendkim/" + filename) as f: | ||||
| 			with open("/etc/opendkim/" + filename, encoding="utf-8") as f: | ||||
| 				if f.read() == content: | ||||
| 					continue | ||||
| 
 | ||||
| 		# The contents needs to change. | ||||
| 		with open("/etc/opendkim/" + filename, "w") as f: | ||||
| 		with open("/etc/opendkim/" + filename, "w", encoding="utf-8") as f: | ||||
| 			f.write(content) | ||||
| 		did_update = True | ||||
| 
 | ||||
| @ -811,7 +811,7 @@ def write_opendkim_tables(domains, env): | ||||
| 
 | ||||
| def get_custom_dns_config(env, only_real_records=False): | ||||
| 	try: | ||||
| 		with open(os.path.join(env['STORAGE_ROOT'], 'dns/custom.yaml')) as f: | ||||
| 		with open(os.path.join(env['STORAGE_ROOT'], 'dns/custom.yaml'), encoding="utf-8") as f: | ||||
| 			custom_dns = rtyaml.load(f) | ||||
| 		if not isinstance(custom_dns, dict): raise ValueError # caught below | ||||
| 	except: | ||||
| @ -893,7 +893,7 @@ def write_custom_dns_config(config, env): | ||||
| 
 | ||||
| 	# Write. | ||||
| 	config_yaml = rtyaml.dump(dns) | ||||
| 	with open(os.path.join(env['STORAGE_ROOT'], 'dns/custom.yaml'), "w") as f: | ||||
| 	with open(os.path.join(env['STORAGE_ROOT'], 'dns/custom.yaml'), "w", encoding="utf-8") as f: | ||||
| 		f.write(config_yaml) | ||||
| 
 | ||||
| def set_custom_dns_record(qname, rtype, value, action, env): | ||||
|  | ||||
| @ -585,7 +585,7 @@ def scan_postfix_submission_line(date, log, collector): | ||||
| def readline(filename): | ||||
|     """ A generator that returns the lines of a file | ||||
|     """ | ||||
|     with open(filename, errors='replace') as file: | ||||
|     with open(filename, errors='replace', encoding='utf-8') as file: | ||||
|         while True: | ||||
|           line = file.readline() | ||||
|           if not line: | ||||
|  | ||||
| @ -212,7 +212,7 @@ def check_ssh_password(env, output): | ||||
| 	# the configuration file. | ||||
| 	if not os.path.exists("/etc/ssh/sshd_config"): | ||||
| 		return | ||||
| 	with open("/etc/ssh/sshd_config") as f: | ||||
| 	with open("/etc/ssh/sshd_config", encoding="utf-8") as f: | ||||
| 		sshd = f.read() | ||||
| 	if re.search("\nPasswordAuthentication\\s+yes", sshd) \ | ||||
| 		or not re.search("\nPasswordAuthentication\\s+no", sshd): | ||||
| @ -582,7 +582,7 @@ def check_dnssec(domain, env, output, dns_zonefiles, is_checking_primary=False): | ||||
| 	expected_ds_records = { } | ||||
| 	ds_file = '/etc/nsd/zones/' + dns_zonefiles[domain] + '.ds' | ||||
| 	if not os.path.exists(ds_file): return # Domain is in our database but DNS has not yet been updated. | ||||
| 	with open(ds_file) as f: | ||||
| 	with open(ds_file, encoding="utf-8") as f: | ||||
| 		for rr_ds in f: | ||||
| 			rr_ds = rr_ds.rstrip() | ||||
| 			ds_keytag, ds_alg, ds_digalg, ds_digest = rr_ds.split("\t")[4].split(" ") | ||||
| @ -591,7 +591,7 @@ def check_dnssec(domain, env, output, dns_zonefiles, is_checking_primary=False): | ||||
| 			# record that we suggest using is for the KSK (and that's how the DS records were generated). | ||||
| 			# We'll also give the nice name for the key algorithm. | ||||
| 			dnssec_keys = load_env_vars_from_file(os.path.join(env['STORAGE_ROOT'], 'dns/dnssec/%s.conf' % alg_name_map[ds_alg])) | ||||
| 			with open(os.path.join(env['STORAGE_ROOT'], 'dns/dnssec/' + dnssec_keys['KSK'] + '.key')) as f: | ||||
| 			with open(os.path.join(env['STORAGE_ROOT'], 'dns/dnssec/' + dnssec_keys['KSK'] + '.key'), encoding="utf-8") as f: | ||||
| 				dnsssec_pubkey = f.read().split("\t")[3].split(" ")[3] | ||||
| 
 | ||||
| 			expected_ds_records[ (ds_keytag, ds_alg, ds_digalg, ds_digest) ] = { | ||||
| @ -951,7 +951,7 @@ def run_and_output_changes(env, pool): | ||||
| 	# Load previously saved status checks. | ||||
| 	cache_fn = "/var/cache/mailinabox/status_checks.json" | ||||
| 	if os.path.exists(cache_fn): | ||||
| 		with open(cache_fn) as f: | ||||
| 		with open(cache_fn, encoding="utf-8") as f: | ||||
| 			try: | ||||
| 				prev = json.load(f) | ||||
| 			except json.JSONDecodeError: | ||||
| @ -1007,7 +1007,7 @@ def run_and_output_changes(env, pool): | ||||
| 
 | ||||
| 	# Store the current status checks output for next time. | ||||
| 	os.makedirs(os.path.dirname(cache_fn), exist_ok=True) | ||||
| 	with open(cache_fn, "w") as f: | ||||
| 	with open(cache_fn, "w", encoding="utf-8") as f: | ||||
| 		json.dump(cur.buf, f, indent=True) | ||||
| 
 | ||||
| def normalize_ip(ip): | ||||
|  | ||||
| @ -14,13 +14,13 @@ def load_env_vars_from_file(fn): | ||||
|     # Load settings from a KEY=VALUE file. | ||||
|     import collections | ||||
|     env = collections.OrderedDict() | ||||
|     with open(fn)  as f: | ||||
|     with open(fn, encoding="utf-8")  as f: | ||||
|         for line in f: | ||||
|             env.setdefault(*line.strip().split("=", 1)) | ||||
|     return env | ||||
| 
 | ||||
| def save_environment(env): | ||||
|     with open("/etc/mailinabox.conf", "w") as f: | ||||
|     with open("/etc/mailinabox.conf", "w", encoding="utf-8") as f: | ||||
|         for k, v in env.items(): | ||||
|             f.write(f"{k}={v}\n") | ||||
| 
 | ||||
| @ -29,14 +29,14 @@ def save_environment(env): | ||||
| def write_settings(config, env): | ||||
|     import rtyaml | ||||
|     fn = os.path.join(env['STORAGE_ROOT'], 'settings.yaml') | ||||
|     with open(fn, "w") as f: | ||||
|     with open(fn, "w", encoding="utf-8") as f: | ||||
|         f.write(rtyaml.dump(config)) | ||||
| 
 | ||||
| def load_settings(env): | ||||
|     import rtyaml | ||||
|     fn = os.path.join(env['STORAGE_ROOT'], 'settings.yaml') | ||||
|     try: | ||||
|         with open(fn) as f: | ||||
|         with open(fn, encoding="utf-8") as f: | ||||
|             config = rtyaml.load(f) | ||||
|         if not isinstance(config, dict): raise ValueError # caught below | ||||
|         return config | ||||
|  | ||||
| @ -62,7 +62,7 @@ def get_web_domains_with_root_overrides(env): | ||||
| 	root_overrides = { } | ||||
| 	nginx_conf_custom_fn = os.path.join(env["STORAGE_ROOT"], "www/custom.yaml") | ||||
| 	if os.path.exists(nginx_conf_custom_fn): | ||||
| 		with open(nginx_conf_custom_fn) as f: | ||||
| 		with open(nginx_conf_custom_fn, encoding='utf-8') as f: | ||||
| 			custom_settings = rtyaml.load(f) | ||||
| 		for domain, settings in custom_settings.items(): | ||||
| 			for type, value in [('redirect', settings.get('redirects', {}).get('/')), | ||||
| @ -77,7 +77,7 @@ def do_web_update(env): | ||||
| 
 | ||||
| 	# Helper for reading config files and templates | ||||
| 	def read_conf(conf_fn): | ||||
| 		with open(os.path.join(os.path.dirname(__file__), "../conf", conf_fn)) as f: | ||||
| 		with open(os.path.join(os.path.dirname(__file__), "../conf", conf_fn), encoding='utf-8') as f: | ||||
| 			return f.read() | ||||
| 
 | ||||
| 	# Build an nginx configuration file. | ||||
| @ -112,12 +112,12 @@ def do_web_update(env): | ||||
| 	# Did the file change? If not, don't bother writing & restarting nginx. | ||||
| 	nginx_conf_fn = "/etc/nginx/conf.d/local.conf" | ||||
| 	if os.path.exists(nginx_conf_fn): | ||||
| 		with open(nginx_conf_fn) as f: | ||||
| 		with open(nginx_conf_fn, encoding='utf-8') as f: | ||||
| 			if f.read() == nginx_conf: | ||||
| 				return "" | ||||
| 
 | ||||
| 	# Save the file. | ||||
| 	with open(nginx_conf_fn, "w") as f: | ||||
| 	with open(nginx_conf_fn, "w", encoding='utf-8') as f: | ||||
| 		f.write(nginx_conf) | ||||
| 
 | ||||
| 	# Kick nginx. Since this might be called from the web admin | ||||
| @ -155,7 +155,7 @@ def make_domain_config(domain, templates, ssl_certificates, env): | ||||
| 	hsts = "yes" | ||||
| 	nginx_conf_custom_fn = os.path.join(env["STORAGE_ROOT"], "www/custom.yaml") | ||||
| 	if os.path.exists(nginx_conf_custom_fn): | ||||
| 		with open(nginx_conf_custom_fn) as f: | ||||
| 		with open(nginx_conf_custom_fn, encoding='utf-8') as f: | ||||
| 			yaml = rtyaml.load(f) | ||||
| 		if domain in yaml: | ||||
| 			yaml = yaml[domain] | ||||
|  | ||||
| @ -212,7 +212,7 @@ def run_migrations(): | ||||
| 	migration_id_file = os.path.join(env['STORAGE_ROOT'], 'mailinabox.version') | ||||
| 	migration_id = None | ||||
| 	if os.path.exists(migration_id_file): | ||||
| 		with open(migration_id_file) as f: | ||||
| 		with open(migration_id_file, encoding='utf-8') as f: | ||||
| 			migration_id = f.read().strip() | ||||
| 
 | ||||
| 	if migration_id is None: | ||||
| @ -253,7 +253,7 @@ def run_migrations(): | ||||
| 
 | ||||
| 		# Write out our current version now. Do this sooner rather than later | ||||
| 		# in case of any problems. | ||||
| 		with open(migration_id_file, "w") as f: | ||||
| 		with open(migration_id_file, "w", encoding='utf-8') as f: | ||||
| 			f.write(str(ourver) + "\n") | ||||
| 
 | ||||
| 		# Delete the legacy location of this field. | ||||
|  | ||||
| @ -76,7 +76,7 @@ for setting in settings: | ||||
| 
 | ||||
| found = set() | ||||
| buf = "" | ||||
| with open(filename) as f: | ||||
| with open(filename, encoding="utf-8") as f: | ||||
|         input_lines = list(f) | ||||
| 
 | ||||
| while len(input_lines) > 0: | ||||
| @ -144,7 +144,7 @@ for i in range(len(settings)): | ||||
| 
 | ||||
| if not testing: | ||||
| 	# Write out the new file. | ||||
| 	with open(filename, "w") as f: | ||||
| 	with open(filename, "w", encoding="utf-8") as f: | ||||
| 		f.write(buf) | ||||
| else: | ||||
| 	# Just print the new file to stdout. | ||||
|  | ||||
| @ -38,7 +38,7 @@ for date, ip in accesses: | ||||
| # Since logs are rotated, store the statistics permanently in a JSON file. | ||||
| # Load in the stats from an existing file. | ||||
| if os.path.exists(outfn): | ||||
| 	with open(outfn) as f: | ||||
| 	with open(outfn, encoding="utf-8") as f: | ||||
| 		existing_data = json.load(f) | ||||
| 	for date, count in existing_data: | ||||
| 		if date not in by_date: | ||||
| @ -51,5 +51,5 @@ by_date = sorted(by_date.items()) | ||||
| by_date.pop(-1) | ||||
| 
 | ||||
| # Write out. | ||||
| with open(outfn, "w") as f: | ||||
| with open(outfn, "w", encoding="utf-8") as f: | ||||
| 	json.dump(by_date, f, sort_keys=True, indent=True) | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user