196 lines
5.2 KiB
Python
196 lines
5.2 KiB
Python
# Test that a box's fail2ban setting are working
|
|
# correctly by attempting a bunch of failed logins.
|
|
# Specify SSH login information the command line -
|
|
# we use that to reset fail2ban after each test,
|
|
# and we extract the hostname from that to open
|
|
# connections to.
|
|
######################################################################
|
|
|
|
import sys, os, time, functools
|
|
|
|
# parse command line
|
|
|
|
if len(sys.argv) < 2:
|
|
print("Usage: tests/fail2ban.py user@hostname")
|
|
sys.exit(1)
|
|
|
|
ssh_user, hostname = sys.argv[1].split("@", 1)
|
|
|
|
# define some test types
|
|
|
|
import socket
|
|
socket.setdefaulttimeout(10)
|
|
|
|
class IsBlocked(Exception):
|
|
"""Tests raise this exception when it appears that a fail2ban
|
|
jail is in effect, i.e. on a connection refused error."""
|
|
pass
|
|
|
|
def smtp_test():
|
|
import smtplib
|
|
|
|
try:
|
|
server = smtplib.SMTP(hostname, 587)
|
|
except ConnectionRefusedError:
|
|
# looks like fail2ban worked
|
|
raise IsBlocked()
|
|
server.starttls()
|
|
server.ehlo_or_helo_if_needed()
|
|
|
|
try:
|
|
server.login("fakeuser", "fakepassword")
|
|
raise Exception("authentication didn't fail")
|
|
except smtplib.SMTPAuthenticationError:
|
|
# athentication should fail
|
|
pass
|
|
|
|
try:
|
|
server.quit()
|
|
except:
|
|
# ignore errors here
|
|
pass
|
|
|
|
def imap_test():
|
|
import imaplib
|
|
|
|
try:
|
|
M = imaplib.IMAP4_SSL(hostname)
|
|
except ConnectionRefusedError:
|
|
# looks like fail2ban worked
|
|
raise IsBlocked()
|
|
|
|
try:
|
|
M.login("fakeuser", "fakepassword")
|
|
raise Exception("authentication didn't fail")
|
|
except imaplib.IMAP4.error:
|
|
# authentication should fail
|
|
pass
|
|
finally:
|
|
M.logout() # shuts down connection, has nothing to do with login()
|
|
|
|
def http_test(url, expected_status, postdata=None, qsargs=None, auth=None):
|
|
import urllib.parse
|
|
import requests
|
|
from requests.auth import HTTPBasicAuth
|
|
|
|
# form request
|
|
url = urllib.parse.urljoin("https://" + hostname, url)
|
|
if qsargs: url += "?" + urllib.parse.urlencode(qsargs)
|
|
urlopen = requests.get if not postdata else requests.post
|
|
|
|
try:
|
|
# issue request
|
|
r = urlopen(
|
|
url,
|
|
auth=HTTPBasicAuth(*auth) if auth else None,
|
|
data=postdata,
|
|
headers={'User-Agent': 'Mail-in-a-Box fail2ban tester'},
|
|
timeout=8)
|
|
except requests.exceptions.ConnectTimeout as e:
|
|
raise IsBlocked()
|
|
except requests.exceptions.ConnectionError as e:
|
|
if "Connection refused" in str(e):
|
|
raise IsBlocked()
|
|
raise # some other unexpected condition
|
|
|
|
# return response status code
|
|
if r.status_code != expected_status:
|
|
r.raise_for_status() # anything but 200
|
|
raise IOError("Got unexpected status code %s." % r.status_code)
|
|
|
|
# define how to run a test
|
|
|
|
def restart_fail2ban_service(final=False):
|
|
# Log in over SSH to restart fail2ban.
|
|
command = "sudo fail2ban-client reload"
|
|
if not final:
|
|
# Stop recidive jails during testing.
|
|
command += " && sudo fail2ban-client stop recidive"
|
|
os.system("ssh %s@%s \"%s\"" % (ssh_user, hostname, command))
|
|
|
|
def testfunc_runner(i, testfunc, *args):
|
|
print(i+1, end=" ", flush=True)
|
|
testfunc(*args)
|
|
|
|
def run_test(testfunc, args, count, within_seconds, parallel):
|
|
# Run testfunc count times in within_seconds seconds (and actually
|
|
# within a little less time so we're sure we're under the limit).
|
|
#
|
|
# Because some services are slow, like IMAP, we can't necessarily
|
|
# run testfunc sequentially and still get to count requests within
|
|
# the required time. So we split the requests across threads.
|
|
|
|
import requests.exceptions
|
|
from multiprocessing import Pool
|
|
|
|
restart_fail2ban_service()
|
|
|
|
# Log.
|
|
print(testfunc.__name__, " ".join(str(a) for a in args), "...")
|
|
|
|
# Record the start time so we can know how to evenly space our
|
|
# calls to testfunc.
|
|
start_time = time.time()
|
|
|
|
with Pool(parallel) as p:
|
|
# Distribute the requests across the pool.
|
|
asyncresults = []
|
|
for i in range(count):
|
|
ar = p.apply_async(testfunc_runner, [i, testfunc] + list(args))
|
|
asyncresults.append(ar)
|
|
|
|
# Wait for all runs to finish.
|
|
p.close()
|
|
p.join()
|
|
|
|
# Check for errors.
|
|
for ar in asyncresults:
|
|
try:
|
|
ar.get()
|
|
except IsBlocked:
|
|
print("Test machine prematurely blocked!")
|
|
return False
|
|
|
|
# Did we make enough requests within the limit?
|
|
if (time.time()-start_time) > within_seconds:
|
|
raise Exception("Test failed to make %s requests in %d seconds." % (count, within_seconds))
|
|
|
|
# Wait a moment for the block to be put into place.
|
|
time.sleep(4)
|
|
|
|
# The next call should fail.
|
|
print("*", end=" ", flush=True)
|
|
try:
|
|
testfunc(*args)
|
|
except IsBlocked:
|
|
# Success -- this one is supposed to be refused.
|
|
print("blocked [OK]")
|
|
return True # OK
|
|
|
|
print("not blocked!")
|
|
return False
|
|
|
|
######################################################################
|
|
|
|
if __name__ == "__main__":
|
|
# run tests
|
|
|
|
# SMTP bans at 10 even though we say 20 in the config because we get
|
|
# doubled-up warnings in the logs, we'll let that be for now
|
|
run_test(smtp_test, [], 10, 30, 8)
|
|
|
|
# IMAP
|
|
run_test(imap_test, [], 20, 30, 4)
|
|
|
|
# Mail-in-a-Box control panel
|
|
run_test(http_test, ["/admin/me", 200], 20, 30, 1)
|
|
|
|
# Munin via the Mail-in-a-Box control panel
|
|
run_test(http_test, ["/admin/munin/", 401], 20, 30, 1)
|
|
|
|
# ownCloud
|
|
run_test(http_test, ["/cloud/remote.php/webdav", 401, None, None, ["aa", "aa"]], 20, 120, 1)
|
|
|
|
# restart fail2ban so that this client machine is no longer blocked
|
|
restart_fail2ban_service(final=True)
|