Decouple totp from users table by moving to totp_credentials table
* this allows implementation of other mfa schemes in the future (webauthn) * also makes key management easier and enforces one totp credentials per user on db-level
This commit is contained in:
parent
89b301afc7
commit
ee01eae55e
|
@ -9,7 +9,7 @@ import auth, utils, totp
|
||||||
from mailconfig import get_mail_users, get_mail_users_ex, get_admins, add_mail_user, set_mail_password, remove_mail_user
|
from mailconfig import get_mail_users, get_mail_users_ex, get_admins, add_mail_user, set_mail_password, remove_mail_user
|
||||||
from mailconfig import get_mail_user_privileges, add_remove_mail_user_privilege
|
from mailconfig import get_mail_user_privileges, add_remove_mail_user_privilege
|
||||||
from mailconfig import get_mail_aliases, get_mail_aliases_ex, get_mail_domains, add_mail_alias, remove_mail_alias
|
from mailconfig import get_mail_aliases, get_mail_aliases_ex, get_mail_domains, add_mail_alias, remove_mail_alias
|
||||||
from mailconfig import get_two_factor_info, set_two_factor_secret, remove_two_factor_secret
|
from mailconfig import get_mfa_state, create_totp_credential, delete_totp_credential
|
||||||
|
|
||||||
env = utils.load_environment()
|
env = utils.load_environment()
|
||||||
|
|
||||||
|
@ -410,18 +410,17 @@ def ssl_provision_certs():
|
||||||
requests = provision_certificates(env, limit_domains=None)
|
requests = provision_certificates(env, limit_domains=None)
|
||||||
return json_response({ "requests": requests })
|
return json_response({ "requests": requests })
|
||||||
|
|
||||||
# Two Factor Auth
|
# multi-factor auth
|
||||||
|
|
||||||
@app.route('/mfa/status', methods=['GET'])
|
@app.route('/mfa/status', methods=['GET'])
|
||||||
@authorized_personnel_only
|
@authorized_personnel_only
|
||||||
def two_factor_auth_get_status():
|
def two_factor_auth_get_status():
|
||||||
email, _ = auth_service.authenticate(request, env)
|
email, _ = auth_service.authenticate(request, env)
|
||||||
two_factor_secret, _ = get_two_factor_info(email, env)
|
|
||||||
|
|
||||||
if two_factor_secret != None:
|
mfa_state = get_mfa_state(email, env)
|
||||||
return json_response({
|
|
||||||
"type": 'totp'
|
if mfa_state['type'] == 'totp':
|
||||||
})
|
return json_response({ "type": 'totp' })
|
||||||
|
|
||||||
secret = totp.get_secret()
|
secret = totp.get_secret()
|
||||||
secret_url = totp.get_otp_uri(secret, email)
|
secret_url = totp.get_otp_uri(secret, email)
|
||||||
|
@ -446,7 +445,7 @@ def totp_post_enable():
|
||||||
return json_response({ "error": 'bad_input' }, 400)
|
return json_response({ "error": 'bad_input' }, 400)
|
||||||
|
|
||||||
if (totp.validate(secret, token)):
|
if (totp.validate(secret, token)):
|
||||||
set_two_factor_secret(email, secret, token, env)
|
create_totp_credential(email, secret, token, env)
|
||||||
return json_response({})
|
return json_response({})
|
||||||
|
|
||||||
return json_response({ "error": 'token_mismatch' }, 400)
|
return json_response({ "error": 'token_mismatch' }, 400)
|
||||||
|
@ -455,7 +454,7 @@ def totp_post_enable():
|
||||||
@authorized_personnel_only
|
@authorized_personnel_only
|
||||||
def totp_post_disable():
|
def totp_post_disable():
|
||||||
email, _ = auth_service.authenticate(request, env)
|
email, _ = auth_service.authenticate(request, env)
|
||||||
remove_two_factor_secret(email, env)
|
delete_totp_credential(email, env)
|
||||||
return json_response({})
|
return json_response({})
|
||||||
|
|
||||||
# WEB
|
# WEB
|
||||||
|
|
|
@ -547,38 +547,42 @@ def get_required_aliases(env):
|
||||||
|
|
||||||
return aliases
|
return aliases
|
||||||
|
|
||||||
def get_two_factor_info(email, env):
|
# multi-factor auth
|
||||||
|
|
||||||
|
def get_mfa_state(email, env):
|
||||||
c = open_database(env)
|
c = open_database(env)
|
||||||
|
c.execute('SELECT secret, mru_token FROM totp_credentials WHERE user_email=?', (email,))
|
||||||
|
|
||||||
c.execute('SELECT two_factor_secret, two_factor_last_used_token FROM users WHERE email=?', (email,))
|
credential_row = c.fetchone()
|
||||||
rows = c.fetchall()
|
if (credential_row == None):
|
||||||
if len(rows) != 1:
|
return { 'type': None }
|
||||||
raise ValueError("That's not a user (%s)." % email)
|
|
||||||
return (rows[0][0], rows[0][1])
|
|
||||||
|
|
||||||
def set_two_factor_secret(email, secret, token, env):
|
return {
|
||||||
|
'type': 'totp',
|
||||||
|
'secret': credential_row[0],
|
||||||
|
'mru_token': credential_row[1]
|
||||||
|
}
|
||||||
|
|
||||||
|
def create_totp_credential(email, secret, token, env):
|
||||||
validate_two_factor_secret(secret)
|
validate_two_factor_secret(secret)
|
||||||
|
|
||||||
conn, c = open_database(env, with_connection=True)
|
conn, c = open_database(env, with_connection=True)
|
||||||
c.execute("UPDATE users SET two_factor_secret=?, two_factor_last_used_token=? WHERE email=?", (secret, token, email))
|
c.execute('INSERT INTO totp_credentials (user_email, secret, mru_token) VALUES (?, ?, ?)', (email, secret, token))
|
||||||
|
conn.commit()
|
||||||
|
return "OK"
|
||||||
|
|
||||||
|
def set_mru_totp_code(email, token, env):
|
||||||
|
conn, c = open_database(env, with_connection=True)
|
||||||
|
c.execute('UPDATE totp_credentials SET mru_token=? WHERE user_email=?', (token, email))
|
||||||
|
|
||||||
if c.rowcount != 1:
|
if c.rowcount != 1:
|
||||||
raise ValueError("That's not a user (%s)." % email)
|
raise ValueError("That's not a user (%s)." % email)
|
||||||
conn.commit()
|
conn.commit()
|
||||||
return "OK"
|
return "OK"
|
||||||
|
|
||||||
def set_two_factor_last_used_token(email, token, env):
|
def delete_totp_credential(email, env):
|
||||||
conn, c = open_database(env, with_connection=True)
|
conn, c = open_database(env, with_connection=True)
|
||||||
c.execute("UPDATE users SET two_factor_last_used_token=? WHERE email=?", (token, email))
|
c.execute('DELETE FROM totp_credentials WHERE user_email=?', (email,))
|
||||||
if c.rowcount != 1:
|
|
||||||
raise ValueError("That's not a user (%s)." % email)
|
|
||||||
conn.commit()
|
|
||||||
return "OK"
|
|
||||||
|
|
||||||
def remove_two_factor_secret(email, env):
|
|
||||||
conn, c = open_database(env, with_connection=True)
|
|
||||||
c.execute("UPDATE users SET two_factor_secret=null, two_factor_last_used_token=null WHERE email=?", (email,))
|
|
||||||
if c.rowcount != 1:
|
|
||||||
raise ValueError("That's not a user (%s)." % email)
|
|
||||||
conn.commit()
|
conn.commit()
|
||||||
return "OK"
|
return "OK"
|
||||||
|
|
||||||
|
|
|
@ -93,7 +93,7 @@
|
||||||
<li class="dropdown-header">Advanced Pages</li>
|
<li class="dropdown-header">Advanced Pages</li>
|
||||||
<li><a href="#custom_dns" onclick="return show_panel(this);">Custom DNS</a></li>
|
<li><a href="#custom_dns" onclick="return show_panel(this);">Custom DNS</a></li>
|
||||||
<li><a href="#external_dns" onclick="return show_panel(this);">External DNS</a></li>
|
<li><a href="#external_dns" onclick="return show_panel(this);">External DNS</a></li>
|
||||||
<li><a href="#two_factor_auth" onclick="return show_panel(this);">Two Factor Authentication</a></li>
|
<li><a href="#two_factor_auth" onclick="return show_panel(this);">Two-Factor Authentication</a></li>
|
||||||
<li><a href="/admin/munin" target="_blank">Munin Monitoring</a></li>
|
<li><a href="/admin/munin" target="_blank">Munin Monitoring</a></li>
|
||||||
</ul>
|
</ul>
|
||||||
</li>
|
</li>
|
||||||
|
|
|
@ -72,7 +72,7 @@ sudo tools/mail.py user make-admin me@{{hostname}}</pre>
|
||||||
</div>
|
</div>
|
||||||
<div class="form-group" id="loginOtp">
|
<div class="form-group" id="loginOtp">
|
||||||
<div class="col-sm-offset-3 col-sm-9">
|
<div class="col-sm-offset-3 col-sm-9">
|
||||||
<label for="loginOtpInput" class="control-label">Two Factor Code</label>
|
<label for="loginOtpInput" class="control-label">Two-Factor Code</label>
|
||||||
<input type="text" class="form-control" id="loginOtpInput" placeholder="6-digit code">
|
<input type="text" class="form-control" id="loginOtpInput" placeholder="6-digit code">
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
|
@ -31,7 +31,7 @@
|
||||||
}
|
}
|
||||||
</style>
|
</style>
|
||||||
|
|
||||||
<h2>Two Factor Authentication</h2>
|
<h2>Two-Factor Authentication</h2>
|
||||||
|
|
||||||
<div class="twofactor">
|
<div class="twofactor">
|
||||||
<div class="loading-indicator">Loading...</div>
|
<div class="loading-indicator">Loading...</div>
|
||||||
|
|
|
@ -6,7 +6,7 @@ import struct
|
||||||
import time
|
import time
|
||||||
import pyotp
|
import pyotp
|
||||||
import qrcode
|
import qrcode
|
||||||
from mailconfig import get_two_factor_info, set_two_factor_last_used_token
|
from mailconfig import get_mfa_state, set_mru_totp_code
|
||||||
|
|
||||||
def get_secret():
|
def get_secret():
|
||||||
return base64.b32encode(os.urandom(20)).decode('utf-8')
|
return base64.b32encode(os.urandom(20)).decode('utf-8')
|
||||||
|
@ -45,13 +45,13 @@ class TOTPStrategy():
|
||||||
self.email = email
|
self.email = email
|
||||||
|
|
||||||
def store_successful_login(self, token, env):
|
def store_successful_login(self, token, env):
|
||||||
return set_two_factor_last_used_token(self.email, token, env)
|
return set_mru_totp_code(self.email, token, env)
|
||||||
|
|
||||||
def validate_request(self, request, env):
|
def validate_request(self, request, env):
|
||||||
secret, mru_token = get_two_factor_info(self.email, env)
|
mfa_state = get_mfa_state(self.email, env)
|
||||||
|
|
||||||
# 2FA is not enabled, we can skip further checks
|
# 2FA is not enabled, we can skip further checks
|
||||||
if secret == "" or secret == None:
|
if mfa_state['type'] != 'totp':
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# If 2FA is enabled, raise if:
|
# If 2FA is enabled, raise if:
|
||||||
|
@ -65,7 +65,7 @@ class TOTPStrategy():
|
||||||
raise MissingTokenError("Two factor code missing (no x-auth-token supplied)")
|
raise MissingTokenError("Two factor code missing (no x-auth-token supplied)")
|
||||||
|
|
||||||
# TODO: Should a token replay be handled as its own error?
|
# TODO: Should a token replay be handled as its own error?
|
||||||
if token_header == mru_token or validate(secret, token_header) != True:
|
if token_header == mfa_state['mru_token'] or validate(mfa_state['secret'], token_header) != True:
|
||||||
raise BadTokenError("Two factor code incorrect")
|
raise BadTokenError("Two factor code incorrect")
|
||||||
|
|
||||||
self.store_successful_login(token_header, env)
|
self.store_successful_login(token_header, env)
|
||||||
|
|
|
@ -20,9 +20,10 @@ db_path=$STORAGE_ROOT/mail/users.sqlite
|
||||||
# Create an empty database if it doesn't yet exist.
|
# Create an empty database if it doesn't yet exist.
|
||||||
if [ ! -f $db_path ]; then
|
if [ ! -f $db_path ]; then
|
||||||
echo Creating new user database: $db_path;
|
echo Creating new user database: $db_path;
|
||||||
# TODO: Add migration
|
echo "CREATE TABLE users (id INTEGER PRIMARY KEY AUTOINCREMENT, email TEXT NOT NULL UNIQUE, password TEXT NOT NULL, extra, privileges TEXT NOT NULL DEFAULT '');" | sqlite3 $db_path;
|
||||||
echo "CREATE TABLE users (id INTEGER PRIMARY KEY AUTOINCREMENT, email TEXT NOT NULL UNIQUE, password TEXT NOT NULL, extra, privileges TEXT NOT NULL DEFAULT '', two_factor_secret TEXT, two_factor_last_used_token TEXT);" | sqlite3 $db_path;
|
|
||||||
echo "CREATE TABLE aliases (id INTEGER PRIMARY KEY AUTOINCREMENT, source TEXT NOT NULL UNIQUE, destination TEXT NOT NULL, permitted_senders TEXT);" | sqlite3 $db_path;
|
echo "CREATE TABLE aliases (id INTEGER PRIMARY KEY AUTOINCREMENT, source TEXT NOT NULL UNIQUE, destination TEXT NOT NULL, permitted_senders TEXT);" | sqlite3 $db_path;
|
||||||
|
# TODO: Add migration
|
||||||
|
echo "CREATE TABLE totp_credentials (id INTEGER PRIMARY KEY AUTOINCREMENT, user_email TEXT NOT NULL UNIQUE, secret TEXT NOT NULL, mru_token TEXT, FOREIGN KEY (user_email) REFERENCES users(email) ON DELETE CASCADE);" | sqlite3 $db_path;
|
||||||
fi
|
fi
|
||||||
|
|
||||||
# ### User Authentication
|
# ### User Authentication
|
||||||
|
|
Loading…
Reference in New Issue