From 405860cac5a3a0a08bfe545299bbc755b7070353 Mon Sep 17 00:00:00 2001 From: NewbieOrange Date: Wed, 28 Jul 2021 00:39:48 +0800 Subject: [PATCH] Add token authentication for munin routes --- management/auth.py | 23 +++++++++++++++++++---- management/daemon.py | 11 ++++++++--- management/mfa.py | 6 ++++++ 3 files changed, 33 insertions(+), 7 deletions(-) diff --git a/management/auth.py b/management/auth.py index fd143c76..a36a2b90 100644 --- a/management/auth.py +++ b/management/auth.py @@ -1,6 +1,6 @@ -import base64, os, os.path, hmac, json +import base64, os, os.path, hmac, json, secrets -from flask import make_response +from expiringdict import ExpiringDict import utils from mailconfig import get_mail_password, get_mail_user_privileges @@ -16,6 +16,8 @@ class KeyAuthService: requests. The key is passed as the username field in the standard HTTP Basic Auth header. """ + token_dict = ExpiringDict(max_len=1024, max_age_seconds=600) + def __init__(self): self.auth_realm = DEFAULT_AUTH_REALM self.key = self._generate_key() @@ -74,11 +76,16 @@ class KeyAuthService: raise ValueError("Authorization header invalid.") elif username == self.key: # The user passed the master API key which grants administrative privs. - return (None, ["admin"]) + return (None, ["admin"], None) else: # The user is trying to log in with a username and either a password # (and possibly a MFA token) or a user-specific API key. - return (username, self.check_user_auth(username, password, request, env)) + token = None + privs = self.check_user_auth(username, password, request, env) + if not self.validate_user_token(username, request, env): + token = secrets.token_hex(16) + KeyAuthService.token_dict[username] = token + return (username, privs, token) def check_user_auth(self, email, pw, request, env): # Validate a user's login email address and password. If MFA is enabled, @@ -130,6 +137,14 @@ class KeyAuthService: # Return a list of privileges. return privs + def check_user_token(self, email, token, request, env): + # Check whether a token matches the one we stored for the user. + return token is not None and KeyAuthService.token_dict.get(email) == token + + def validate_user_token(self, email, request, env): + # Check whether the provided token in request cookie matches the one we stored for the user. + return self.check_user_token(email, request.cookies.get("token"), request, env) + def create_user_key(self, email, env): # Create a user API key, which is a shared secret that we can re-generate from # static information in our database. The shared secret contains the user's diff --git a/management/daemon.py b/management/daemon.py index 8490ee44..956b1cfe 100755 --- a/management/daemon.py +++ b/management/daemon.py @@ -51,7 +51,7 @@ def authorized_personnel_only(viewfunc): privs = [] try: - email, privs = auth_service.authenticate(request, env) + email, privs, _ = auth_service.authenticate(request, env) except ValueError as e: # Write a line in the log recording the failed login log_failed_login(request) @@ -135,7 +135,7 @@ def index(): def me(): # Is the caller authorized? try: - email, privs = auth_service.authenticate(request, env) + email, privs, token = auth_service.authenticate(request, env) except ValueError as e: if "missing-totp-token" in str(e): return json_response({ @@ -160,8 +160,13 @@ def me(): if "admin" in privs: resp["api_key"] = auth_service.create_user_key(email, env) + resp = json_response(resp) + # Set authentication token for admin munin routes. + if "admin" in privs and token: + resp.set_cookie("token", value=token, secure=True, httponly=True, samesite='Lax') + # Return. - return json_response(resp) + return resp # MAIL diff --git a/management/mfa.py b/management/mfa.py index 32eb5183..c4da78a2 100644 --- a/management/mfa.py +++ b/management/mfa.py @@ -110,6 +110,12 @@ def validate_auth_mfa(email, request, env): if len(mfa_state) == 0: return (True, []) + # Try token authentication first for munin routes. + if request.full_path.startswith("/munin"): + from daemon import auth_service + if auth_service.validate_user_token(email, request, env): + return (True, []) + # Try the enabled MFA modes. hints = set() for mfa_mode in mfa_state: