mirror of
https://github.com/mailcow/mailcow-dockerized.git
synced 2026-01-08 06:29:23 +00:00
Merge branch 'staging' into feat/valkey
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
FROM alpine:3.20
|
||||
FROM alpine:3.21
|
||||
|
||||
LABEL maintainer = "The Infrastructure Company GmbH <info@servercow.de>"
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#!/bin/sh
|
||||
|
||||
backend=iptables
|
||||
backend=nftables
|
||||
|
||||
nft list table ip filter &>/dev/null
|
||||
nftables_found=$?
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
DEBUG = False
|
||||
|
||||
import re
|
||||
import os
|
||||
import sys
|
||||
@@ -20,10 +22,13 @@ from modules.Logger import Logger
|
||||
from modules.IPTables import IPTables
|
||||
from modules.NFTables import NFTables
|
||||
|
||||
def logdebug(msg):
|
||||
if DEBUG:
|
||||
logger.logInfo("DEBUG: %s" % msg)
|
||||
|
||||
# globals
|
||||
# Globals
|
||||
WHITELIST = []
|
||||
BLACKLIST= []
|
||||
BLACKLIST = []
|
||||
bans = {}
|
||||
quit_now = False
|
||||
exit_code = 0
|
||||
@@ -33,12 +38,10 @@ r = None
|
||||
pubsub = None
|
||||
clear_before_quit = False
|
||||
|
||||
|
||||
def refreshF2boptions():
|
||||
global f2boptions
|
||||
global quit_now
|
||||
global exit_code
|
||||
|
||||
f2boptions = {}
|
||||
|
||||
if not valkey.get('F2B_OPTIONS'):
|
||||
@@ -61,15 +64,15 @@ def refreshF2boptions():
|
||||
valkey.set('F2B_OPTIONS', json.dumps(f2boptions, ensure_ascii=False))
|
||||
|
||||
def verifyF2boptions(f2boptions):
|
||||
verifyF2boption(f2boptions,'ban_time', 1800)
|
||||
verifyF2boption(f2boptions,'max_ban_time', 10000)
|
||||
verifyF2boption(f2boptions,'ban_time_increment', True)
|
||||
verifyF2boption(f2boptions,'max_attempts', 10)
|
||||
verifyF2boption(f2boptions,'retry_window', 600)
|
||||
verifyF2boption(f2boptions,'netban_ipv4', 32)
|
||||
verifyF2boption(f2boptions,'netban_ipv6', 128)
|
||||
verifyF2boption(f2boptions,'banlist_id', str(uuid.uuid4()))
|
||||
verifyF2boption(f2boptions,'manage_external', 0)
|
||||
verifyF2boption(f2boptions, 'ban_time', 1800)
|
||||
verifyF2boption(f2boptions, 'max_ban_time', 10000)
|
||||
verifyF2boption(f2boptions, 'ban_time_increment', True)
|
||||
verifyF2boption(f2boptions, 'max_attempts', 10)
|
||||
verifyF2boption(f2boptions, 'retry_window', 600)
|
||||
verifyF2boption(f2boptions, 'netban_ipv4', 32)
|
||||
verifyF2boption(f2boptions, 'netban_ipv6', 128)
|
||||
verifyF2boption(f2boptions, 'banlist_id', str(uuid.uuid4()))
|
||||
verifyF2boption(f2boptions, 'manage_external', 0)
|
||||
|
||||
def verifyF2boption(f2boptions, f2boption, f2bdefault):
|
||||
f2boptions[f2boption] = f2boptions[f2boption] if f2boption in f2boptions and f2boptions[f2boption] is not None else f2bdefault
|
||||
@@ -111,7 +114,7 @@ def get_ip(address):
|
||||
def ban(address):
|
||||
global f2boptions
|
||||
global lock
|
||||
|
||||
logdebug("ban() called with address=%s" % address)
|
||||
refreshF2boptions()
|
||||
MAX_ATTEMPTS = int(f2boptions['max_attempts'])
|
||||
RETRY_WINDOW = int(f2boptions['retry_window'])
|
||||
@@ -119,31 +122,43 @@ def ban(address):
|
||||
NETBAN_IPV6 = '/' + str(f2boptions['netban_ipv6'])
|
||||
|
||||
ip = get_ip(address)
|
||||
if not ip: return
|
||||
if not ip:
|
||||
logdebug("No valid IP -- skipping ban()")
|
||||
return
|
||||
address = str(ip)
|
||||
self_network = ipaddress.ip_network(address)
|
||||
|
||||
with lock:
|
||||
temp_whitelist = set(WHITELIST)
|
||||
if temp_whitelist:
|
||||
for wl_key in temp_whitelist:
|
||||
wl_net = ipaddress.ip_network(wl_key, False)
|
||||
if wl_net.overlaps(self_network):
|
||||
logger.logInfo('Address %s is whitelisted by rule %s' % (self_network, wl_net))
|
||||
return
|
||||
logdebug("Checking if %s overlaps with any WHITELIST entries" % self_network)
|
||||
if temp_whitelist:
|
||||
for wl_key in temp_whitelist:
|
||||
wl_net = ipaddress.ip_network(wl_key, False)
|
||||
logdebug("Checking overlap between %s and %s" % (self_network, wl_net))
|
||||
if wl_net.overlaps(self_network):
|
||||
logger.logInfo(
|
||||
'Address %s is allowlisted by rule %s' % (self_network, wl_net))
|
||||
return
|
||||
|
||||
net = ipaddress.ip_network((address + (NETBAN_IPV4 if type(ip) is ipaddress.IPv4Address else NETBAN_IPV6)), strict=False)
|
||||
net = ipaddress.ip_network(
|
||||
(address + (NETBAN_IPV4 if type(ip) is ipaddress.IPv4Address else NETBAN_IPV6)), strict=False)
|
||||
net = str(net)
|
||||
logdebug("Ban net: %s" % net)
|
||||
|
||||
if not net in bans:
|
||||
bans[net] = {'attempts': 0, 'last_attempt': 0, 'ban_counter': 0}
|
||||
logdebug("Initing new ban counter for %s" % net)
|
||||
|
||||
current_attempt = time.time()
|
||||
logdebug("Current attempt ts=%s, previous: %s, retry_window: %s" %
|
||||
(current_attempt, bans[net]['last_attempt'], RETRY_WINDOW))
|
||||
if current_attempt - bans[net]['last_attempt'] > RETRY_WINDOW:
|
||||
bans[net]['attempts'] = 0
|
||||
logdebug("Ban counter for %s reset as window expired" % net)
|
||||
|
||||
bans[net]['attempts'] += 1
|
||||
bans[net]['last_attempt'] = current_attempt
|
||||
logdebug("%s attempts now %d" % (net, bans[net]['attempts']))
|
||||
|
||||
if bans[net]['attempts'] >= MAX_ATTEMPTS:
|
||||
cur_time = int(round(time.time()))
|
||||
@@ -151,34 +166,41 @@ def ban(address):
|
||||
logger.logCrit('Banning %s for %d minutes' % (net, NET_BAN_TIME / 60 ))
|
||||
if type(ip) is ipaddress.IPv4Address and int(f2boptions['manage_external']) != 1:
|
||||
with lock:
|
||||
logdebug("Calling tables.banIPv4(%s)" % net)
|
||||
tables.banIPv4(net)
|
||||
elif int(f2boptions['manage_external']) != 1:
|
||||
with lock:
|
||||
logdebug("Calling tables.banIPv6(%s)" % net)
|
||||
tables.banIPv6(net)
|
||||
|
||||
logdebug("Updating F2B_ACTIVE_BANS[%s]=%d" %
|
||||
(net, cur_time + NET_BAN_TIME))
|
||||
valkey.hset('F2B_ACTIVE_BANS', '%s' % net, cur_time + NET_BAN_TIME)
|
||||
else:
|
||||
logger.logWarn('%d more attempts in the next %d seconds until %s is banned' % (MAX_ATTEMPTS - bans[net]['attempts'], RETRY_WINDOW, net))
|
||||
logger.logWarn('%d more attempts in the next %d seconds until %s is banned' % (
|
||||
MAX_ATTEMPTS - bans[net]['attempts'], RETRY_WINDOW, net))
|
||||
|
||||
def unban(net):
|
||||
global lock
|
||||
|
||||
logdebug("Calling unban() with net=%s" % net)
|
||||
if not net in bans:
|
||||
logger.logInfo('%s is not banned, skipping unban and deleting from queue (if any)' % net)
|
||||
valkey.hdel('F2B_QUEUE_UNBAN', '%s' % net)
|
||||
return
|
||||
|
||||
logger.logInfo(
|
||||
'%s is not banned, skipping unban and deleting from queue (if any)' % net)
|
||||
valkey.hdel('F2B_QUEUE_UNBAN', '%s' % net)
|
||||
return
|
||||
logger.logInfo('Unbanning %s' % net)
|
||||
if type(ipaddress.ip_network(net)) is ipaddress.IPv4Network:
|
||||
with lock:
|
||||
logdebug("Calling tables.unbanIPv4(%s)" % net)
|
||||
tables.unbanIPv4(net)
|
||||
else:
|
||||
with lock:
|
||||
logdebug("Calling tables.unbanIPv6(%s)" % net)
|
||||
tables.unbanIPv6(net)
|
||||
|
||||
valkey.hdel('F2B_ACTIVE_BANS', '%s' % net)
|
||||
valkey.hdel('F2B_QUEUE_UNBAN', '%s' % net)
|
||||
if net in bans:
|
||||
logdebug("Unban for %s, setting attempts=0, ban_counter+=1" % net)
|
||||
bans[net]['attempts'] = 0
|
||||
bans[net]['ban_counter'] += 1
|
||||
|
||||
@@ -204,17 +226,19 @@ def permBan(net, unban=False):
|
||||
|
||||
if is_unbanned:
|
||||
valkey.hdel('F2B_PERM_BANS', '%s' % net)
|
||||
logger.logCrit('Removed host/network %s from blacklist' % net)
|
||||
logger.logCrit('Removed host/network %s from denylist' % net)
|
||||
elif is_banned:
|
||||
valkey.hset('F2B_PERM_BANS', '%s' % net, int(round(time.time())))
|
||||
logger.logCrit('Added host/network %s to blacklist' % net)
|
||||
logger.logCrit('Added host/network %s to denylist' % net)
|
||||
|
||||
def clear():
|
||||
global lock
|
||||
logger.logInfo('Clearing all bans')
|
||||
for net in bans.copy():
|
||||
logdebug("Unbanning net: %s" % net)
|
||||
unban(net)
|
||||
with lock:
|
||||
logdebug("Clearing IPv4/IPv6 table")
|
||||
tables.clearIPv4Table()
|
||||
tables.clearIPv6Table()
|
||||
try:
|
||||
@@ -275,21 +299,35 @@ def snat6(snat_target):
|
||||
|
||||
def autopurge():
|
||||
global f2boptions
|
||||
|
||||
logdebug("autopurge thread started")
|
||||
while not quit_now:
|
||||
logdebug("autopurge tick")
|
||||
time.sleep(10)
|
||||
refreshF2boptions()
|
||||
MAX_ATTEMPTS = int(f2boptions['max_attempts'])
|
||||
QUEUE_UNBAN = valkey.hgetall('F2B_QUEUE_UNBAN')
|
||||
logdebug("QUEUE_UNBAN: %s" % QUEUE_UNBAN)
|
||||
if QUEUE_UNBAN:
|
||||
for net in QUEUE_UNBAN:
|
||||
logdebug("Autopurge: unbanning queued net: %s" % net)
|
||||
unban(str(net))
|
||||
for net in bans.copy():
|
||||
if bans[net]['attempts'] >= MAX_ATTEMPTS:
|
||||
NET_BAN_TIME = calcNetBanTime(bans[net]['ban_counter'])
|
||||
TIME_SINCE_LAST_ATTEMPT = time.time() - bans[net]['last_attempt']
|
||||
if TIME_SINCE_LAST_ATTEMPT > NET_BAN_TIME:
|
||||
unban(net)
|
||||
# Only check expiry for actively banned IPs:
|
||||
active_bans = r.hgetall('F2B_ACTIVE_BANS')
|
||||
now = time.time()
|
||||
for net_str, expire_str in active_bans.items():
|
||||
logdebug("Checking ban expiry for (actively banned): %s" % net_str)
|
||||
# Defensive: always process if timer missing or expired
|
||||
try:
|
||||
expire = float(expire_str)
|
||||
except Exception:
|
||||
logdebug("Invalid expire time for %s; unbanning" % net_str)
|
||||
unban(net_str)
|
||||
continue
|
||||
time_left = expire - now
|
||||
logdebug("Time left for %s: %.1f seconds" % (net_str, time_left))
|
||||
if time_left <= 0:
|
||||
logdebug("Ban expired for %s" % net_str)
|
||||
unban(net_str)
|
||||
|
||||
def mailcowChainOrder():
|
||||
global lock
|
||||
@@ -359,7 +397,7 @@ def whitelistUpdate():
|
||||
with lock:
|
||||
if Counter(new_whitelist) != Counter(WHITELIST):
|
||||
WHITELIST = new_whitelist
|
||||
logger.logInfo('Whitelist was changed, it has %s entries' % len(WHITELIST))
|
||||
logger.logInfo('Allowlist was changed, it has %s entries' % len(WHITELIST))
|
||||
time.sleep(60.0 - ((time.time() - start_time) % 60.0))
|
||||
|
||||
def blacklistUpdate():
|
||||
@@ -375,7 +413,7 @@ def blacklistUpdate():
|
||||
addban = set(new_blacklist).difference(BLACKLIST)
|
||||
delban = set(BLACKLIST).difference(new_blacklist)
|
||||
BLACKLIST = new_blacklist
|
||||
logger.logInfo('Blacklist was changed, it has %s entries' % len(BLACKLIST))
|
||||
logger.logInfo('Denylist was changed, it has %s entries' % len(BLACKLIST))
|
||||
if addban:
|
||||
for net in addban:
|
||||
permBan(net=net)
|
||||
@@ -386,42 +424,43 @@ def blacklistUpdate():
|
||||
|
||||
def sigterm_quit(signum, frame):
|
||||
global clear_before_quit
|
||||
logdebug("SIGTERM received, setting clear_before_quit to True and exiting")
|
||||
clear_before_quit = True
|
||||
sys.exit(exit_code)
|
||||
|
||||
def berfore_quit():
|
||||
def before_quit():
|
||||
logdebug("before_quit called, clear_before_quit=%s" % clear_before_quit)
|
||||
if clear_before_quit:
|
||||
clear()
|
||||
if pubsub is not None:
|
||||
pubsub.unsubscribe()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
atexit.register(berfore_quit)
|
||||
logger = Logger()
|
||||
logdebug("Sys.argv: %s" % sys.argv)
|
||||
atexit.register(before_quit)
|
||||
signal.signal(signal.SIGTERM, sigterm_quit)
|
||||
|
||||
# init Logger
|
||||
logger = Logger()
|
||||
|
||||
# init backend
|
||||
backend = sys.argv[1]
|
||||
logdebug("Backend: %s" % backend)
|
||||
if backend == "nftables":
|
||||
logger.logInfo('Using NFTables backend')
|
||||
tables = NFTables(chain_name, logger)
|
||||
else:
|
||||
logger.logInfo('Using IPTables backend')
|
||||
logger.logWarn(
|
||||
"DEPRECATION: iptables-legacy is deprecated and will be removed in future releases. "
|
||||
"Please switch to nftables on your host to ensure complete compatibility."
|
||||
)
|
||||
time.sleep(5)
|
||||
tables = IPTables(chain_name, logger)
|
||||
|
||||
# In case a previous session was killed without cleanup
|
||||
clear()
|
||||
|
||||
# Reinit MAILCOW chain
|
||||
# Is called before threads start, no locking
|
||||
logger.logInfo("Initializing mailcow netfilter chain")
|
||||
tables.initChainIPv4()
|
||||
tables.initChainIPv6()
|
||||
|
||||
if os.getenv("DISABLE_NETFILTER_ISOLATION_RULE").lower() in ("y", "yes"):
|
||||
if os.getenv("DISABLE_NETFILTER_ISOLATION_RULE", "").lower() in ("y", "yes"):
|
||||
logger.logInfo(f"Skipping {chain_name} isolation")
|
||||
else:
|
||||
logger.logInfo(f"Setting {chain_name} isolation")
|
||||
@@ -432,23 +471,28 @@ if __name__ == '__main__':
|
||||
try:
|
||||
valkey_slaveof_ip = os.getenv('VALKEY_SLAVEOF_IP', '')
|
||||
valkey_slaveof_port = os.getenv('VALKEY_SLAVEOF_PORT', '')
|
||||
logdebug(
|
||||
"Connecting valkey (SLAVEOF_IP:%s, PORT:%s)" % (valkey_slaveof_ip, valkey_slaveof_port))
|
||||
if "".__eq__(valkey_slaveof_ip):
|
||||
valkey = redis.StrictRedis(host=os.getenv('IPV4_NETWORK', '172.22.1') + '.249', decode_responses=True, port=6379, db=0, password=os.environ['VALKEYPASS'])
|
||||
valkey = redis.StrictRedis(
|
||||
host=os.getenv('IPV4_NETWORK', '172.22.1') + '.249', decode_responses=True, port=6379, db=0, password=os.environ['VALKEYPASS'])
|
||||
else:
|
||||
valkey = redis.StrictRedis(host=valkey_slaveof_ip, decode_responses=True, port=valkey_slaveof_port, db=0, password=os.environ['VALKEYPASS'])
|
||||
valkey = redis.StrictRedis(
|
||||
host=valkey_slaveof_ip, decode_responses=True, port=valkey_slaveof_port, db=0, password=os.environ['VALKEYPASS'])
|
||||
valkey.ping()
|
||||
pubsub = valkey.pubsub()
|
||||
except Exception as ex:
|
||||
print('%s - trying again in 3 seconds' % (ex))
|
||||
logdebug(
|
||||
'Redis connection failed: %s - trying again in 3 seconds' % (ex))
|
||||
time.sleep(3)
|
||||
else:
|
||||
break
|
||||
logger.set_valkey(r)
|
||||
logger.set_valkey(valkey)
|
||||
logdebug("Valkey connection established, setting up F2B keys")
|
||||
|
||||
# rename fail2ban to netfilter
|
||||
if valkey.exists('F2B_LOG'):
|
||||
logdebug("Renaming F2B_LOG to NETFILTER_LOG")
|
||||
valkey.rename('F2B_LOG', 'NETFILTER_LOG')
|
||||
# clear bans in valkey
|
||||
valkey.delete('F2B_ACTIVE_BANS')
|
||||
valkey.delete('F2B_PERM_BANS')
|
||||
|
||||
@@ -463,7 +507,7 @@ if __name__ == '__main__':
|
||||
snat_ip = os.getenv('SNAT_TO_SOURCE')
|
||||
snat_ipo = ipaddress.ip_address(snat_ip)
|
||||
if type(snat_ipo) is ipaddress.IPv4Address:
|
||||
snat4_thread = Thread(target=snat4,args=(snat_ip,))
|
||||
snat4_thread = Thread(target=snat4, args=(snat_ip,))
|
||||
snat4_thread.daemon = True
|
||||
snat4_thread.start()
|
||||
except ValueError:
|
||||
@@ -499,4 +543,5 @@ if __name__ == '__main__':
|
||||
while not quit_now:
|
||||
time.sleep(0.5)
|
||||
|
||||
sys.exit(exit_code)
|
||||
logdebug("Exiting with code %s" % exit_code)
|
||||
sys.exit(exit_code)
|
||||
@@ -1,5 +1,6 @@
|
||||
import time
|
||||
import json
|
||||
import datetime
|
||||
|
||||
class Logger:
|
||||
def __init__(self):
|
||||
@@ -8,17 +9,28 @@ class Logger:
|
||||
def set_valkey(self, valkey):
|
||||
self.valkey = valkey
|
||||
|
||||
def _format_timestamp(self):
|
||||
# Local time with milliseconds
|
||||
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
|
||||
def log(self, priority, message):
|
||||
tolog = {}
|
||||
tolog['time'] = int(round(time.time()))
|
||||
tolog['priority'] = priority
|
||||
tolog['message'] = message
|
||||
print(message)
|
||||
# build valkey-friendly dict
|
||||
tolog = {
|
||||
'time': int(round(time.time())), # keep raw timestamp for Valkey
|
||||
'priority': priority,
|
||||
'message': message
|
||||
}
|
||||
|
||||
# print human-readable message with timestamp
|
||||
ts = self._format_timestamp()
|
||||
print(f"{ts} {priority.upper()}: {message}", flush=True)
|
||||
|
||||
# also push JSON to Redis if connected
|
||||
if self.valkey is not None:
|
||||
try:
|
||||
self.valkey.lpush('NETFILTER_LOG', json.dumps(tolog, ensure_ascii=False))
|
||||
except Exception as ex:
|
||||
print('Failed logging to valkey: %s' % (ex))
|
||||
print(f'{ts} WARN: Failed logging to valkey: {ex}', flush=True)
|
||||
|
||||
def logWarn(self, message):
|
||||
self.log('warn', message)
|
||||
@@ -27,4 +39,4 @@ class Logger:
|
||||
self.log('crit', message)
|
||||
|
||||
def logInfo(self, message):
|
||||
self.log('info', message)
|
||||
self.log('info', message)
|
||||
Reference in New Issue
Block a user