mirror of
https://github.com/mail-in-a-box/mailinabox.git
synced 2026-03-23 18:57:23 +01:00
Merge remote-tracking branch 'fspoettel/admin-panel-2fa' into totp
# Conflicts: # management/auth.py # management/daemon.py # setup/mail-users.sh # setup/management.sh # setup/migrate.py
This commit is contained in:
@@ -1,14 +1,15 @@
|
||||
import os, os.path, re, json, time
|
||||
import subprocess
|
||||
import multiprocessing.pool, subprocess
|
||||
|
||||
from functools import wraps
|
||||
|
||||
from flask import Flask, request, render_template, abort, Response, send_from_directory, make_response
|
||||
|
||||
import auth, utils, multiprocessing.pool
|
||||
import auth, utils, totp
|
||||
from mailconfig import get_mail_users, get_mail_users_ex, get_admins, add_mail_user, set_mail_password, set_mail_display_name, remove_mail_user
|
||||
from mailconfig import get_mail_user_privileges, add_remove_mail_user_privilege
|
||||
from mailconfig import get_mail_aliases, get_mail_aliases_ex, get_mail_domains, add_mail_alias, remove_mail_alias
|
||||
from mailconfig import get_mfa_state, create_totp_credential, delete_totp_credential
|
||||
|
||||
env = utils.load_environment()
|
||||
|
||||
@@ -37,15 +38,22 @@ def authorized_personnel_only(viewfunc):
|
||||
def newview(*args, **kwargs):
|
||||
# Authenticate the passed credentials, which is either the API key or a username:password pair.
|
||||
error = None
|
||||
privs = []
|
||||
|
||||
try:
|
||||
email, privs = auth_service.authenticate(request, env)
|
||||
except ValueError as e:
|
||||
# Authentication failed.
|
||||
privs = []
|
||||
error = "Incorrect username or password"
|
||||
|
||||
except totp.MissingTokenError as e:
|
||||
error = str(e)
|
||||
except totp.BadTokenError as e:
|
||||
# Write a line in the log recording the failed login
|
||||
log_failed_login(request)
|
||||
error = str(e)
|
||||
except ValueError as e:
|
||||
# Write a line in the log recording the failed login
|
||||
log_failed_login(request)
|
||||
# Authentication failed.
|
||||
error = "Incorrect username or password"
|
||||
|
||||
# Authorized to access an API view?
|
||||
if "admin" in privs:
|
||||
@@ -83,8 +91,8 @@ def authorized_personnel_only(viewfunc):
|
||||
def unauthorized(error):
|
||||
return auth_service.make_unauthorized_response()
|
||||
|
||||
def json_response(data):
|
||||
return Response(json.dumps(data, indent=2, sort_keys=True)+'\n', status=200, mimetype='application/json')
|
||||
def json_response(data, status=200):
|
||||
return Response(json.dumps(data, indent=2, sort_keys=True)+'\n', status=status, mimetype='application/json')
|
||||
|
||||
###################################
|
||||
|
||||
@@ -118,6 +126,20 @@ def me():
|
||||
# Is the caller authorized?
|
||||
try:
|
||||
email, privs = auth_service.authenticate(request, env)
|
||||
except totp.MissingTokenError as e:
|
||||
return json_response({
|
||||
"status": "missing_token",
|
||||
"reason": str(e),
|
||||
})
|
||||
except totp.BadTokenError as e:
|
||||
# Log the failed login
|
||||
log_failed_login(request)
|
||||
|
||||
return json_response({
|
||||
"status": "bad_token",
|
||||
"reason": str(e),
|
||||
})
|
||||
|
||||
except ValueError as e:
|
||||
# Log the failed login
|
||||
log_failed_login(request)
|
||||
@@ -125,7 +147,7 @@ def me():
|
||||
return json_response({
|
||||
"status": "invalid",
|
||||
"reason": "Incorrect username or password",
|
||||
})
|
||||
})
|
||||
|
||||
resp = {
|
||||
"status": "ok",
|
||||
@@ -343,7 +365,7 @@ def ssl_get_status():
|
||||
|
||||
# What domains can we provision certificates for? What unexpected problems do we have?
|
||||
provision, cant_provision = get_certificates_to_provision(env, show_valid_certs=False)
|
||||
|
||||
|
||||
# What's the current status of TLS certificates on all of the domain?
|
||||
domains_status = get_web_domains_info(env)
|
||||
domains_status = [
|
||||
@@ -392,6 +414,51 @@ def ssl_provision_certs():
|
||||
requests = provision_certificates(env, limit_domains=None)
|
||||
return json_response({ "requests": requests })
|
||||
|
||||
# multi-factor auth
|
||||
|
||||
@app.route('/mfa/status', methods=['GET'])
|
||||
@authorized_personnel_only
|
||||
def two_factor_auth_get_status():
|
||||
email, _ = auth_service.authenticate(request, env)
|
||||
|
||||
mfa_state = get_mfa_state(email, env)
|
||||
|
||||
if mfa_state['type'] == 'totp':
|
||||
return json_response({ "type": 'totp' })
|
||||
|
||||
secret = totp.get_secret()
|
||||
secret_url = totp.get_otp_uri(secret, email)
|
||||
secret_qr = totp.get_qr_code(secret_url)
|
||||
|
||||
return json_response({
|
||||
"type": None,
|
||||
"totp_secret": secret,
|
||||
"totp_qr": secret_qr
|
||||
})
|
||||
|
||||
@app.route('/mfa/totp/enable', methods=['POST'])
|
||||
@authorized_personnel_only
|
||||
def totp_post_enable():
|
||||
email, _ = auth_service.authenticate(request, env)
|
||||
|
||||
secret = request.form.get('secret')
|
||||
token = request.form.get('token')
|
||||
|
||||
if type(secret) != str or type(token) != str or len(token) != 6 or len(secret) != 32:
|
||||
return json_response({ "error": 'bad_input' }, 400)
|
||||
|
||||
if totp.validate(secret, token):
|
||||
create_totp_credential(email, secret, token, env)
|
||||
return json_response({})
|
||||
|
||||
return json_response({ "error": 'token_mismatch' }, 400)
|
||||
|
||||
@app.route('/mfa/totp/disable', methods=['POST'])
|
||||
@authorized_personnel_only
|
||||
def totp_post_disable():
|
||||
email, _ = auth_service.authenticate(request, env)
|
||||
delete_totp_credential(email, env)
|
||||
return json_response({})
|
||||
|
||||
# WEB
|
||||
|
||||
|
||||
Reference in New Issue
Block a user