1
0
mirror of https://github.com/mailcow/mailcow-dockerized.git synced 2025-12-26 08:11:33 +00:00

Merge branch 'feat/mailcow-adm' into nightly

This commit is contained in:
FreddleSpl0it
2025-08-19 11:57:22 +02:00
53 changed files with 3450 additions and 78 deletions

View File

@@ -15,7 +15,7 @@ jobs:
images:
- "acme-mailcow"
- "clamd-mailcow"
- "dockerapi-mailcow"
- "controller-mailcow"
- "dovecot-mailcow"
- "netfilter-mailcow"
- "olefy-mailcow"

View File

@@ -48,11 +48,11 @@ if [[ "${SKIP_LETS_ENCRYPT}" =~ ^([yY][eE][sS]|[yY])+$ ]]; then
exec $(readlink -f "$0")
fi
log_f "Waiting for Docker API..."
until ping dockerapi -c1 > /dev/null; do
log_f "Waiting for Controller .."
until ping controller -c1 > /dev/null; do
sleep 1
done
log_f "Docker API OK"
log_f "Controller OK"
log_f "Waiting for Postfix..."
until ping postfix -c1 > /dev/null; do

View File

@@ -2,32 +2,32 @@
# Reading container IDs
# Wrapping as array to ensure trimmed content when calling $NGINX etc.
NGINX=($(curl --silent --insecure https://dockerapi.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/json | jq -r ".[] | {name: .Config.Labels[\"com.docker.compose.service\"], project: .Config.Labels[\"com.docker.compose.project\"], id: .Id}" | jq -rc "select( .name | tostring | contains(\"nginx-mailcow\")) | select( .project | tostring | contains(\"${COMPOSE_PROJECT_NAME,,}\")) | .id" | tr "\n" " "))
DOVECOT=($(curl --silent --insecure https://dockerapi.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/json | jq -r ".[] | {name: .Config.Labels[\"com.docker.compose.service\"], project: .Config.Labels[\"com.docker.compose.project\"], id: .Id}" | jq -rc "select( .name | tostring | contains(\"dovecot-mailcow\")) | select( .project | tostring | contains(\"${COMPOSE_PROJECT_NAME,,}\")) | .id" | tr "\n" " "))
POSTFIX=($(curl --silent --insecure https://dockerapi.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/json | jq -r ".[] | {name: .Config.Labels[\"com.docker.compose.service\"], project: .Config.Labels[\"com.docker.compose.project\"], id: .Id}" | jq -rc "select( .name | tostring | contains(\"postfix-mailcow\")) | select( .project | tostring | contains(\"${COMPOSE_PROJECT_NAME,,}\")) | .id" | tr "\n" " "))
NGINX=($(curl --silent --insecure https://controller.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/json | jq -r ".[] | {name: .Config.Labels[\"com.docker.compose.service\"], project: .Config.Labels[\"com.docker.compose.project\"], id: .Id}" | jq -rc "select( .name | tostring | contains(\"nginx-mailcow\")) | select( .project | tostring | contains(\"${COMPOSE_PROJECT_NAME,,}\")) | .id" | tr "\n" " "))
DOVECOT=($(curl --silent --insecure https://controller.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/json | jq -r ".[] | {name: .Config.Labels[\"com.docker.compose.service\"], project: .Config.Labels[\"com.docker.compose.project\"], id: .Id}" | jq -rc "select( .name | tostring | contains(\"dovecot-mailcow\")) | select( .project | tostring | contains(\"${COMPOSE_PROJECT_NAME,,}\")) | .id" | tr "\n" " "))
POSTFIX=($(curl --silent --insecure https://controller.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/json | jq -r ".[] | {name: .Config.Labels[\"com.docker.compose.service\"], project: .Config.Labels[\"com.docker.compose.project\"], id: .Id}" | jq -rc "select( .name | tostring | contains(\"postfix-mailcow\")) | select( .project | tostring | contains(\"${COMPOSE_PROJECT_NAME,,}\")) | .id" | tr "\n" " "))
reload_nginx(){
echo "Reloading Nginx..."
NGINX_RELOAD_RET=$(curl -X POST --insecure https://dockerapi.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/${NGINX}/exec -d '{"cmd":"reload", "task":"nginx"}' --silent -H 'Content-type: application/json' | jq -r .type)
NGINX_RELOAD_RET=$(curl -X POST --insecure https://controller.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/${NGINX}/exec -d '{"cmd":"reload", "task":"nginx"}' --silent -H 'Content-type: application/json' | jq -r .type)
[[ ${NGINX_RELOAD_RET} != 'success' ]] && { echo "Could not reload Nginx, restarting container..."; restart_container ${NGINX} ; }
}
reload_dovecot(){
echo "Reloading Dovecot..."
DOVECOT_RELOAD_RET=$(curl -X POST --insecure https://dockerapi.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/${DOVECOT}/exec -d '{"cmd":"reload", "task":"dovecot"}' --silent -H 'Content-type: application/json' | jq -r .type)
DOVECOT_RELOAD_RET=$(curl -X POST --insecure https://controller.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/${DOVECOT}/exec -d '{"cmd":"reload", "task":"dovecot"}' --silent -H 'Content-type: application/json' | jq -r .type)
[[ ${DOVECOT_RELOAD_RET} != 'success' ]] && { echo "Could not reload Dovecot, restarting container..."; restart_container ${DOVECOT} ; }
}
reload_postfix(){
echo "Reloading Postfix..."
POSTFIX_RELOAD_RET=$(curl -X POST --insecure https://dockerapi.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/${POSTFIX}/exec -d '{"cmd":"reload", "task":"postfix"}' --silent -H 'Content-type: application/json' | jq -r .type)
POSTFIX_RELOAD_RET=$(curl -X POST --insecure https://controller.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/${POSTFIX}/exec -d '{"cmd":"reload", "task":"postfix"}' --silent -H 'Content-type: application/json' | jq -r .type)
[[ ${POSTFIX_RELOAD_RET} != 'success' ]] && { echo "Could not reload Postfix, restarting container..."; restart_container ${POSTFIX} ; }
}
restart_container(){
for container in $*; do
echo "Restarting ${container}..."
C_REST_OUT=$(curl -X POST --insecure https://dockerapi.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/${container}/restart --silent | jq -r '.msg')
C_REST_OUT=$(curl -X POST --insecure https://controller.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/${container}/restart --silent | jq -r '.msg')
echo "${C_REST_OUT}"
done
}

View File

@@ -6,22 +6,29 @@ ARG PIP_BREAK_SYSTEM_PACKAGES=1
WORKDIR /app
RUN apk add --update --no-cache python3 \
bash \
py3-pip \
openssl \
tzdata \
py3-psutil \
py3-redis \
py3-async-timeout \
supervisor \
curl \
&& pip3 install --upgrade pip \
fastapi \
uvicorn \
aiodocker \
docker
RUN mkdir /app/modules
COPY mailcow-adm/ /app/mailcow-adm/
RUN pip3 install -r /app/mailcow-adm/requirements.txt
COPY api/ /app/api/
COPY docker-entrypoint.sh /app/
COPY main.py /app/main.py
COPY modules/ /app/modules/
COPY supervisord.conf /etc/supervisor/supervisord.conf
COPY stop-supervisor.sh /usr/local/sbin/stop-supervisor.sh
ENTRYPOINT ["/bin/sh", "/app/docker-entrypoint.sh"]
CMD ["python", "main.py"]
CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/supervisord.conf"]

View File

@@ -254,8 +254,8 @@ if __name__ == '__main__':
app,
host="0.0.0.0",
port=443,
ssl_certfile="/app/dockerapi_cert.pem",
ssl_keyfile="/app/dockerapi_key.pem",
ssl_certfile="/app/controller_cert.pem",
ssl_keyfile="/app/controller_key.pem",
log_level="info",
loop="none"
)

View File

@@ -0,0 +1,9 @@
#!/bin/bash
`openssl req -x509 -newkey rsa:4096 -sha256 -days 3650 -nodes \
-keyout /app/controller_key.pem \
-out /app/controller_cert.pem \
-subj /CN=controller/O=mailcow \
-addext subjectAltName=DNS:controller`
exec "$@"

View File

@@ -0,0 +1,61 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import sys
from models.AliasModel import AliasModel
from models.MailboxModel import MailboxModel
from models.SyncjobModel import SyncjobModel
from models.CalendarModel import CalendarModel
from models.MailerModel import MailerModel
from models.AddressbookModel import AddressbookModel
from models.MaildirModel import MaildirModel
from models.DomainModel import DomainModel
from models.DomainadminModel import DomainadminModel
from models.StatusModel import StatusModel
from modules.Utils import Utils
def main():
utils = Utils()
model_map = {
MailboxModel.parser_command: MailboxModel,
AliasModel.parser_command: AliasModel,
SyncjobModel.parser_command: SyncjobModel,
CalendarModel.parser_command: CalendarModel,
AddressbookModel.parser_command: AddressbookModel,
MailerModel.parser_command: MailerModel,
MaildirModel.parser_command: MaildirModel,
DomainModel.parser_command: DomainModel,
DomainadminModel.parser_command: DomainadminModel,
StatusModel.parser_command: StatusModel
}
parser = argparse.ArgumentParser(description="mailcow Admin Tool")
subparsers = parser.add_subparsers(dest="command", required=True)
for model in model_map.values():
model.add_parser(subparsers)
args = parser.parse_args()
for cmd, model_cls in model_map.items():
if args.command == cmd and model_cls.has_required_args(args):
instance = model_cls(**vars(args))
action = getattr(instance, args.object, None)
if callable(action):
res = action()
utils.pprint(res)
sys.exit(0)
parser.print_help()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,140 @@
from modules.Sogo import Sogo
from models.BaseModel import BaseModel
class AddressbookModel(BaseModel):
parser_command = "addressbook"
required_args = {
"add": [["username", "name"]],
"delete": [["username", "name"]],
"get": [["username", "name"]],
"set_acl": [["username", "name", "sharee_email", "acl"]],
"get_acl": [["username", "name"]],
"delete_acl": [["username", "name", "sharee_email"]],
"add_contact": [["username", "name", "contact_name", "contact_email", "type"]],
"delete_contact": [["username", "name", "contact_name"]],
}
def __init__(
self,
username=None,
name=None,
sharee_email=None,
acl=None,
subscribe=None,
ics=None,
contact_name=None,
contact_email=None,
type=None,
**kwargs
):
self.sogo = Sogo(username)
self.name = name
self.acl = acl
self.sharee_email = sharee_email
self.subscribe = subscribe
self.ics = ics
self.contact_name = contact_name
self.contact_email = contact_email
self.type = type
def add(self):
"""
Add a new addressbook.
:return: Response from SOGo API.
"""
return self.sogo.addAddressbook(self.name)
def set_acl(self):
"""
Set ACL for the addressbook.
:return: Response from SOGo API.
"""
addressbook_id = self.sogo.getAddressbookIdByName(self.name)
if not addressbook_id:
print(f"Addressbook '{self.name}' not found for user '{self.username}'.")
return None
return self.sogo.setAddressbookACL(addressbook_id, self.sharee_email, self.acl, self.subscribe)
def delete_acl(self):
"""
Delete the addressbook ACL.
:return: Response from SOGo API.
"""
addressbook_id = self.sogo.getAddressbookIdByName(self.name)
if not addressbook_id:
print(f"Addressbook '{self.name}' not found for user '{self.username}'.")
return None
return self.sogo.deleteAddressbookACL(addressbook_id, self.sharee_email)
def get_acl(self):
"""
Get the ACL for the addressbook.
:return: Response from SOGo API.
"""
addressbook_id = self.sogo.getAddressbookIdByName(self.name)
if not addressbook_id:
print(f"Addressbook '{self.name}' not found for user '{self.username}'.")
return None
return self.sogo.getAddressbookACL(addressbook_id)
def add_contact(self):
"""
Add a new contact to the addressbook.
:return: Response from SOGo API.
"""
addressbook_id = self.sogo.getAddressbookIdByName(self.name)
if not addressbook_id:
print(f"Addressbook '{self.name}' not found for user '{self.username}'.")
return None
if self.type == "card":
return self.sogo.addAddressbookContact(addressbook_id, self.contact_name, self.contact_email)
elif self.type == "list":
return self.sogo.addAddressbookContactList(addressbook_id, self.contact_name, self.contact_email)
def delete_contact(self):
"""
Delete a contact or contactlist from the addressbook.
:return: Response from SOGo API.
"""
addressbook_id = self.sogo.getAddressbookIdByName(self.name)
if not addressbook_id:
print(f"Addressbook '{self.name}' not found for user '{self.username}'.")
return None
return self.sogo.deleteAddressbookItem(addressbook_id, self.contact_name)
def get(self):
"""
Retrieve addressbooks list.
:return: Response from SOGo API.
"""
return self.sogo.getAddressbookList()
def delete(self):
"""
Delete the addressbook.
:return: Response from SOGo API.
"""
addressbook_id = self.sogo.getAddressbookIdByName(self.name)
if not addressbook_id:
print(f"Addressbook '{self.name}' not found for user '{self.username}'.")
return None
return self.sogo.deleteAddressbook(addressbook_id)
@classmethod
def add_parser(cls, subparsers):
parser = subparsers.add_parser(
cls.parser_command,
help="Manage addressbooks (add, delete, get, set_acl, get_acl, delete_acl, add_contact, delete_contact)"
)
parser.add_argument("object", choices=list(cls.required_args.keys()), help="Action to perform: add, delete, get, set_acl, get_acl, delete_acl, add_contact, delete_contact")
parser.add_argument("--username", required=True, help="Username of the addressbook owner (e.g. user@example.com)")
parser.add_argument("--name", help="Addressbook name")
parser.add_argument("--sharee-email", help="Email address to share the addressbook with")
parser.add_argument("--acl", help="ACL rights for the sharee (e.g. r, w, rw)")
parser.add_argument("--subscribe", action='store_true', help="Subscribe the sharee to the addressbook")
parser.add_argument("--contact-name", help="Name of the contact or contactlist to add or delete")
parser.add_argument("--contact-email", help="Email address of the contact to add")
parser.add_argument("--type", choices=["card", "list"], help="Type of contact to add: card (single contact) or list (distribution list)")

View File

@@ -0,0 +1,107 @@
from modules.Mailcow import Mailcow
from models.BaseModel import BaseModel
class AliasModel(BaseModel):
parser_command = "alias"
required_args = {
"add": [["address", "goto"]],
"delete": [["id"]],
"get": [["id"]],
"edit": [["id"]]
}
def __init__(
self,
id=None,
address=None,
goto=None,
active=None,
sogo_visible=None,
**kwargs
):
self.mailcow = Mailcow()
self.id = id
self.address = address
self.goto = goto
self.active = active
self.sogo_visible = sogo_visible
@classmethod
def from_dict(cls, data):
return cls(
address=data.get("address"),
goto=data.get("goto"),
active=data.get("active", None),
sogo_visible=data.get("sogo_visible", None)
)
def getAdd(self):
"""
Get the alias details as a dictionary for adding, sets default values.
:return: Dictionary containing alias details.
"""
alias = {
"address": self.address,
"goto": self.goto,
"active": self.active if self.active is not None else 1,
"sogo_visible": self.sogo_visible if self.sogo_visible is not None else 0
}
return {key: value for key, value in alias.items() if value is not None}
def getEdit(self):
"""
Get the alias details as a dictionary for editing, sets no default values.
:return: Dictionary containing mailbox details.
"""
alias = {
"address": self.address,
"goto": self.goto,
"active": self.active,
"sogo_visible": self.sogo_visible
}
return {key: value for key, value in alias.items() if value is not None}
def get(self):
"""
Get the mailbox details from the mailcow API.
:return: Response from the mailcow API.
"""
return self.mailcow.getAlias(self.id)
def delete(self):
"""
Get the mailbox details from the mailcow API.
:return: Response from the mailcow API.
"""
return self.mailcow.deleteAlias(self.id)
def add(self):
"""
Get the mailbox details from the mailcow API.
:return: Response from the mailcow API.
"""
return self.mailcow.addAlias(self.getAdd())
def edit(self):
"""
Get the mailbox details from the mailcow API.
:return: Response from the mailcow API.
"""
return self.mailcow.editAlias(self.id, self.getEdit())
@classmethod
def add_parser(cls, subparsers):
parser = subparsers.add_parser(
cls.parser_command,
help="Manage aliases (add, delete, get, edit)"
)
parser.add_argument("object", choices=list(cls.required_args.keys()), help="Action to perform: add, delete, get, edit")
parser.add_argument("--id", help="Alias object ID (required for get, edit, delete)")
parser.add_argument("--address", help="Alias email address (e.g. alias@example.com)")
parser.add_argument("--goto", help="Destination address(es), comma-separated (e.g. user1@example.com,user2@example.com)")
parser.add_argument("--active", choices=["1", "0"], help="Activate (1) or deactivate (0) the alias")
parser.add_argument("--sogo-visible", choices=["1", "0"], help="Show alias in SOGo addressbook (1 = yes, 0 = no)")

View File

@@ -0,0 +1,35 @@
class BaseModel:
parser_command = ""
required_args = {}
@classmethod
def has_required_args(cls, args):
"""
Validate that all required arguments are present.
"""
object_name = args.object if hasattr(args, "object") else args.get("object")
required_lists = cls.required_args.get(object_name, False)
if not required_lists:
return False
for required_set in required_lists:
result = True
for required_args in required_set:
if isinstance(args, dict):
if not args.get(required_args):
result = False
break
elif not hasattr(args, required_args):
result = False
break
if result:
break
if not result:
print(f"Required arguments for '{object_name}': {required_lists}")
return result
@classmethod
def add_parser(cls, subparsers):
pass

View File

@@ -0,0 +1,111 @@
from modules.Sogo import Sogo
from models.BaseModel import BaseModel
class CalendarModel(BaseModel):
parser_command = "calendar"
required_args = {
"add": [["username", "name"]],
"delete": [["username", "name"]],
"get": [["username"]],
"import_ics": [["username", "name", "ics"]],
"set_acl": [["username", "name", "sharee_email", "acl"]],
"get_acl": [["username", "name"]],
"delete_acl": [["username", "name", "sharee_email"]],
}
def __init__(
self,
username=None,
name=None,
sharee_email=None,
acl=None,
subscribe=None,
ics=None,
**kwargs
):
self.sogo = Sogo(username)
self.name = name
self.acl = acl
self.sharee_email = sharee_email
self.subscribe = subscribe
self.ics = ics
def add(self):
"""
Add a new calendar.
:return: Response from SOGo API.
"""
return self.sogo.addCalendar(self.name)
def delete(self):
"""
Delete a calendar.
:return: Response from SOGo API.
"""
calendar_id = self.sogo.getCalendarIdByName(self.name)
if not calendar_id:
print(f"Calendar '{self.name}' not found for user '{self.username}'.")
return None
return self.sogo.deleteCalendar(calendar_id)
def get(self):
"""
Get the calendar details.
:return: Response from SOGo API.
"""
return self.sogo.getCalendar()
def set_acl(self):
"""
Set ACL for the calendar.
:return: Response from SOGo API.
"""
calendar_id = self.sogo.getCalendarIdByName(self.name)
if not calendar_id:
print(f"Calendar '{self.name}' not found for user '{self.username}'.")
return None
return self.sogo.setCalendarACL(calendar_id, self.sharee_email, self.acl, self.subscribe)
def delete_acl(self):
"""
Delete the calendar ACL.
:return: Response from SOGo API.
"""
calendar_id = self.sogo.getCalendarIdByName(self.name)
if not calendar_id:
print(f"Calendar '{self.name}' not found for user '{self.username}'.")
return None
return self.sogo.deleteCalendarACL(calendar_id, self.sharee_email)
def get_acl(self):
"""
Get the ACL for the calendar.
:return: Response from SOGo API.
"""
calendar_id = self.sogo.getCalendarIdByName(self.name)
if not calendar_id:
print(f"Calendar '{self.name}' not found for user '{self.username}'.")
return None
return self.sogo.getCalendarACL(calendar_id)
def import_ics(self):
"""
Import a calendar from an ICS file.
:return: Response from SOGo API.
"""
return self.sogo.importCalendar(self.name, self.ics)
@classmethod
def add_parser(cls, subparsers):
parser = subparsers.add_parser(
cls.parser_command,
help="Manage calendars (add, delete, get, import_ics, set_acl, get_acl, delete_acl)"
)
parser.add_argument("object", choices=list(cls.required_args.keys()), help="Action to perform: add, delete, get, import_ics, set_acl, get_acl, delete_acl")
parser.add_argument("--username", required=True, help="Username of the calendar owner (e.g. user@example.com)")
parser.add_argument("--name", help="Calendar name")
parser.add_argument("--ics", help="Path to ICS file for import")
parser.add_argument("--sharee-email", help="Email address to share the calendar with")
parser.add_argument("--acl", help="ACL rights for the sharee (e.g. r, w, rw)")
parser.add_argument("--subscribe", action='store_true', help="Subscribe the sharee to the calendar")

View File

@@ -0,0 +1,162 @@
from modules.Mailcow import Mailcow
from models.BaseModel import BaseModel
class DomainModel(BaseModel):
parser_command = "domain"
required_args = {
"add": [["domain"]],
"delete": [["domain"]],
"get": [["domain"]],
"edit": [["domain"]]
}
def __init__(
self,
domain=None,
active=None,
aliases=None,
backupmx=None,
defquota=None,
description=None,
mailboxes=None,
maxquota=None,
quota=None,
relay_all_recipients=None,
rl_frame=None,
rl_value=None,
restart_sogo=None,
tags=None,
**kwargs
):
self.mailcow = Mailcow()
self.domain = domain
self.active = active
self.aliases = aliases
self.backupmx = backupmx
self.defquota = defquota
self.description = description
self.mailboxes = mailboxes
self.maxquota = maxquota
self.quota = quota
self.relay_all_recipients = relay_all_recipients
self.rl_frame = rl_frame
self.rl_value = rl_value
self.restart_sogo = restart_sogo
self.tags = tags
@classmethod
def from_dict(cls, data):
return cls(
domain=data.get("domain"),
active=data.get("active", None),
aliases=data.get("aliases", None),
backupmx=data.get("backupmx", None),
defquota=data.get("defquota", None),
description=data.get("description", None),
mailboxes=data.get("mailboxes", None),
maxquota=data.get("maxquota", None),
quota=data.get("quota", None),
relay_all_recipients=data.get("relay_all_recipients", None),
rl_frame=data.get("rl_frame", None),
rl_value=data.get("rl_value", None),
restart_sogo=data.get("restart_sogo", None),
tags=data.get("tags", None)
)
def getAdd(self):
"""
Get the domain details as a dictionary for adding, sets default values.
:return: Dictionary containing domain details.
"""
domain = {
"domain": self.domain,
"active": self.active if self.active is not None else 1,
"aliases": self.aliases if self.aliases is not None else 400,
"backupmx": self.backupmx if self.backupmx is not None else 0,
"defquota": self.defquota if self.defquota is not None else 3072,
"description": self.description if self.description is not None else "",
"mailboxes": self.mailboxes if self.mailboxes is not None else 10,
"maxquota": self.maxquota if self.maxquota is not None else 10240,
"quota": self.quota if self.quota is not None else 10240,
"relay_all_recipients": self.relay_all_recipients if self.relay_all_recipients is not None else 0,
"rl_frame": self.rl_frame,
"rl_value": self.rl_value,
"restart_sogo": self.restart_sogo if self.restart_sogo is not None else 0,
"tags": self.tags if self.tags is not None else []
}
return {key: value for key, value in domain.items() if value is not None}
def getEdit(self):
"""
Get the domain details as a dictionary for editing, sets no default values.
:return: Dictionary containing domain details.
"""
domain = {
"domain": self.domain,
"active": self.active,
"aliases": self.aliases,
"backupmx": self.backupmx,
"defquota": self.defquota,
"description": self.description,
"mailboxes": self.mailboxes,
"maxquota": self.maxquota,
"quota": self.quota,
"relay_all_recipients": self.relay_all_recipients,
"rl_frame": self.rl_frame,
"rl_value": self.rl_value,
"restart_sogo": self.restart_sogo,
"tags": self.tags
}
return {key: value for key, value in domain.items() if value is not None}
def get(self):
"""
Get the domain details from the mailcow API.
:return: Response from the mailcow API.
"""
return self.mailcow.getDomain(self.domain)
def delete(self):
"""
Delete the domain from the mailcow API.
:return: Response from the mailcow API.
"""
return self.mailcow.deleteDomain(self.domain)
def add(self):
"""
Add the domain to the mailcow API.
:return: Response from the mailcow API.
"""
return self.mailcow.addDomain(self.getAdd())
def edit(self):
"""
Edit the domain in the mailcow API.
:return: Response from the mailcow API.
"""
return self.mailcow.editDomain(self.domain, self.getEdit())
@classmethod
def add_parser(cls, subparsers):
parser = subparsers.add_parser(
cls.parser_command,
help="Manage domains (add, delete, get, edit)"
)
parser.add_argument("object", choices=list(cls.required_args.keys()), help="Action to perform: add, delete, get, edit")
parser.add_argument("--domain", required=True, help="Domain name (e.g. domain.tld)")
parser.add_argument("--active", choices=["1", "0"], help="Activate (1) or deactivate (0) the domain")
parser.add_argument("--aliases", help="Number of aliases allowed for the domain")
parser.add_argument("--backupmx", choices=["1", "0"], help="Enable (1) or disable (0) backup MX")
parser.add_argument("--defquota", help="Default quota for mailboxes in MB")
parser.add_argument("--description", help="Description of the domain")
parser.add_argument("--mailboxes", help="Number of mailboxes allowed for the domain")
parser.add_argument("--maxquota", help="Maximum quota for the domain in MB")
parser.add_argument("--quota", help="Quota used by the domain in MB")
parser.add_argument("--relay-all-recipients", choices=["1", "0"], help="Relay all recipients (1 = yes, 0 = no)")
parser.add_argument("--rl-frame", help="Rate limit frame (e.g., s, m, h)")
parser.add_argument("--rl-value", help="Rate limit value")
parser.add_argument("--restart-sogo", help="Restart SOGo after changes (1 = yes, 0 = no)")
parser.add_argument("--tags", nargs="*", help="Tags for the domain")

View File

@@ -0,0 +1,106 @@
from modules.Mailcow import Mailcow
from models.BaseModel import BaseModel
class DomainadminModel(BaseModel):
parser_command = "domainadmin"
required_args = {
"add": [["username", "domains", "password"]],
"delete": [["username"]],
"get": [["username"]],
"edit": [["username"]]
}
def __init__(
self,
username=None,
domains=None,
password=None,
active=None,
**kwargs
):
self.mailcow = Mailcow()
self.username = username
self.domains = domains
self.password = password
self.password2 = password
self.active = active
@classmethod
def from_dict(cls, data):
return cls(
username=data.get("username"),
domains=data.get("domains"),
password=data.get("password"),
password2=data.get("password"),
active=data.get("active", None),
)
def getAdd(self):
"""
Get the domain admin details as a dictionary for adding, sets default values.
:return: Dictionary containing domain admin details.
"""
domainadmin = {
"username": self.username,
"domains": self.domains,
"password": self.password,
"password2": self.password2,
"active": self.active if self.active is not None else "1"
}
return {key: value for key, value in domainadmin.items() if value is not None}
def getEdit(self):
"""
Get the domain admin details as a dictionary for editing, sets no default values.
:return: Dictionary containing domain admin details.
"""
domainadmin = {
"username": self.username,
"domains": self.domains,
"password": self.password,
"password2": self.password2,
"active": self.active
}
return {key: value for key, value in domainadmin.items() if value is not None}
def get(self):
"""
Get the domain admin details from the mailcow API.
:return: Response from the mailcow API.
"""
return self.mailcow.getDomainadmin(self.username)
def delete(self):
"""
Delete the domain admin from the mailcow API.
:return: Response from the mailcow API.
"""
return self.mailcow.deleteDomainadmin(self.username)
def add(self):
"""
Add the domain admin to the mailcow API.
:return: Response from the mailcow API.
"""
return self.mailcow.addDomainadmin(self.getAdd())
def edit(self):
"""
Edit the domain admin in the mailcow API.
:return: Response from the mailcow API.
"""
return self.mailcow.editDomainadmin(self.username, self.getEdit())
@classmethod
def add_parser(cls, subparsers):
parser = subparsers.add_parser(
cls.parser_command,
help="Manage domain admins (add, delete, get, edit)"
)
parser.add_argument("object", choices=list(cls.required_args.keys()), help="Action to perform: add, delete, get, edit")
parser.add_argument("--username", help="Username for the domain admin")
parser.add_argument("--domains", help="Comma-separated list of domains")
parser.add_argument("--password", help="Password for the domain admin")
parser.add_argument("--active", choices=["1", "0"], help="Activate (1) or deactivate (0) the domain admin")

View File

@@ -0,0 +1,164 @@
from modules.Mailcow import Mailcow
from models.BaseModel import BaseModel
class MailboxModel(BaseModel):
parser_command = "mailbox"
required_args = {
"add": [["username", "password"]],
"delete": [["username"]],
"get": [["username"]],
"edit": [["username"]]
}
def __init__(
self,
password=None,
username=None,
domain=None,
local_part=None,
active=None,
sogo_access=None,
name=None,
authsource=None,
quota=None,
force_pw_update=None,
tls_enforce_in=None,
tls_enforce_out=None,
tags=None,
sender_acl=None,
**kwargs
):
self.mailcow = Mailcow()
if username is not None and "@" in username:
self.username = username
self.local_part, self.domain = username.split("@")
else:
self.username = f"{local_part}@{domain}"
self.local_part = local_part
self.domain = domain
self.password = password
self.password2 = password
self.active = active
self.sogo_access = sogo_access
self.name = name
self.authsource = authsource
self.quota = quota
self.force_pw_update = force_pw_update
self.tls_enforce_in = tls_enforce_in
self.tls_enforce_out = tls_enforce_out
self.tags = tags
self.sender_acl = sender_acl
@classmethod
def from_dict(cls, data):
return cls(
domain=data.get("domain"),
local_part=data.get("local_part"),
password=data.get("password"),
password2=data.get("password"),
active=data.get("active", None),
sogo_access=data.get("sogo_access", None),
name=data.get("name", None),
authsource=data.get("authsource", None),
quota=data.get("quota", None),
force_pw_update=data.get("force_pw_update", None),
tls_enforce_in=data.get("tls_enforce_in", None),
tls_enforce_out=data.get("tls_enforce_out", None),
tags=data.get("tags", None),
sender_acl=data.get("sender_acl", None)
)
def getAdd(self):
"""
Get the mailbox details as a dictionary for adding, sets default values.
:return: Dictionary containing mailbox details.
"""
mailbox = {
"domain": self.domain,
"local_part": self.local_part,
"password": self.password,
"password2": self.password2,
"active": self.active if self.active is not None else 1,
"name": self.name if self.name is not None else "",
"authsource": self.authsource if self.authsource is not None else "mailcow",
"quota": self.quota if self.quota is not None else 0,
"force_pw_update": self.force_pw_update if self.force_pw_update is not None else 0,
"tls_enforce_in": self.tls_enforce_in if self.tls_enforce_in is not None else 0,
"tls_enforce_out": self.tls_enforce_out if self.tls_enforce_out is not None else 0,
"tags": self.tags if self.tags is not None else []
}
return {key: value for key, value in mailbox.items() if value is not None}
def getEdit(self):
"""
Get the mailbox details as a dictionary for editing, sets no default values.
:return: Dictionary containing mailbox details.
"""
mailbox = {
"domain": self.domain,
"local_part": self.local_part,
"password": self.password,
"password2": self.password2,
"active": self.active,
"name": self.name,
"authsource": self.authsource,
"quota": self.quota,
"force_pw_update": self.force_pw_update,
"tls_enforce_in": self.tls_enforce_in,
"tls_enforce_out": self.tls_enforce_out,
"tags": self.tags
}
return {key: value for key, value in mailbox.items() if value is not None}
def get(self):
"""
Get the mailbox details from the mailcow API.
:return: Response from the mailcow API.
"""
return self.mailcow.getMailbox(self.username)
def delete(self):
"""
Get the mailbox details from the mailcow API.
:return: Response from the mailcow API.
"""
return self.mailcow.deleteMailbox(self.username)
def add(self):
"""
Get the mailbox details from the mailcow API.
:return: Response from the mailcow API.
"""
return self.mailcow.addMailbox(self.getAdd())
def edit(self):
"""
Get the mailbox details from the mailcow API.
:return: Response from the mailcow API.
"""
return self.mailcow.editMailbox(self.username, self.getEdit())
@classmethod
def add_parser(cls, subparsers):
parser = subparsers.add_parser(
cls.parser_command,
help="Manage mailboxes (add, delete, get, edit)"
)
parser.add_argument("object", choices=list(cls.required_args.keys()), help="Action to perform: add, delete, get, edit")
parser.add_argument("--username", help="Full email address of the mailbox (e.g. user@example.com)")
parser.add_argument("--password", help="Password for the mailbox (required for add)")
parser.add_argument("--active", choices=["1", "0"], help="Activate (1) or deactivate (0) the mailbox")
parser.add_argument("--sogo-access", choices=["1", "0"], help="Redirect mailbox to SOGo after web login (1 = yes, 0 = no)")
parser.add_argument("--name", help="Display name of the mailbox owner")
parser.add_argument("--authsource", help="Authentication source (default: mailcow)")
parser.add_argument("--quota", help="Mailbox quota in bytes (0 = unlimited)")
parser.add_argument("--force-pw-update", choices=["1", "0"], help="Force password update on next login (1 = yes, 0 = no)")
parser.add_argument("--tls-enforce-in", choices=["1", "0"], help="Enforce TLS for incoming emails (1 = yes, 0 = no)")
parser.add_argument("--tls-enforce-out", choices=["1", "0"], help="Enforce TLS for outgoing emails (1 = yes, 0 = no)")
parser.add_argument("--tags", help="Comma-separated list of tags for the mailbox")
parser.add_argument("--sender-acl", help="Comma-separated list of allowed sender addresses for this mailbox")

View File

@@ -0,0 +1,67 @@
from modules.Dovecot import Dovecot
from models.BaseModel import BaseModel
class MaildirModel(BaseModel):
parser_command = "maildir"
required_args = {
"encrypt": [],
"decrypt": [],
"restore": [["username", "item"], ["list"]]
}
def __init__(
self,
username=None,
source=None,
item=None,
overwrite=None,
list=None,
**kwargs
):
self.dovecot = Dovecot()
for key, value in kwargs.items():
setattr(self, key, value)
self.username = username
self.source = source
self.item = item
self.overwrite = overwrite
self.list = list
def encrypt(self):
"""
Encrypt the maildir for the specified user or all.
:return: Response from Dovecot.
"""
return self.dovecot.encryptMaildir(self.source_dir, self.output_dir)
def decrypt(self):
"""
Decrypt the maildir for the specified user or all.
:return: Response from Dovecot.
"""
return self.dovecot.decryptMaildir(self.source_dir, self.output_dir)
def restore(self):
"""
Restore or List maildir data for the specified user.
:return: Response from Dovecot.
"""
if self.list:
return self.dovecot.listDeletedMaildirs()
return self.dovecot.restoreMaildir(self.username, self.item)
@classmethod
def add_parser(cls, subparsers):
parser = subparsers.add_parser(
cls.parser_command,
help="Manage maildir (encrypt, decrypt, restore)"
)
parser.add_argument("object", choices=list(cls.required_args.keys()), help="Action to perform: encrypt, decrypt, restore")
parser.add_argument("--item", help="Item to restore")
parser.add_argument("--username", help="Username to restore the item to")
parser.add_argument("--list", action="store_true", help="List items to restore")
parser.add_argument("--source-dir", help="Path to the source maildir to import/encrypt/decrypt")
parser.add_argument("--output-dir", help="Directory to store encrypted/decrypted files inside the Dovecot container")

View File

@@ -0,0 +1,62 @@
import json
from models.BaseModel import BaseModel
from modules.Mailer import Mailer
class MailerModel(BaseModel):
parser_command = "mail"
required_args = {
"send": [["sender", "recipient", "subject", "body"]]
}
def __init__(
self,
sender=None,
recipient=None,
subject=None,
body=None,
context=None,
**kwargs
):
self.sender = sender
self.recipient = recipient
self.subject = subject
self.body = body
self.context = context
def send(self):
if self.context is not None:
try:
self.context = json.loads(self.context)
except json.JSONDecodeError as e:
return f"Invalid context JSON: {e}"
else:
self.context = {}
mailer = Mailer(
smtp_host="postfix-mailcow",
smtp_port=25,
username=self.sender,
password="",
use_tls=True
)
res = mailer.send_mail(
subject=self.subject,
from_addr=self.sender,
to_addrs=self.recipient.split(","),
template=self.body,
context=self.context
)
return res
@classmethod
def add_parser(cls, subparsers):
parser = subparsers.add_parser(
cls.parser_command,
help="Send emails via SMTP"
)
parser.add_argument("object", choices=list(cls.required_args.keys()), help="Action to perform: send")
parser.add_argument("--sender", required=True, help="Email sender address")
parser.add_argument("--recipient", required=True, help="Email recipient address (comma-separated for multiple)")
parser.add_argument("--subject", required=True, help="Email subject")
parser.add_argument("--body", required=True, help="Email body (Jinja2 template supported)")
parser.add_argument("--context", help="Context for Jinja2 template rendering (JSON format)")

View File

@@ -0,0 +1,45 @@
from modules.Mailcow import Mailcow
from models.BaseModel import BaseModel
class StatusModel(BaseModel):
parser_command = "status"
required_args = {
"version": [[]],
"vmail": [[]],
"containers": [[]]
}
def __init__(
self,
**kwargs
):
self.mailcow = Mailcow()
def version(self):
"""
Get the version of the mailcow instance.
:return: Response from the mailcow API.
"""
return self.mailcow.getStatusVersion()
def vmail(self):
"""
Get the vmail details from the mailcow API.
:return: Response from the mailcow API.
"""
return self.mailcow.getStatusVmail()
def containers(self):
"""
Get the status of containers in the mailcow instance.
:return: Response from the mailcow API.
"""
return self.mailcow.getStatusContainers()
@classmethod
def add_parser(cls, subparsers):
parser = subparsers.add_parser(
cls.parser_command,
help="Get information about mailcow (version, vmail, containers)"
)
parser.add_argument("object", choices=list(cls.required_args.keys()), help="Action to perform: version, vmail, containers")

View File

@@ -0,0 +1,221 @@
from modules.Mailcow import Mailcow
from models.BaseModel import BaseModel
class SyncjobModel(BaseModel):
parser_command = "syncjob"
required_args = {
"add": [["username", "host1", "port1", "user1", "password1", "enc1"]],
"delete": [["id"]],
"get": [["username"]],
"edit": [["id"]],
"run": [["id"]]
}
def __init__(
self,
id=None,
username=None,
host1=None,
port1=None,
user1=None,
password1=None,
enc1=None,
mins_interval=None,
subfolder2=None,
maxage=None,
maxbytespersecond=None,
timeout1=None,
timeout2=None,
exclude=None,
custom_parameters=None,
delete2duplicates=None,
delete1=None,
delete2=None,
automap=None,
skipcrossduplicates=None,
subscribeall=None,
active=None,
force=None,
**kwargs
):
self.mailcow = Mailcow()
for key, value in kwargs.items():
setattr(self, key, value)
self.id = id
self.username = username
self.host1 = host1
self.port1 = port1
self.user1 = user1
self.password1 = password1
self.enc1 = enc1
self.mins_interval = mins_interval
self.subfolder2 = subfolder2
self.maxage = maxage
self.maxbytespersecond = maxbytespersecond
self.timeout1 = timeout1
self.timeout2 = timeout2
self.exclude = exclude
self.custom_parameters = custom_parameters
self.delete2duplicates = delete2duplicates
self.delete1 = delete1
self.delete2 = delete2
self.automap = automap
self.skipcrossduplicates = skipcrossduplicates
self.subscribeall = subscribeall
self.active = active
self.force = force
@classmethod
def from_dict(cls, data):
return cls(
username=data.get("username"),
host1=data.get("host1"),
port1=data.get("port1"),
user1=data.get("user1"),
password1=data.get("password1"),
enc1=data.get("enc1"),
mins_interval=data.get("mins_interval", None),
subfolder2=data.get("subfolder2", None),
maxage=data.get("maxage", None),
maxbytespersecond=data.get("maxbytespersecond", None),
timeout1=data.get("timeout1", None),
timeout2=data.get("timeout2", None),
exclude=data.get("exclude", None),
custom_parameters=data.get("custom_parameters", None),
delete2duplicates=data.get("delete2duplicates", None),
delete1=data.get("delete1", None),
delete2=data.get("delete2", None),
automap=data.get("automap", None),
skipcrossduplicates=data.get("skipcrossduplicates", None),
subscribeall=data.get("subscribeall", None),
active=data.get("active", None),
)
def getAdd(self):
"""
Get the sync job details as a dictionary for adding, sets default values.
:return: Dictionary containing sync job details.
"""
syncjob = {
"username": self.username,
"host1": self.host1,
"port1": self.port1,
"user1": self.user1,
"password1": self.password1,
"enc1": self.enc1,
"mins_interval": self.mins_interval if self.mins_interval is not None else 20,
"subfolder2": self.subfolder2 if self.subfolder2 is not None else "",
"maxage": self.maxage if self.maxage is not None else 0,
"maxbytespersecond": self.maxbytespersecond if self.maxbytespersecond is not None else 0,
"timeout1": self.timeout1 if self.timeout1 is not None else 600,
"timeout2": self.timeout2 if self.timeout2 is not None else 600,
"exclude": self.exclude if self.exclude is not None else "(?i)spam|(?i)junk",
"custom_parameters": self.custom_parameters if self.custom_parameters is not None else "",
"delete2duplicates": 1 if self.delete2duplicates else 0,
"delete1": 1 if self.delete1 else 0,
"delete2": 1 if self.delete2 else 0,
"automap": 1 if self.automap else 0,
"skipcrossduplicates": 1 if self.skipcrossduplicates else 0,
"subscribeall": 1 if self.subscribeall else 0,
"active": 1 if self.active else 0
}
return {key: value for key, value in syncjob.items() if value is not None}
def getEdit(self):
"""
Get the sync job details as a dictionary for editing, sets no default values.
:return: Dictionary containing sync job details.
"""
syncjob = {
"username": self.username,
"host1": self.host1,
"port1": self.port1,
"user1": self.user1,
"password1": self.password1,
"enc1": self.enc1,
"mins_interval": self.mins_interval,
"subfolder2": self.subfolder2,
"maxage": self.maxage,
"maxbytespersecond": self.maxbytespersecond,
"timeout1": self.timeout1,
"timeout2": self.timeout2,
"exclude": self.exclude,
"custom_parameters": self.custom_parameters,
"delete2duplicates": self.delete2duplicates,
"delete1": self.delete1,
"delete2": self.delete2,
"automap": self.automap,
"skipcrossduplicates": self.skipcrossduplicates,
"subscribeall": self.subscribeall,
"active": self.active
}
return {key: value for key, value in syncjob.items() if value is not None}
def get(self):
"""
Get the sync job details from the mailcow API.
:return: Response from the mailcow API.
"""
return self.mailcow.getSyncjob(self.username)
def delete(self):
"""
Get the sync job details from the mailcow API.
:return: Response from the mailcow API.
"""
return self.mailcow.deleteSyncjob(self.id)
def add(self):
"""
Get the sync job details from the mailcow API.
:return: Response from the mailcow API.
"""
return self.mailcow.addSyncjob(self.getAdd())
def edit(self):
"""
Get the sync job details from the mailcow API.
:return: Response from the mailcow API.
"""
return self.mailcow.editSyncjob(self.id, self.getEdit())
def run(self):
"""
Run the sync job.
:return: Response from the mailcow API.
"""
return self.mailcow.runSyncjob(self.id, force=self.force)
@classmethod
def add_parser(cls, subparsers):
parser = subparsers.add_parser(
cls.parser_command,
help="Manage sync jobs (add, delete, get, edit)"
)
parser.add_argument("object", choices=list(cls.required_args.keys()), help="Action to perform: add, delete, get, edit")
parser.add_argument("--id", help="Syncjob object ID (required for edit, delete, run)")
parser.add_argument("--username", help="Target mailbox username (e.g. user@example.com)")
parser.add_argument("--host1", help="Source IMAP server hostname")
parser.add_argument("--port1", help="Source IMAP server port")
parser.add_argument("--user1", help="Source IMAP account username")
parser.add_argument("--password1", help="Source IMAP account password")
parser.add_argument("--enc1", choices=["PLAIN", "SSL", "TLS"], help="Encryption for source server connection")
parser.add_argument("--mins-interval", help="Sync interval in minutes (default: 20)")
parser.add_argument("--subfolder2", help="Destination subfolder (default: empty)")
parser.add_argument("--maxage", help="Maximum mail age in days (default: 0 = unlimited)")
parser.add_argument("--maxbytespersecond", help="Maximum bandwidth in bytes/sec (default: 0 = unlimited)")
parser.add_argument("--timeout1", help="Timeout for source server in seconds (default: 600)")
parser.add_argument("--timeout2", help="Timeout for destination server in seconds (default: 600)")
parser.add_argument("--exclude", help="Regex pattern to exclude folders (default: (?i)spam|(?i)junk)")
parser.add_argument("--custom-parameters", help="Additional imapsync parameters")
parser.add_argument("--delete2duplicates", choices=["1", "0"], help="Delete duplicates on destination (1 = yes, 0 = no)")
parser.add_argument("--del1", choices=["1", "0"], help="Delete mails on source after sync (1 = yes, 0 = no)")
parser.add_argument("--del2", choices=["1", "0"], help="Delete mails on destination after sync (1 = yes, 0 = no)")
parser.add_argument("--automap", choices=["1", "0"], help="Enable folder automapping (1 = yes, 0 = no)")
parser.add_argument("--skipcrossduplicates", choices=["1", "0"], help="Skip cross-account duplicates (1 = yes, 0 = no)")
parser.add_argument("--subscribeall", choices=["1", "0"], help="Subscribe to all folders (1 = yes, 0 = no)")
parser.add_argument("--active", choices=["1", "0"], help="Activate syncjob (1 = yes, 0 = no)")
parser.add_argument("--force", action="store_true", help="Force the syncjob to run even if it is not active")

View File

@@ -0,0 +1,128 @@
import docker
from docker.errors import APIError
class Docker:
def __init__(self):
self.client = docker.from_env()
def exec_command(self, container_name, cmd, user=None):
"""
Execute a command in a container by its container name.
:param container_name: The name of the container.
:param cmd: The command to execute as a list (e.g., ["ls", "-la"]).
:param user: The user to execute the command as (optional).
:return: A standardized response with status, output, and exit_code.
"""
filters = {"name": container_name}
try:
for container in self.client.containers.list(filters=filters):
exec_result = container.exec_run(cmd, user=user)
return {
"status": "success",
"exit_code": exec_result.exit_code,
"output": exec_result.output.decode("utf-8")
}
except APIError as e:
return {
"status": "error",
"exit_code": "APIError",
"output": str(e)
}
except Exception as e:
return {
"status": "error",
"exit_code": "Exception",
"output": str(e)
}
def start_container(self, container_name):
"""
Start a container by its container name.
:param container_name: The name of the container.
:return: A standardized response with status, output, and exit_code.
"""
filters = {"name": container_name}
try:
for container in self.client.containers.list(filters=filters):
container.start()
return {
"status": "success",
"exit_code": "0",
"output": f"Container '{container_name}' started successfully."
}
except APIError as e:
return {
"status": "error",
"exit_code": "APIError",
"output": str(e)
}
except Exception as e:
return {
"status": "error",
"error_type": "Exception",
"output": str(e)
}
def stop_container(self, container_name):
"""
Stop a container by its container name.
:param container_name: The name of the container.
:return: A standardized response with status, output, and exit_code.
"""
filters = {"name": container_name}
try:
for container in self.client.containers.list(filters=filters):
container.stop()
return {
"status": "success",
"exit_code": "0",
"output": f"Container '{container_name}' stopped successfully."
}
except APIError as e:
return {
"status": "error",
"exit_code": "APIError",
"output": str(e)
}
except Exception as e:
return {
"status": "error",
"exit_code": "Exception",
"output": str(e)
}
def restart_container(self, container_name):
"""
Restart a container by its container name.
:param container_name: The name of the container.
:return: A standardized response with status, output, and exit_code.
"""
filters = {"name": container_name}
try:
for container in self.client.containers.list(filters=filters):
container.restart()
return {
"status": "success",
"exit_code": "0",
"output": f"Container '{container_name}' restarted successfully."
}
except APIError as e:
return {
"status": "error",
"exit_code": "APIError",
"output": str(e)
}
except Exception as e:
return {
"status": "error",
"exit_code": "Exception",
"output": str(e)
}

View File

@@ -0,0 +1,206 @@
import os
from modules.Docker import Docker
class Dovecot:
def __init__(self):
self.docker = Docker()
def decryptMaildir(self, source_dir="/var/vmail/", output_dir=None):
"""
Decrypt files in /var/vmail using doveadm if they are encrypted.
:param output_dir: Directory inside the Dovecot container to store decrypted files, Default overwrite.
"""
private_key = "/mail_crypt/ecprivkey.pem"
public_key = "/mail_crypt/ecpubkey.pem"
if output_dir:
# Ensure the output directory exists inside the container
mkdir_result = self.docker.exec_command("dovecot-mailcow", f"bash -c 'mkdir -p {output_dir} && chown vmail:vmail {output_dir}'")
if mkdir_result.get("status") != "success":
print(f"Error creating output directory: {mkdir_result.get('output')}")
return
find_command = [
"find", source_dir, "-type", "f", "-regextype", "egrep", "-regex", ".*S=.*W=.*"
]
try:
find_result = self.docker.exec_command("dovecot-mailcow", " ".join(find_command))
if find_result.get("status") != "success":
print(f"Error finding files: {find_result.get('output')}")
return
files = find_result.get("output", "").splitlines()
for file in files:
head_command = f"head -c7 {file}"
head_result = self.docker.exec_command("dovecot-mailcow", head_command)
if head_result.get("status") == "success" and head_result.get("output", "").strip() == "CRYPTED":
if output_dir:
# Preserve the directory structure in the output directory
relative_path = os.path.relpath(file, source_dir)
output_file = os.path.join(output_dir, relative_path)
current_path = output_dir
for part in os.path.dirname(relative_path).split(os.sep):
current_path = os.path.join(current_path, part)
mkdir_result = self.docker.exec_command("dovecot-mailcow", f"bash -c '[ ! -d {current_path} ] && mkdir {current_path} && chown vmail:vmail {current_path}'")
if mkdir_result.get("status") != "success":
print(f"Error creating directory {current_path}: {mkdir_result.get('output')}")
continue
else:
# Overwrite the original file
output_file = file
decrypt_command = (
f"bash -c 'doveadm fs get compress lz4:1:crypt:private_key_path={private_key}:public_key_path={public_key}:posix:prefix=/ {file} > {output_file}'"
)
decrypt_result = self.docker.exec_command("dovecot-mailcow", decrypt_command)
if decrypt_result.get("status") == "success":
print(f"Decrypted {file}")
# Verify the file size and set permissions
size_check_command = f"bash -c '[ -s {output_file} ] && chmod 600 {output_file} && chown vmail:vmail {output_file} || rm -f {output_file}'"
size_check_result = self.docker.exec_command("dovecot-mailcow", size_check_command)
if size_check_result.get("status") != "success":
print(f"Error setting permissions for {output_file}: {size_check_result.get('output')}\n")
except Exception as e:
print(f"Error during decryption: {e}")
return "Done"
def encryptMaildir(self, source_dir="/var/vmail/", output_dir=None):
"""
Encrypt files in /var/vmail using doveadm if they are not already encrypted.
:param source_dir: Directory inside the Dovecot container to encrypt files.
:param output_dir: Directory inside the Dovecot container to store encrypted files, Default overwrite.
"""
private_key = "/mail_crypt/ecprivkey.pem"
public_key = "/mail_crypt/ecpubkey.pem"
if output_dir:
# Ensure the output directory exists inside the container
mkdir_result = self.docker.exec_command("dovecot-mailcow", f"mkdir -p {output_dir}")
if mkdir_result.get("status") != "success":
print(f"Error creating output directory: {mkdir_result.get('output')}")
return
find_command = [
"find", source_dir, "-type", "f", "-regextype", "egrep", "-regex", ".*S=.*W=.*"
]
try:
find_result = self.docker.exec_command("dovecot-mailcow", " ".join(find_command))
if find_result.get("status") != "success":
print(f"Error finding files: {find_result.get('output')}")
return
files = find_result.get("output", "").splitlines()
for file in files:
head_command = f"head -c7 {file}"
head_result = self.docker.exec_command("dovecot-mailcow", head_command)
if head_result.get("status") == "success" and head_result.get("output", "").strip() != "CRYPTED":
if output_dir:
# Preserve the directory structure in the output directory
relative_path = os.path.relpath(file, source_dir)
output_file = os.path.join(output_dir, relative_path)
current_path = output_dir
for part in os.path.dirname(relative_path).split(os.sep):
current_path = os.path.join(current_path, part)
mkdir_result = self.docker.exec_command("dovecot-mailcow", f"bash -c '[ ! -d {current_path} ] && mkdir {current_path} && chown vmail:vmail {current_path}'")
if mkdir_result.get("status") != "success":
print(f"Error creating directory {current_path}: {mkdir_result.get('output')}")
continue
else:
# Overwrite the original file
output_file = file
encrypt_command = (
f"bash -c 'doveadm fs put crypt private_key_path={private_key}:public_key_path={public_key}:posix:prefix=/ {file} {output_file}'"
)
encrypt_result = self.docker.exec_command("dovecot-mailcow", encrypt_command)
if encrypt_result.get("status") == "success":
print(f"Encrypted {file}")
# Set permissions
permissions_command = f"bash -c 'chmod 600 {output_file} && chown 5000:5000 {output_file}'"
permissions_result = self.docker.exec_command("dovecot-mailcow", permissions_command)
if permissions_result.get("status") != "success":
print(f"Error setting permissions for {output_file}: {permissions_result.get('output')}\n")
except Exception as e:
print(f"Error during encryption: {e}")
return "Done"
def listDeletedMaildirs(self, source_dir="/var/vmail/_garbage"):
"""
List deleted maildirs in the specified garbage directory.
:param source_dir: Directory to search for deleted maildirs.
:return: List of maildirs.
"""
list_command = ["bash", "-c", f"ls -la {source_dir}"]
try:
result = self.docker.exec_command("dovecot-mailcow", list_command)
if result.get("status") != "success":
print(f"Error listing deleted maildirs: {result.get('output')}")
return []
lines = result.get("output", "").splitlines()
maildirs = {}
for idx, line in enumerate(lines):
parts = line.split()
if "_" in line:
folder_name = parts[-1]
time, maildir = folder_name.split("_", 1)
if maildir.endswith("_index"):
main_item = maildir[:-6]
if main_item in maildirs:
maildirs[main_item]["has_index"] = True
else:
maildirs[maildir] = {"item": idx, "time": time, "name": maildir, "has_index": False}
return list(maildirs.values())
except Exception as e:
print(f"Error during listing deleted maildirs: {e}")
return []
def restoreMaildir(self, username, item, source_dir="/var/vmail/_garbage"):
"""
Restore a maildir item for a specific user from the deleted maildirs.
:param username: Username to restore the item to.
:param item: Item to restore (e.g., mailbox, folder).
:param source_dir: Directory containing deleted maildirs.
:return: Response from Dovecot.
"""
username_splitted = username.split("@")
maildirs = self.listDeletedMaildirs()
maildir = None
for mdir in maildirs:
if mdir["item"] == int(item):
maildir = mdir
break
if not maildir:
return {"status": "error", "message": "Maildir not found."}
restore_command = f"mv {source_dir}/{maildir['time']}_{maildir['name']} /var/vmail/{username_splitted[1]}/{username_splitted[0]}"
restore_index_command = f"mv {source_dir}/{maildir['time']}_{maildir['name']}_index /var/vmail_index/{username}"
result = self.docker.exec_command("dovecot-mailcow", ["bash", "-c", restore_command])
if result.get("status") != "success":
return {"status": "error", "message": "Failed to restore maildir."}
result = self.docker.exec_command("dovecot-mailcow", ["bash", "-c", restore_index_command])
if result.get("status") != "success":
return {"status": "error", "message": "Failed to restore maildir index."}
return "Done"

View File

@@ -0,0 +1,457 @@
import requests
import urllib3
import sys
import os
import subprocess
import tempfile
import mysql.connector
from contextlib import contextmanager
from datetime import datetime
from modules.Docker import Docker
class Mailcow:
def __init__(self):
self.apiUrl = "/api/v1"
self.ignore_ssl_errors = True
self.baseUrl = f"https://{os.getenv('IPv4_NETWORK', '172.22.1')}.247:{os.getenv('HTTPS_PORT', '443')}"
self.host = os.getenv("MAILCOW_HOSTNAME", "")
self.apiKey = ""
if self.ignore_ssl_errors:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
self.db_config = {
'user': os.getenv('DBUSER'),
'password': os.getenv('DBPASS'),
'database': os.getenv('DBNAME'),
'unix_socket': '/var/run/mysqld/mysqld.sock',
}
self.docker = Docker()
# API Functions
def addDomain(self, domain):
"""
Add a domain to the mailcow instance.
:param domain: Dictionary containing domain details.
:return: Response from the mailcow API.
"""
return self.post('/add/domain', domain)
def addMailbox(self, mailbox):
"""
Add a mailbox to the mailcow instance.
:param mailbox: Dictionary containing mailbox details.
:return: Response from the mailcow API.
"""
return self.post('/add/mailbox', mailbox)
def addAlias(self, alias):
"""
Add an alias to the mailcow instance.
:param alias: Dictionary containing alias details.
:return: Response from the mailcow API.
"""
return self.post('/add/alias', alias)
def addSyncjob(self, syncjob):
"""
Add a sync job to the mailcow instance.
:param syncjob: Dictionary containing sync job details.
:return: Response from the mailcow API.
"""
return self.post('/add/syncjob', syncjob)
def addDomainadmin(self, domainadmin):
"""
Add a domain admin to the mailcow instance.
:param domainadmin: Dictionary containing domain admin details.
:return: Response from the mailcow API.
"""
return self.post('/add/domain-admin', domainadmin)
def deleteDomain(self, domain):
"""
Delete a domain from the mailcow instance.
:param domain: Name of the domain to delete.
:return: Response from the mailcow API.
"""
items = [domain]
return self.post('/delete/domain', items)
def deleteAlias(self, id):
"""
Delete an alias from the mailcow instance.
:param id: ID of the alias to delete.
:return: Response from the mailcow API.
"""
items = [id]
return self.post('/delete/alias', items)
def deleteSyncjob(self, id):
"""
Delete a sync job from the mailcow instance.
:param id: ID of the sync job to delete.
:return: Response from the mailcow API.
"""
items = [id]
return self.post('/delete/syncjob', items)
def deleteMailbox(self, mailbox):
"""
Delete a mailbox from the mailcow instance.
:param mailbox: Name of the mailbox to delete.
:return: Response from the mailcow API.
"""
items = [mailbox]
return self.post('/delete/mailbox', items)
def deleteDomainadmin(self, username):
"""
Delete a domain admin from the mailcow instance.
:param username: Username of the domain admin to delete.
:return: Response from the mailcow API.
"""
items = [username]
return self.post('/delete/domain-admin', items)
def post(self, endpoint, data):
"""
Make a POST request to the mailcow API.
:param endpoint: The API endpoint to post to.
:param data: Data to be sent in the POST request.
:return: Response from the mailcow API.
"""
url = f"{self.baseUrl}{self.apiUrl}/{endpoint.lstrip('/')}"
headers = {
"Content-Type": "application/json",
"Host": self.host
}
if self.apiKey:
headers["X-Api-Key"] = self.apiKey
response = requests.post(
url,
json=data,
headers=headers,
verify=not self.ignore_ssl_errors
)
response.raise_for_status()
return response.json()
def getDomain(self, domain):
"""
Get a domain from the mailcow instance.
:param domain: Name of the domain to get.
:return: Response from the mailcow API.
"""
return self.get(f'/get/domain/{domain}')
def getMailbox(self, username):
"""
Get a mailbox from the mailcow instance.
:param mailbox: Dictionary containing mailbox details (e.g. {"username": "user@example.com"})
:return: Response from the mailcow API.
"""
return self.get(f'/get/mailbox/{username}')
def getAlias(self, id):
"""
Get an alias from the mailcow instance.
:param alias: Dictionary containing alias details (e.g. {"address": "alias@example.com"})
:return: Response from the mailcow API.
"""
return self.get(f'/get/alias/{id}')
def getSyncjob(self, id):
"""
Get a sync job from the mailcow instance.
:param syncjob: Dictionary containing sync job details (e.g. {"id": "123"})
:return: Response from the mailcow API.
"""
return self.get(f'/get/syncjobs/{id}')
def getDomainadmin(self, username):
"""
Get a domain admin from the mailcow instance.
:param username: Username of the domain admin to get.
:return: Response from the mailcow API.
"""
return self.get(f'/get/domain-admin/{username}')
def getStatusVersion(self):
"""
Get the version of the mailcow instance.
:return: Response from the mailcow API.
"""
return self.get('/get/status/version')
def getStatusVmail(self):
"""
Get the vmail status from the mailcow instance.
:return: Response from the mailcow API.
"""
return self.get('/get/status/vmail')
def getStatusContainers(self):
"""
Get the status of containers from the mailcow instance.
:return: Response from the mailcow API.
"""
return self.get('/get/status/containers')
def get(self, endpoint, params=None):
"""
Make a GET request to the mailcow API.
:param endpoint: The API endpoint to get from.
:param params: Parameters to be sent in the GET request.
:return: Response from the mailcow API.
"""
url = f"{self.baseUrl}{self.apiUrl}/{endpoint.lstrip('/')}"
headers = {
"Content-Type": "application/json",
"Host": self.host
}
if self.apiKey:
headers["X-Api-Key"] = self.apiKey
response = requests.get(
url,
params=params,
headers=headers,
verify=not self.ignore_ssl_errors
)
response.raise_for_status()
return response.json()
def editDomain(self, domain, attributes):
"""
Edit an existing domain in the mailcow instance.
:param domain: Name of the domain to edit
:param attributes: Dictionary containing the new domain attributes.
"""
items = [domain]
return self.edit('/edit/domain', items, attributes)
def editMailbox(self, mailbox, attributes):
"""
Edit an existing mailbox in the mailcow instance.
:param mailbox: Name of the mailbox to edit
:param attributes: Dictionary containing the new mailbox attributes.
"""
items = [mailbox]
return self.edit('/edit/mailbox', items, attributes)
def editAlias(self, alias, attributes):
"""
Edit an existing alias in the mailcow instance.
:param alias: Name of the alias to edit
:param attributes: Dictionary containing the new alias attributes.
"""
items = [alias]
return self.edit('/edit/alias', items, attributes)
def editSyncjob(self, syncjob, attributes):
"""
Edit an existing sync job in the mailcow instance.
:param syncjob: Name of the sync job to edit
:param attributes: Dictionary containing the new sync job attributes.
"""
items = [syncjob]
return self.edit('/edit/syncjob', items, attributes)
def editDomainadmin(self, username, attributes):
"""
Edit an existing domain admin in the mailcow instance.
:param username: Username of the domain admin to edit
:param attributes: Dictionary containing the new domain admin attributes.
"""
items = [username]
return self.edit('/edit/domain-admin', items, attributes)
def edit(self, endpoint, items, attributes):
"""
Make a POST request to edit items in the mailcow API.
:param items: List of items to edit.
:param attributes: Dictionary containing the new attributes for the items.
:return: Response from the mailcow API.
"""
url = f"{self.baseUrl}{self.apiUrl}/{endpoint.lstrip('/')}"
headers = {
"Content-Type": "application/json",
"Host": self.host
}
if self.apiKey:
headers["X-Api-Key"] = self.apiKey
data = {
"items": items,
"attr": attributes
}
response = requests.post(
url,
json=data,
headers=headers,
verify=not self.ignore_ssl_errors
)
response.raise_for_status()
return response.json()
# System Functions
def runSyncjob(self, id, force=False):
"""
Run a sync job.
:param id: ID of the sync job to run.
:return: Response from the imapsync script.
"""
creds_path = "/app/sieve.creds"
conn = mysql.connector.connect(**self.db_config)
cursor = conn.cursor(dictionary=True)
with open(creds_path, 'r') as file:
master_user, master_pass = file.read().strip().split(':')
query = ("SELECT * FROM imapsync WHERE id = %s")
cursor.execute(query, (id,))
success = False
syncjob = cursor.fetchone()
if not syncjob:
cursor.close()
conn.close()
return f"Sync job with ID {id} not found."
if syncjob['active'] == 0 and not force:
cursor.close()
conn.close()
return f"Sync job with ID {id} is not active."
enc1_flag = "--tls1" if syncjob['enc1'] == "TLS" else "--ssl1" if syncjob['enc1'] == "SSL" else None
passfile1_path = f"/tmp/passfile1_{id}.txt"
passfile2_path = f"/tmp/passfile2_{id}.txt"
passfile1_cmd = [
"sh", "-c",
f"echo {syncjob['password1']} > {passfile1_path}"
]
passfile2_cmd = [
"sh", "-c",
f"echo {master_pass} > {passfile2_path}"
]
self.docker.exec_command("dovecot-mailcow", passfile1_cmd)
self.docker.exec_command("dovecot-mailcow", passfile2_cmd)
imapsync_cmd = [
"/usr/local/bin/imapsync",
"--tmpdir", "/tmp",
"--nofoldersizes",
"--addheader"
]
if int(syncjob['timeout1']) > 0:
imapsync_cmd.extend(['--timeout1', str(syncjob['timeout1'])])
if int(syncjob['timeout2']) > 0:
imapsync_cmd.extend(['--timeout2', str(syncjob['timeout2'])])
if syncjob['exclude']:
imapsync_cmd.extend(['--exclude', syncjob['exclude']])
if syncjob['subfolder2']:
imapsync_cmd.extend(['--subfolder2', syncjob['subfolder2']])
if int(syncjob['maxage']) > 0:
imapsync_cmd.extend(['--maxage', str(syncjob['maxage'])])
if int(syncjob['maxbytespersecond']) > 0:
imapsync_cmd.extend(['--maxbytespersecond', str(syncjob['maxbytespersecond'])])
if int(syncjob['delete2duplicates']) == 1:
imapsync_cmd.append("--delete2duplicates")
if int(syncjob['subscribeall']) == 1:
imapsync_cmd.append("--subscribeall")
if int(syncjob['delete1']) == 1:
imapsync_cmd.append("--delete")
if int(syncjob['delete2']) == 1:
imapsync_cmd.append("--delete2")
if int(syncjob['automap']) == 1:
imapsync_cmd.append("--automap")
if int(syncjob['skipcrossduplicates']) == 1:
imapsync_cmd.append("--skipcrossduplicates")
if enc1_flag:
imapsync_cmd.append(enc1_flag)
imapsync_cmd.extend([
"--host1", syncjob['host1'],
"--user1", syncjob['user1'],
"--passfile1", passfile1_path,
"--port1", str(syncjob['port1']),
"--host2", "localhost",
"--user2", f"{syncjob['user2']}*{master_user}",
"--passfile2", passfile2_path
])
if syncjob['dry'] == 1:
imapsync_cmd.append("--dry")
imapsync_cmd.extend([
"--no-modulesversion",
"--noreleasecheck"
])
try:
cursor.execute("UPDATE imapsync SET is_running = 1, success = NULL, exit_status = NULL WHERE id = %s", (id,))
conn.commit()
result = self.docker.exec_command("dovecot-mailcow", imapsync_cmd)
print(result)
success = result['status'] == "success" and result['exit_code'] == 0
cursor.execute(
"UPDATE imapsync SET returned_text = %s, success = %s, exit_status = %s WHERE id = %s",
(result['output'], int(success), result['exit_code'], id)
)
conn.commit()
except Exception as e:
cursor.execute(
"UPDATE imapsync SET returned_text = %s, success = 0 WHERE id = %s",
(str(e), id)
)
conn.commit()
finally:
cursor.execute("UPDATE imapsync SET last_run = NOW(), is_running = 0 WHERE id = %s", (id,))
conn.commit()
delete_passfile1_cmd = [
"sh", "-c",
f"rm -f {passfile1_path}"
]
delete_passfile2_cmd = [
"sh", "-c",
f"rm -f {passfile2_path}"
]
self.docker.exec_command("dovecot-mailcow", delete_passfile1_cmd)
self.docker.exec_command("dovecot-mailcow", delete_passfile2_cmd)
cursor.close()
conn.close()
return "Sync job completed successfully." if success else "Sync job failed."

View File

@@ -0,0 +1,64 @@
import smtplib
import json
import os
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from jinja2 import Environment, BaseLoader
class Mailer:
def __init__(self, smtp_host, smtp_port, username, password, use_tls=True):
self.smtp_host = smtp_host
self.smtp_port = smtp_port
self.username = username
self.password = password
self.use_tls = use_tls
self.server = None
self.env = Environment(loader=BaseLoader())
def connect(self):
print("Connecting to the SMTP server...")
self.server = smtplib.SMTP(self.smtp_host, self.smtp_port)
if self.use_tls:
self.server.starttls()
print("TLS activated!")
if self.username and self.password:
self.server.login(self.username, self.password)
print("Authenticated!")
def disconnect(self):
if self.server:
try:
if self.server.sock:
self.server.quit()
except smtplib.SMTPServerDisconnected:
pass
finally:
self.server = None
def render_inline_template(self, template_string, context):
template = self.env.from_string(template_string)
return template.render(context)
def send_mail(self, subject, from_addr, to_addrs, template, context = {}):
try:
if template == "":
print("Cannot send email, template is empty!")
return "Failed: Template is empty."
body = self.render_inline_template(template, context)
msg = MIMEMultipart()
msg['From'] = from_addr
msg['To'] = ', '.join(to_addrs) if isinstance(to_addrs, list) else to_addrs
msg['Subject'] = subject
msg.attach(MIMEText(body, 'html'))
self.connect()
self.server.sendmail(from_addr, to_addrs, msg.as_string())
self.disconnect()
return f"Success: Email sent to {msg['To']}"
except Exception as e:
print(f"Error during send_mail: {type(e).__name__}: {e}")
return f"Failed: {type(e).__name__}: {e}"
finally:
self.disconnect()

View File

@@ -0,0 +1,51 @@
from jinja2 import Environment, Template
import csv
def split_at(value, sep, idx):
try:
return value.split(sep)[idx]
except Exception:
return ''
class Reader:
"""
Reader class to handle reading and processing of CSV and JSON files for mailcow.
"""
def __init__(self):
pass
def read_csv(self, file_path, delimiter=',', encoding='iso-8859-1'):
"""
Read a CSV file and return a list of dictionaries.
Each dictionary represents a row in the CSV file.
:param file_path: Path to the CSV file.
:param delimiter: Delimiter used in the CSV file (default: ',').
"""
with open(file_path, mode='r', encoding=encoding) as file:
reader = csv.DictReader(file, delimiter=delimiter)
reader.fieldnames = [h.replace(" ", "_") if h else h for h in reader.fieldnames]
return [row for row in reader]
def map_csv_data(self, data, mapping_file_path, encoding='iso-8859-1'):
"""
Map CSV data to a specific structure based on the provided Jinja2 template file.
:param data: List of dictionaries representing CSV rows.
:param mapping_file_path: Path to the Jinja2 template file.
:return: List of dictionaries with mapped data.
"""
with open(mapping_file_path, 'r', encoding=encoding) as tpl_file:
template_content = tpl_file.read()
env = Environment()
env.filters['split_at'] = split_at
template = env.from_string(template_content)
mapped_data = []
for row in data:
rendered = template.render(**row)
try:
mapped_row = eval(rendered)
except Exception:
mapped_row = rendered
mapped_data.append(mapped_row)
return mapped_data

View File

@@ -0,0 +1,512 @@
import requests
import urllib3
import os
from uuid import uuid4
from collections import defaultdict
class Sogo:
def __init__(self, username, password=""):
self.apiUrl = "/SOGo/so"
self.davUrl = "/SOGo/dav"
self.ignore_ssl_errors = True
self.baseUrl = f"https://{os.getenv('IPv4_NETWORK', '172.22.1')}.247:{os.getenv('HTTPS_PORT', '443')}"
self.host = os.getenv("MAILCOW_HOSTNAME", "")
if self.ignore_ssl_errors:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
self.username = username
self.password = password
def addCalendar(self, calendar_name):
"""
Add a new calendar to the sogo instance.
:param calendar_name: Name of the calendar to be created
:return: Response from the sogo API.
"""
res = self.post(f"/{self.username}/Calendar/createFolder", {
"name": calendar_name
})
try:
return res.json()
except ValueError:
return res.text
def getCalendarIdByName(self, calendar_name):
"""
Get the calendar ID by its name.
:param calendar_name: Name of the calendar to find
:return: Calendar ID if found, otherwise None.
"""
res = self.get(f"/{self.username}/Calendar/calendarslist")
try:
for calendar in res.json()["calendars"]:
if calendar['name'] == calendar_name:
return calendar['id']
except ValueError:
return None
return None
def getCalendar(self):
"""
Get calendar list.
:return: Response from SOGo API.
"""
res = self.get(f"/{self.username}/Calendar/calendarslist")
try:
return res.json()
except ValueError:
return res.text
def deleteCalendar(self, calendar_id):
"""
Delete a calendar.
:param calendar_id: ID of the calendar to be deleted
:return: Response from SOGo API.
"""
res = self.get(f"/{self.username}/Calendar/{calendar_id}/delete")
return res.status_code == 204
def importCalendar(self, calendar_name, ics_file):
"""
Import a calendar from an ICS file.
:param calendar_name: Name of the calendar to import into
:param ics_file: Path to the ICS file to import
:return: Response from SOGo API.
"""
try:
with open(ics_file, "rb") as f:
pass
except Exception as e:
print(f"Could not open ICS file '{ics_file}': {e}")
return {"status": "error", "message": str(e)}
new_calendar = self.addCalendar(calendar_name)
selected_calendar = new_calendar.json()["id"]
url = f"{self.baseUrl}{self.apiUrl}/{self.username}/Calendar/{selected_calendar}/import"
auth = (self.username, self.password)
with open(ics_file, "rb") as f:
files = {'icsFile': (ics_file, f, 'text/calendar')}
res = requests.post(
url,
files=files,
auth=auth,
verify=not self.ignore_ssl_errors
)
try:
return res.json()
except ValueError:
return res.text
return None
def setCalendarACL(self, calendar_id, sharee_email, acl="r", subscribe=False):
"""
Set CalDAV calendar permissions for a user (sharee).
:param calendar_id: ID of the calendar to share
:param sharee_email: Email of the user to share with
:param acl: "w" for write, "r" for read-only or combination "rw" for read-write
:param subscribe: True will scubscribe the sharee to the calendar
:return: None
"""
# Access rights
if acl == "" or len(acl) > 2:
return "Invalid acl level specified. Use 'w', 'r' or combinations like 'rw'."
rights = [{
"c_email": sharee_email,
"uid": sharee_email,
"userClass": "normal-user",
"rights": {
"Public": "None",
"Private": "None",
"Confidential": "None",
"canCreateObjects": 0,
"canEraseObjects": 0
}
}]
if "w" in acl:
rights[0]["rights"]["canCreateObjects"] = 1
rights[0]["rights"]["canEraseObjects"] = 1
if "r" in acl:
rights[0]["rights"]["Public"] = "Viewer"
rights[0]["rights"]["Private"] = "Viewer"
rights[0]["rights"]["Confidential"] = "Viewer"
r_add = self.get(f"/{self.username}/Calendar/{calendar_id}/addUserInAcls?uid={sharee_email}")
if r_add.status_code < 200 or r_add.status_code > 299:
try:
return r_add.json()
except ValueError:
return r_add.text
r_save = self.post(f"/{self.username}/Calendar/{calendar_id}/saveUserRights", rights)
if r_save.status_code < 200 or r_save.status_code > 299:
try:
return r_save.json()
except ValueError:
return r_save.text
if subscribe:
r_subscribe = self.get(f"/{self.username}/Calendar/{calendar_id}/subscribeUsers?uids={sharee_email}")
if r_subscribe.status_code < 200 or r_subscribe.status_code > 299:
try:
return r_subscribe.json()
except ValueError:
return r_subscribe.text
return r_save.status_code == 200
def getCalendarACL(self, calendar_id):
"""
Get CalDAV calendar permissions for a user (sharee).
:param calendar_id: ID of the calendar to get ACL from
:return: Response from SOGo API.
"""
res = self.get(f"/{self.username}/Calendar/{calendar_id}/acls")
try:
return res.json()
except ValueError:
return res.text
def deleteCalendarACL(self, calendar_id, sharee_email):
"""
Delete a calendar ACL for a user (sharee).
:param calendar_id: ID of the calendar to delete ACL from
:param sharee_email: Email of the user whose ACL to delete
:return: Response from SOGo API.
"""
res = self.get(f"/{self.username}/Calendar/{calendar_id}/removeUserFromAcls?uid={sharee_email}")
return res.status_code == 204
def addAddressbook(self, addressbook_name):
"""
Add a new addressbook to the sogo instance.
:param addressbook_name: Name of the addressbook to be created
:return: Response from the sogo API.
"""
res = self.post(f"/{self.username}/Contacts/createFolder", {
"name": addressbook_name
})
try:
return res.json()
except ValueError:
return res.text
def getAddressbookIdByName(self, addressbook_name):
"""
Get the addressbook ID by its name.
:param addressbook_name: Name of the addressbook to find
:return: Addressbook ID if found, otherwise None.
"""
res = self.get(f"/{self.username}/Contacts/addressbooksList")
try:
for addressbook in res.json()["addressbooks"]:
if addressbook['name'] == addressbook_name:
return addressbook['id']
except ValueError:
return None
return None
def deleteAddressbook(self, addressbook_id):
"""
Delete an addressbook.
:param addressbook_id: ID of the addressbook to be deleted
:return: Response from SOGo API.
"""
res = self.get(f"/{self.username}/Contacts/{addressbook_id}/delete")
return res.status_code == 204
def getAddressbookList(self):
"""
Get addressbook list.
:return: Response from SOGo API.
"""
res = self.get(f"/{self.username}/Contacts/addressbooksList")
try:
return res.json()
except ValueError:
return res.text
def setAddressbookACL(self, addressbook_id, sharee_email, acl="r", subscribe=False):
"""
Set CalDAV addressbook permissions for a user (sharee).
:param addressbook_id: ID of the addressbook to share
:param sharee_email: Email of the user to share with
:param acl: "w" for write, "r" for read-only or combination "rw" for read-write
:param subscribe: True will subscribe the sharee to the addressbook
:return: None
"""
# Access rights
if acl == "" or len(acl) > 2:
print("Invalid acl level specified. Use 's', 'w', 'r' or combinations like 'rws'.")
return "Invalid acl level specified. Use 'w', 'r' or combinations like 'rw'."
rights = [{
"c_email": sharee_email,
"uid": sharee_email,
"userClass": "normal-user",
"rights": {
"canCreateObjects": 0,
"canEditObjects": 0,
"canEraseObjects": 0,
"canViewObjects": 0,
}
}]
if "w" in acl:
rights[0]["rights"]["canCreateObjects"] = 1
rights[0]["rights"]["canEditObjects"] = 1
rights[0]["rights"]["canEraseObjects"] = 1
if "r" in acl:
rights[0]["rights"]["canViewObjects"] = 1
r_add = self.get(f"/{self.username}/Contacts/{addressbook_id}/addUserInAcls?uid={sharee_email}")
if r_add.status_code < 200 or r_add.status_code > 299:
try:
return r_add.json()
except ValueError:
return r_add.text
r_save = self.post(f"/{self.username}/Contacts/{addressbook_id}/saveUserRights", rights)
if r_save.status_code < 200 or r_save.status_code > 299:
try:
return r_save.json()
except ValueError:
return r_save.text
if subscribe:
r_subscribe = self.get(f"/{self.username}/Contacts/{addressbook_id}/subscribeUsers?uids={sharee_email}")
if r_subscribe.status_code < 200 or r_subscribe.status_code > 299:
try:
return r_subscribe.json()
except ValueError:
return r_subscribe.text
return r_save.status_code == 200
def getAddressbookACL(self, addressbook_id):
"""
Get CalDAV addressbook permissions for a user (sharee).
:param addressbook_id: ID of the addressbook to get ACL from
:return: Response from SOGo API.
"""
res = self.get(f"/{self.username}/Contacts/{addressbook_id}/acls")
try:
return res.json()
except ValueError:
return res.text
def deleteAddressbookACL(self, addressbook_id, sharee_email):
"""
Delete an addressbook ACL for a user (sharee).
:param addressbook_id: ID of the addressbook to delete ACL from
:param sharee_email: Email of the user whose ACL to delete
:return: Response from SOGo API.
"""
res = self.get(f"/{self.username}/Contacts/{addressbook_id}/removeUserFromAcls?uid={sharee_email}")
return res.status_code == 204
def getAddressbookNewGuid(self, addressbook_id):
"""
Request a new GUID for a SOGo addressbook.
:param addressbook_id: ID of the addressbook
:return: JSON response from SOGo or None if not found
"""
res = self.get(f"/{self.username}/Contacts/{addressbook_id}/newguid")
try:
return res.json()
except ValueError:
return res.text
def addAddressbookContact(self, addressbook_id, contact_name, contact_email):
"""
Save a vCard as a contact in the specified addressbook.
:param addressbook_id: ID of the addressbook
:param contact_name: Name of the contact
:param contact_email: Email of the contact
:return: JSON response from SOGo or None if not found
"""
vcard_id = self.getAddressbookNewGuid(addressbook_id)
contact_data = {
"id": vcard_id["id"],
"pid": vcard_id["pid"],
"c_cn": contact_name,
"emails": [{
"type": "pref",
"value": contact_email
}],
"isNew": True,
"c_component": "vcard",
}
endpoint = f"/{self.username}/Contacts/{addressbook_id}/{vcard_id['id']}/saveAsContact"
res = self.post(endpoint, contact_data)
try:
return res.json()
except ValueError:
return res.text
def getAddressbookContacts(self, addressbook_id, contact_email=None):
"""
Get all contacts from the specified addressbook.
:param addressbook_id: ID of the addressbook
:return: JSON response with contacts or None if not found
"""
res = self.get(f"/{self.username}/Contacts/{addressbook_id}/view")
try:
res_json = res.json()
headers = res_json.get("headers", [])
if not headers or len(headers) < 2:
return []
field_names = headers[0]
contacts = []
for row in headers[1:]:
contact = dict(zip(field_names, row))
contacts.append(contact)
if contact_email:
contact = {}
for c in contacts:
if c["c_mail"] == contact_email or c["c_cn"] == contact_email:
contact = c
break
return contact
return contacts
except ValueError:
return res.text
def addAddressbookContactList(self, addressbook_id, contact_name, contact_email=None):
"""
Add a new contact list to the addressbook.
:param addressbook_id: ID of the addressbook
:param contact_name: Name of the contact list
:param contact_email: Comma-separated emails to include in the list
:return: Response from SOGo API.
"""
gal_domain = self.username.split("@")[-1]
vlist_id = self.getAddressbookNewGuid(addressbook_id)
contact_emails = contact_email.split(",") if contact_email else []
contacts = self.getAddressbookContacts(addressbook_id)
refs = []
for contact in contacts:
if contact['c_mail'] in contact_emails:
refs.append({
"refs": [],
"categories": [],
"c_screenname": contact.get("c_screenname", ""),
"pid": contact.get("pid", vlist_id["pid"]),
"id": contact.get("id", ""),
"notes": [""],
"empty": " ",
"hasphoto": contact.get("hasphoto", 0),
"c_cn": contact.get("c_cn", ""),
"c_uid": contact.get("c_uid", None),
"containername": contact.get("containername", f"GAL {gal_domain}"), # or your addressbook name
"sourceid": contact.get("sourceid", gal_domain),
"c_component": contact.get("c_component", "vcard"),
"c_sn": contact.get("c_sn", ""),
"c_givenname": contact.get("c_givenname", ""),
"c_name": contact.get("c_name", contact.get("id", "")),
"c_telephonenumber": contact.get("c_telephonenumber", ""),
"fn": contact.get("fn", ""),
"c_mail": contact.get("c_mail", ""),
"emails": contact.get("emails", []),
"c_o": contact.get("c_o", ""),
"reference": contact.get("id", ""),
"birthday": contact.get("birthday", "")
})
contact_data = {
"refs": refs,
"categories": [],
"c_screenname": None,
"pid": vlist_id["pid"],
"c_component": "vlist",
"notes": [""],
"empty": " ",
"isNew": True,
"id": vlist_id["id"],
"c_cn": contact_name,
"birthday": ""
}
endpoint = f"/{self.username}/Contacts/{addressbook_id}/{vlist_id['id']}/saveAsList"
res = self.post(endpoint, contact_data)
try:
return res.json()
except ValueError:
return res.text
def deleteAddressbookItem(self, addressbook_id, contact_name):
"""
Delete an addressbook item by its ID.
:param addressbook_id: ID of the addressbook item to delete
:param contact_name: Name of the contact to delete
:return: Response from SOGo API.
"""
res = self.getAddressbookContacts(addressbook_id, contact_name)
if "id" not in res:
print(f"Contact '{contact_name}' not found in addressbook '{addressbook_id}'.")
return None
res = self.post(f"/{self.username}/Contacts/{addressbook_id}/batchDelete", {
"uids": [res["id"]],
})
return res.status_code == 204
def get(self, endpoint, params=None):
"""
Make a GET request to the mailcow API.
:param endpoint: The API endpoint to get.
:param params: Optional parameters for the GET request.
:return: Response from the mailcow API.
"""
url = f"{self.baseUrl}{self.apiUrl}{endpoint}"
auth = (self.username, self.password)
headers = {"Host": self.host}
response = requests.get(
url,
params=params,
auth=auth,
headers=headers,
verify=not self.ignore_ssl_errors
)
return response
def post(self, endpoint, data):
"""
Make a POST request to the mailcow API.
:param endpoint: The API endpoint to post to.
:param data: Data to be sent in the POST request.
:return: Response from the mailcow API.
"""
url = f"{self.baseUrl}{self.apiUrl}{endpoint}"
auth = (self.username, self.password)
headers = {"Host": self.host}
response = requests.post(
url,
json=data,
auth=auth,
headers=headers,
verify=not self.ignore_ssl_errors
)
return response

View File

@@ -0,0 +1,37 @@
import json
import random
import string
class Utils:
def __init(self):
pass
def normalize_email(self, email):
replacements = {
"ä": "ae", "ö": "oe", "ü": "ue", "ß": "ss",
"Ä": "Ae", "Ö": "Oe", "Ü": "Ue"
}
for orig, repl in replacements.items():
email = email.replace(orig, repl)
return email
def generate_password(self, length=8):
chars = string.ascii_letters + string.digits
return ''.join(random.choices(chars, k=length))
def pprint(self, data=""):
"""
Pretty print a dictionary, list, or text.
If data is a text containing JSON, it will be printed in a formatted way.
"""
if isinstance(data, (dict, list)):
print(json.dumps(data, indent=2, ensure_ascii=False))
elif isinstance(data, str):
try:
json_data = json.loads(data)
print(json.dumps(json_data, indent=2, ensure_ascii=False))
except json.JSONDecodeError:
print(data)
else:
print(data)

View File

@@ -0,0 +1,4 @@
jinja2
requests
mysql-connector-python
pytest

View File

@@ -0,0 +1,94 @@
import pytest
import json
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../")))
from models.DomainModel import DomainModel
from models.AliasModel import AliasModel
def test_model():
# Generate random alias
random_alias = f"alias_test{os.urandom(4).hex()}@mailcow.local"
# Create an instance of AliasModel
model = AliasModel(
address=random_alias,
goto="test@mailcow.local,test2@mailcow.local"
)
# Test the parser_command attribute
assert model.parser_command == "alias", "Parser command should be 'alias'"
# add Domain for testing
domain_model = DomainModel(domain="mailcow.local")
domain_model.add()
# 1. Alias add tests, should success
r_add = model.add()
assert isinstance(r_add, list), f"Expected a array but received: {json.dumps(r_add, indent=2)}"
assert len(r_add) > 0, f"Wrong array received: {json.dumps(r_add, indent=2)}"
assert "type" in r_add[0], f"'type' key missing in response: {json.dumps(r_add, indent=2)}"
assert r_add[0]['type'] == "success", f"Wrong 'type' received: {r_add[0]['type']}\n{json.dumps(r_add, indent=2)}"
assert "msg" in r_add[0], f"'msg' key missing in response: {json.dumps(r_add, indent=2)}"
assert isinstance(r_add[0]['msg'], list), f"Expected a 'msg' array but received: {json.dumps(r_add, indent=2)}"
assert len(r_add[0]['msg']) > 0 and len(r_add[0]['msg']) <= 3, f"Wrong 'msg' array received: {json.dumps(r_add, indent=2)}"
assert r_add[0]['msg'][0] == "alias_added", f"Wrong 'msg' received: {r_add[0]['msg'][0]}, expected: 'alias_added'\n{json.dumps(r_add, indent=2)}"
# Assign created alias ID for further tests
model.id = r_add[0]['msg'][2]
# 2. Alias add tests, should fail because the alias already exists
r_add = model.add()
assert isinstance(r_add, list), f"Expected a array but received: {json.dumps(r_add, indent=2)}"
assert len(r_add) > 0, f"Wrong array received: {json.dumps(r_add, indent=2)}"
assert "type" in r_add[0], f"'type' key missing in response: {json.dumps(r_add, indent=2)}"
assert r_add[0]['type'] == "danger", f"Wrong 'type' received: {r_add[0]['type']}\n{json.dumps(r_add, indent=2)}"
assert "msg" in r_add[0], f"'msg' key missing in response: {json.dumps(r_add, indent=2)}"
assert isinstance(r_add[0]['msg'], list), f"Expected a 'msg' array but received: {json.dumps(r_add, indent=2)}"
assert len(r_add[0]['msg']) > 0 and len(r_add[0]['msg']) <= 2, f"Wrong 'msg' array received: {json.dumps(r_add, indent=2)}"
assert r_add[0]['msg'][0] == "is_alias_or_mailbox", f"Wrong 'msg' received: {r_add[0]['msg'][0]}, expected: 'is_alias_or_mailbox'\n{json.dumps(r_add, indent=2)}"
# 3. Alias get tests
r_get = model.get()
assert isinstance(r_get, dict), f"Expected a dict but received: {json.dumps(r_get, indent=2)}"
assert "domain" in r_get, f"'domain' key missing in response: {json.dumps(r_get, indent=2)}"
assert "goto" in r_get, f"'goto' key missing in response: {json.dumps(r_get, indent=2)}"
assert "address" in r_get, f"'address' key missing in response: {json.dumps(r_get, indent=2)}"
assert r_get['domain'] == model.address.split("@")[1], f"Wrong 'domain' received: {r_get['domain']}, expected: {model.address.split('@')[1]}\n{json.dumps(r_get, indent=2)}"
assert r_get['goto'] == model.goto, f"Wrong 'goto' received: {r_get['goto']}, expected: {model.goto}\n{json.dumps(r_get, indent=2)}"
assert r_get['address'] == model.address, f"Wrong 'address' received: {r_get['address']}, expected: {model.address}\n{json.dumps(r_get, indent=2)}"
# 4. Alias edit tests
model.goto = "test@mailcow.local"
model.active = 0
r_edit = model.edit()
assert isinstance(r_edit, list), f"Expected a array but received: {json.dumps(r_edit, indent=2)}"
assert len(r_edit) > 0, f"Wrong array received: {json.dumps(r_edit, indent=2)}"
assert "type" in r_edit[0], f"'type' key missing in response: {json.dumps(r_edit, indent=2)}"
assert r_edit[0]['type'] == "success", f"Wrong 'type' received: {r_edit[0]['type']}\n{json.dumps(r_edit, indent=2)}"
assert "msg" in r_edit[0], f"'msg' key missing in response: {json.dumps(r_edit, indent=2)}"
assert isinstance(r_edit[0]['msg'], list), f"Expected a 'msg' array but received: {json.dumps(r_edit, indent=2)}"
assert len(r_edit[0]['msg']) > 0 and len(r_edit[0]['msg']) <= 2, f"Wrong 'msg' array received: {json.dumps(r_edit, indent=2)}"
assert r_edit[0]['msg'][0] == "alias_modified", f"Wrong 'msg' received: {r_edit[0]['msg'][0]}, expected: 'alias_modified'\n{json.dumps(r_edit, indent=2)}"
# 5. Alias delete tests
r_delete = model.delete()
assert isinstance(r_delete, list), f"Expected a array but received: {json.dumps(r_delete, indent=2)}"
assert len(r_delete) > 0, f"Wrong array received: {json.dumps(r_delete, indent=2)}"
assert "type" in r_delete[0], f"'type' key missing in response: {json.dumps(r_delete, indent=2)}"
assert r_delete[0]['type'] == "success", f"Wrong 'type' received: {r_delete[0]['type']}\n{json.dumps(r_delete, indent=2)}"
assert "msg" in r_delete[0], f"'msg' key missing in response: {json.dumps(r_delete, indent=2)}"
assert isinstance(r_delete[0]['msg'], list), f"Expected a 'msg' array but received: {json.dumps(r_delete, indent=2)}"
assert len(r_delete[0]['msg']) > 0 and len(r_delete[0]['msg']) <= 2, f"Wrong 'msg' array received: {json.dumps(r_delete, indent=2)}"
assert r_delete[0]['msg'][0] == "alias_removed", f"Wrong 'msg' received: {r_delete[0]['msg'][0]}, expected: 'alias_removed'\n{json.dumps(r_delete, indent=2)}"
# delete testing Domain
domain_model.delete()
if __name__ == "__main__":
print("Running AliasModel tests...")
test_model()
print("All tests passed!")

View File

@@ -0,0 +1,71 @@
import pytest
from models.BaseModel import BaseModel
class Args:
def __init__(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
def test_has_required_args():
BaseModel.required_args = {
"test_object": [["arg1"], ["arg2", "arg3"]],
}
# Test cases with Args object
args = Args(object="non_existent_object")
assert BaseModel.has_required_args(args) == False
args = Args(object="test_object")
assert BaseModel.has_required_args(args) == False
args = Args(object="test_object", arg1="value")
assert BaseModel.has_required_args(args) == True
args = Args(object="test_object", arg2="value")
assert BaseModel.has_required_args(args) == False
args = Args(object="test_object", arg3="value")
assert BaseModel.has_required_args(args) == False
args = Args(object="test_object", arg2="value", arg3="value")
assert BaseModel.has_required_args(args) == True
# Test cases with dict object
args = {"object": "non_existent_object"}
assert BaseModel.has_required_args(args) == False
args = {"object": "test_object"}
assert BaseModel.has_required_args(args) == False
args = {"object": "test_object", "arg1": "value"}
assert BaseModel.has_required_args(args) == True
args = {"object": "test_object", "arg2": "value"}
assert BaseModel.has_required_args(args) == False
args = {"object": "test_object", "arg3": "value"}
assert BaseModel.has_required_args(args) == False
args = {"object": "test_object", "arg2": "value", "arg3": "value"}
assert BaseModel.has_required_args(args) == True
BaseModel.required_args = {
"test_object": [[]],
}
# Test cases with Args object
args = Args(object="non_existent_object")
assert BaseModel.has_required_args(args) == False
args = Args(object="test_object")
assert BaseModel.has_required_args(args) == True
# Test cases with dict object
args = {"object": "non_existent_object"}
assert BaseModel.has_required_args(args) == False
args = {"object": "test_object"}
assert BaseModel.has_required_args(args) == True

View File

@@ -0,0 +1,74 @@
import pytest
import json
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../")))
from models.DomainModel import DomainModel
def test_model():
# Create an instance of DomainModel
model = DomainModel(
domain="mailcow.local",
)
# Test the parser_command attribute
assert model.parser_command == "domain", "Parser command should be 'domain'"
# 1. Domain add tests, should success
r_add = model.add()
assert isinstance(r_add, list), f"Expected a array but received: {json.dumps(r_add, indent=2)}"
assert len(r_add) > 0 and len(r_add) >= 2, f"Wrong array received: {json.dumps(r_add, indent=2)}"
assert "type" in r_add[1], f"'type' key missing in response: {json.dumps(r_add, indent=2)}"
assert r_add[1]['type'] == "success", f"Wrong 'type' received: {r_add[1]['type']}\n{json.dumps(r_add, indent=2)}"
assert "msg" in r_add[1], f"'msg' key missing in response: {json.dumps(r_add, indent=2)}"
assert isinstance(r_add[1]['msg'], list), f"Expected a 'msg' array but received: {json.dumps(r_add, indent=2)}"
assert len(r_add[1]['msg']) > 0 and len(r_add[1]['msg']) <= 2, f"Wrong 'msg' array received: {json.dumps(r_add, indent=2)}"
assert r_add[1]['msg'][0] == "domain_added", f"Wrong 'msg' received: {r_add[1]['msg'][0]}, expected: 'domain_added'\n{json.dumps(r_add, indent=2)}"
# 2. Domain add tests, should fail because the domain already exists
r_add = model.add()
assert isinstance(r_add, list), f"Expected a array but received: {json.dumps(r_add, indent=2)}"
assert len(r_add) > 0, f"Wrong array received: {json.dumps(r_add, indent=2)}"
assert "type" in r_add[0], f"'type' key missing in response: {json.dumps(r_add, indent=2)}"
assert r_add[0]['type'] == "danger", f"Wrong 'type' received: {r_add[0]['type']}\n{json.dumps(r_add, indent=2)}"
assert "msg" in r_add[0], f"'msg' key missing in response: {json.dumps(r_add, indent=2)}"
assert isinstance(r_add[0]['msg'], list), f"Expected a 'msg' array but received: {json.dumps(r_add, indent=2)}"
assert len(r_add[0]['msg']) > 0 and len(r_add[0]['msg']) <= 2, f"Wrong 'msg' array received: {json.dumps(r_add, indent=2)}"
assert r_add[0]['msg'][0] == "domain_exists", f"Wrong 'msg' received: {r_add[0]['msg'][0]}, expected: 'domain_exists'\n{json.dumps(r_add, indent=2)}"
# 3. Domain get tests
r_get = model.get()
assert isinstance(r_get, dict), f"Expected a dict but received: {json.dumps(r_get, indent=2)}"
assert "domain_name" in r_get, f"'domain_name' key missing in response: {json.dumps(r_get, indent=2)}"
assert r_get['domain_name'] == model.domain, f"Wrong 'domain_name' received: {r_get['domain_name']}, expected: {model.domain}\n{json.dumps(r_get, indent=2)}"
# 4. Domain edit tests
model.active = 0
r_edit = model.edit()
assert isinstance(r_edit, list), f"Expected a array but received: {json.dumps(r_edit, indent=2)}"
assert len(r_edit) > 0, f"Wrong array received: {json.dumps(r_edit, indent=2)}"
assert "type" in r_edit[0], f"'type' key missing in response: {json.dumps(r_edit, indent=2)}"
assert r_edit[0]['type'] == "success", f"Wrong 'type' received: {r_edit[0]['type']}\n{json.dumps(r_edit, indent=2)}"
assert "msg" in r_edit[0], f"'msg' key missing in response: {json.dumps(r_edit, indent=2)}"
assert isinstance(r_edit[0]['msg'], list), f"Expected a 'msg' array but received: {json.dumps(r_edit, indent=2)}"
assert len(r_edit[0]['msg']) > 0 and len(r_edit[0]['msg']) <= 2, f"Wrong 'msg' array received: {json.dumps(r_edit, indent=2)}"
assert r_edit[0]['msg'][0] == "domain_modified", f"Wrong 'msg' received: {r_edit[0]['msg'][0]}, expected: 'domain_modified'\n{json.dumps(r_edit, indent=2)}"
# 5. Domain delete tests
r_delete = model.delete()
assert isinstance(r_delete, list), f"Expected a array but received: {json.dumps(r_delete, indent=2)}"
assert len(r_delete) > 0, f"Wrong array received: {json.dumps(r_delete, indent=2)}"
assert "type" in r_delete[0], f"'type' key missing in response: {json.dumps(r_delete, indent=2)}"
assert r_delete[0]['type'] == "success", f"Wrong 'type' received: {r_delete[0]['type']}\n{json.dumps(r_delete, indent=2)}"
assert "msg" in r_delete[0], f"'msg' key missing in response: {json.dumps(r_delete, indent=2)}"
assert isinstance(r_delete[0]['msg'], list), f"Expected a 'msg' array but received: {json.dumps(r_delete, indent=2)}"
assert len(r_delete[0]['msg']) > 0 and len(r_delete[0]['msg']) <= 2, f"Wrong 'msg' array received: {json.dumps(r_delete, indent=2)}"
assert r_delete[0]['msg'][0] == "domain_removed", f"Wrong 'msg' received: {r_delete[0]['msg'][0]}, expected: 'domain_removed'\n{json.dumps(r_delete, indent=2)}"
if __name__ == "__main__":
print("Running DomainModel tests...")
test_model()
print("All tests passed!")

View File

@@ -0,0 +1,89 @@
import pytest
import json
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../")))
from models.DomainModel import DomainModel
from models.DomainadminModel import DomainadminModel
def test_model():
# Generate random domainadmin
random_username = f"dadmin_test{os.urandom(4).hex()}"
random_password = f"{os.urandom(4).hex()}"
# Create an instance of DomainadminModel
model = DomainadminModel(
username=random_username,
password=random_password,
domains="mailcow.local",
)
# Test the parser_command attribute
assert model.parser_command == "domainadmin", "Parser command should be 'domainadmin'"
# add Domain for testing
domain_model = DomainModel(domain="mailcow.local")
domain_model.add()
# 1. Domainadmin add tests, should success
r_add = model.add()
assert isinstance(r_add, list), f"Expected a array but received: {json.dumps(r_add, indent=2)}"
assert len(r_add) > 0, f"Wrong array received: {json.dumps(r_add, indent=2)}"
assert "type" in r_add[0], f"'type' key missing in response: {json.dumps(r_add, indent=2)}"
assert r_add[0]['type'] == "success", f"Wrong 'type' received: {r_add[0]['type']}\n{json.dumps(r_add, indent=2)}"
assert "msg" in r_add[0], f"'msg' key missing in response: {json.dumps(r_add, indent=2)}"
assert isinstance(r_add[0]['msg'], list), f"Expected a 'msg' array but received: {json.dumps(r_add, indent=2)}"
assert len(r_add[0]['msg']) > 0 and len(r_add[0]['msg']) <= 3, f"Wrong 'msg' array received: {json.dumps(r_add, indent=2)}"
assert r_add[0]['msg'][0] == "domain_admin_added", f"Wrong 'msg' received: {r_add[0]['msg'][0]}, expected: 'domain_admin_added'\n{json.dumps(r_add, indent=2)}"
# 2. Domainadmin add tests, should fail because the domainadmin already exists
r_add = model.add()
assert isinstance(r_add, list), f"Expected a array but received: {json.dumps(r_add, indent=2)}"
assert len(r_add) > 0, f"Wrong array received: {json.dumps(r_add, indent=2)}"
assert "type" in r_add[0], f"'type' key missing in response: {json.dumps(r_add, indent=2)}"
assert r_add[0]['type'] == "danger", f"Wrong 'type' received: {r_add[0]['type']}\n{json.dumps(r_add, indent=2)}"
assert "msg" in r_add[0], f"'msg' key missing in response: {json.dumps(r_add, indent=2)}"
assert isinstance(r_add[0]['msg'], list), f"Expected a 'msg' array but received: {json.dumps(r_add, indent=2)}"
assert len(r_add[0]['msg']) > 0 and len(r_add[0]['msg']) <= 2, f"Wrong 'msg' array received: {json.dumps(r_add, indent=2)}"
assert r_add[0]['msg'][0] == "object_exists", f"Wrong 'msg' received: {r_add[0]['msg'][0]}, expected: 'object_exists'\n{json.dumps(r_add, indent=2)}"
# 3. Domainadmin get tests
r_get = model.get()
assert isinstance(r_get, dict), f"Expected a dict but received: {json.dumps(r_get, indent=2)}"
assert "selected_domains" in r_get, f"'selected_domains' key missing in response: {json.dumps(r_get, indent=2)}"
assert "username" in r_get, f"'username' key missing in response: {json.dumps(r_get, indent=2)}"
assert set(model.domains.replace(" ", "").split(",")) == set(r_get['selected_domains']), f"Wrong 'selected_domains' received: {r_get['selected_domains']}, expected: {model.domains}\n{json.dumps(r_get, indent=2)}"
assert r_get['username'] == model.username, f"Wrong 'username' received: {r_get['username']}, expected: {model.username}\n{json.dumps(r_get, indent=2)}"
# 4. Domainadmin edit tests
model.active = 0
r_edit = model.edit()
assert isinstance(r_edit, list), f"Expected a array but received: {json.dumps(r_edit, indent=2)}"
assert len(r_edit) > 0, f"Wrong array received: {json.dumps(r_edit, indent=2)}"
assert "type" in r_edit[0], f"'type' key missing in response: {json.dumps(r_edit, indent=2)}"
assert r_edit[0]['type'] == "success", f"Wrong 'type' received: {r_edit[0]['type']}\n{json.dumps(r_edit, indent=2)}"
assert "msg" in r_edit[0], f"'msg' key missing in response: {json.dumps(r_edit, indent=2)}"
assert isinstance(r_edit[0]['msg'], list), f"Expected a 'msg' array but received: {json.dumps(r_edit, indent=2)}"
assert len(r_edit[0]['msg']) > 0 and len(r_edit[0]['msg']) <= 2, f"Wrong 'msg' array received: {json.dumps(r_edit, indent=2)}"
assert r_edit[0]['msg'][0] == "domain_admin_modified", f"Wrong 'msg' received: {r_edit[0]['msg'][0]}, expected: 'domain_admin_modified'\n{json.dumps(r_edit, indent=2)}"
# 5. Domainadmin delete tests
r_delete = model.delete()
assert isinstance(r_delete, list), f"Expected a array but received: {json.dumps(r_delete, indent=2)}"
assert len(r_delete) > 0, f"Wrong array received: {json.dumps(r_delete, indent=2)}"
assert "type" in r_delete[0], f"'type' key missing in response: {json.dumps(r_delete, indent=2)}"
assert r_delete[0]['type'] == "success", f"Wrong 'type' received: {r_delete[0]['type']}\n{json.dumps(r_delete, indent=2)}"
assert "msg" in r_delete[0], f"'msg' key missing in response: {json.dumps(r_delete, indent=2)}"
assert isinstance(r_delete[0]['msg'], list), f"Expected a 'msg' array but received: {json.dumps(r_delete, indent=2)}"
assert len(r_delete[0]['msg']) > 0 and len(r_delete[0]['msg']) <= 2, f"Wrong 'msg' array received: {json.dumps(r_delete, indent=2)}"
assert r_delete[0]['msg'][0] == "domain_admin_removed", f"Wrong 'msg' received: {r_delete[0]['msg'][0]}, expected: 'domain_admin_removed'\n{json.dumps(r_delete, indent=2)}"
# delete testing Domain
domain_model.delete()
if __name__ == "__main__":
print("Running DomainadminModel tests...")
test_model()
print("All tests passed!")

View File

@@ -0,0 +1,89 @@
import pytest
import json
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../")))
from models.DomainModel import DomainModel
from models.MailboxModel import MailboxModel
def test_model():
# Generate random mailbox
random_username = f"mbox_test{os.urandom(4).hex()}@mailcow.local"
random_password = f"{os.urandom(4).hex()}"
# Create an instance of MailboxModel
model = MailboxModel(
username=random_username,
password=random_password
)
# Test the parser_command attribute
assert model.parser_command == "mailbox", "Parser command should be 'mailbox'"
# add Domain for testing
domain_model = DomainModel(domain="mailcow.local")
domain_model.add()
# 1. Mailbox add tests, should success
r_add = model.add()
assert isinstance(r_add, list), f"Expected a array but received: {json.dumps(r_add, indent=2)}"
assert len(r_add) > 0 and len(r_add) <= 2, f"Wrong array received: {json.dumps(r_add, indent=2)}"
assert "type" in r_add[1], f"'type' key missing in response: {json.dumps(r_add, indent=2)}"
assert r_add[1]['type'] == "success", f"Wrong 'type' received: {r_add[1]['type']}\n{json.dumps(r_add, indent=2)}"
assert "msg" in r_add[1], f"'msg' key missing in response: {json.dumps(r_add, indent=2)}"
assert isinstance(r_add[1]['msg'], list), f"Expected a 'msg' array but received: {json.dumps(r_add, indent=2)}"
assert len(r_add[1]['msg']) > 0 and len(r_add[1]['msg']) <= 3, f"Wrong 'msg' array received: {json.dumps(r_add, indent=2)}"
assert r_add[1]['msg'][0] == "mailbox_added", f"Wrong 'msg' received: {r_add[1]['msg'][0]}, expected: 'mailbox_added'\n{json.dumps(r_add, indent=2)}"
# 2. Mailbox add tests, should fail because the mailbox already exists
r_add = model.add()
assert isinstance(r_add, list), f"Expected a array but received: {json.dumps(r_add, indent=2)}"
assert len(r_add) > 0, f"Wrong array received: {json.dumps(r_add, indent=2)}"
assert "type" in r_add[0], f"'type' key missing in response: {json.dumps(r_add, indent=2)}"
assert r_add[0]['type'] == "danger", f"Wrong 'type' received: {r_add[0]['type']}\n{json.dumps(r_add, indent=2)}"
assert "msg" in r_add[0], f"'msg' key missing in response: {json.dumps(r_add, indent=2)}"
assert isinstance(r_add[0]['msg'], list), f"Expected a 'msg' array but received: {json.dumps(r_add, indent=2)}"
assert len(r_add[0]['msg']) > 0 and len(r_add[0]['msg']) <= 2, f"Wrong 'msg' array received: {json.dumps(r_add, indent=2)}"
assert r_add[0]['msg'][0] == "object_exists", f"Wrong 'msg' received: {r_add[0]['msg'][0]}, expected: 'object_exists'\n{json.dumps(r_add, indent=2)}"
# 3. Mailbox get tests
r_get = model.get()
assert isinstance(r_get, dict), f"Expected a dict but received: {json.dumps(r_get, indent=2)}"
assert "domain" in r_get, f"'domain' key missing in response: {json.dumps(r_get, indent=2)}"
assert "local_part" in r_get, f"'local_part' key missing in response: {json.dumps(r_get, indent=2)}"
assert r_get['domain'] == model.domain, f"Wrong 'domain' received: {r_get['domain']}, expected: {model.domain}\n{json.dumps(r_get, indent=2)}"
assert r_get['local_part'] == model.local_part, f"Wrong 'local_part' received: {r_get['local_part']}, expected: {model.local_part}\n{json.dumps(r_get, indent=2)}"
# 4. Mailbox edit tests
model.active = 0
r_edit = model.edit()
assert isinstance(r_edit, list), f"Expected a array but received: {json.dumps(r_edit, indent=2)}"
assert len(r_edit) > 0, f"Wrong array received: {json.dumps(r_edit, indent=2)}"
assert "type" in r_edit[0], f"'type' key missing in response: {json.dumps(r_edit, indent=2)}"
assert r_edit[0]['type'] == "success", f"Wrong 'type' received: {r_edit[0]['type']}\n{json.dumps(r_edit, indent=2)}"
assert "msg" in r_edit[0], f"'msg' key missing in response: {json.dumps(r_edit, indent=2)}"
assert isinstance(r_edit[0]['msg'], list), f"Expected a 'msg' array but received: {json.dumps(r_edit, indent=2)}"
assert len(r_edit[0]['msg']) > 0 and len(r_edit[0]['msg']) <= 2, f"Wrong 'msg' array received: {json.dumps(r_edit, indent=2)}"
assert r_edit[0]['msg'][0] == "mailbox_modified", f"Wrong 'msg' received: {r_edit[0]['msg'][0]}, expected: 'mailbox_modified'\n{json.dumps(r_edit, indent=2)}"
# 5. Mailbox delete tests
r_delete = model.delete()
assert isinstance(r_delete, list), f"Expected a array but received: {json.dumps(r_delete, indent=2)}"
assert len(r_delete) > 0, f"Wrong array received: {json.dumps(r_delete, indent=2)}"
assert "type" in r_delete[0], f"'type' key missing in response: {json.dumps(r_delete, indent=2)}"
assert r_delete[0]['type'] == "success", f"Wrong 'type' received: {r_delete[0]['type']}\n{json.dumps(r_delete, indent=2)}"
assert "msg" in r_delete[0], f"'msg' key missing in response: {json.dumps(r_delete, indent=2)}"
assert isinstance(r_delete[0]['msg'], list), f"Expected a 'msg' array but received: {json.dumps(r_delete, indent=2)}"
assert len(r_delete[0]['msg']) > 0 and len(r_delete[0]['msg']) <= 2, f"Wrong 'msg' array received: {json.dumps(r_delete, indent=2)}"
assert r_delete[0]['msg'][0] == "mailbox_removed", f"Wrong 'msg' received: {r_delete[0]['msg'][0]}, expected: 'mailbox_removed'\n{json.dumps(r_delete, indent=2)}"
# delete testing Domain
domain_model.delete()
if __name__ == "__main__":
print("Running MailboxModel tests...")
test_model()
print("All tests passed!")

View File

@@ -0,0 +1,39 @@
import pytest
import json
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../")))
from models.StatusModel import StatusModel
def test_model():
# Create an instance of StatusModel
model = StatusModel()
# Test the parser_command attribute
assert model.parser_command == "status", "Parser command should be 'status'"
# 1. Status version tests
r_version = model.version()
assert isinstance(r_version, dict), f"Expected a dict but received: {json.dumps(r_version, indent=2)}"
assert "version" in r_version, f"'version' key missing in response: {json.dumps(r_version, indent=2)}"
# 2. Status vmail tests
r_vmail = model.vmail()
assert isinstance(r_vmail, dict), f"Expected a dict but received: {json.dumps(r_vmail, indent=2)}"
assert "type" in r_vmail, f"'type' key missing in response: {json.dumps(r_vmail, indent=2)}"
assert "disk" in r_vmail, f"'disk' key missing in response: {json.dumps(r_vmail, indent=2)}"
assert "used" in r_vmail, f"'used' key missing in response: {json.dumps(r_vmail, indent=2)}"
assert "total" in r_vmail, f"'total' key missing in response: {json.dumps(r_vmail, indent=2)}"
assert "used_percent" in r_vmail, f"'used_percent' key missing in response: {json.dumps(r_vmail, indent=2)}"
# 3. Status containers tests
r_containers = model.containers()
assert isinstance(r_containers, dict), f"Expected a dict but received: {json.dumps(r_containers, indent=2)}"
if __name__ == "__main__":
print("Running StatusModel tests...")
test_model()
print("All tests passed!")

View File

@@ -0,0 +1,106 @@
import pytest
import json
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../")))
from models.DomainModel import DomainModel
from models.MailboxModel import MailboxModel
from models.SyncjobModel import SyncjobModel
def test_model():
# Generate random Mailbox
random_username = f"mbox_test@mailcow.local"
random_password = f"{os.urandom(4).hex()}"
# Create an instance of SyncjobModel
model = SyncjobModel(
username=random_username,
host1="mailcow.local",
port1=993,
user1="testuser@mailcow.local",
password1="testpassword",
enc1="SSL",
)
# Test the parser_command attribute
assert model.parser_command == "syncjob", "Parser command should be 'syncjob'"
# add Domain and Mailbox for testing
domain_model = DomainModel(domain="mailcow.local")
domain_model.add()
mbox_model = MailboxModel(username=random_username, password=random_password)
mbox_model.add()
# 1. Syncjob add tests, should success
r_add = model.add()
assert isinstance(r_add, list), f"Expected a array but received: {json.dumps(r_add, indent=2)}"
assert len(r_add) > 0 and len(r_add) <= 2, f"Wrong array received: {json.dumps(r_add, indent=2)}"
assert "type" in r_add[0], f"'type' key missing in response: {json.dumps(r_add, indent=2)}"
assert r_add[0]['type'] == "success", f"Wrong 'type' received: {r_add[0]['type']}\n{json.dumps(r_add, indent=2)}"
assert "msg" in r_add[0], f"'msg' key missing in response: {json.dumps(r_add, indent=2)}"
assert isinstance(r_add[0]['msg'], list), f"Expected a 'msg' array but received: {json.dumps(r_add, indent=2)}"
assert len(r_add[0]['msg']) > 0 and len(r_add[0]['msg']) <= 3, f"Wrong 'msg' array received: {json.dumps(r_add, indent=2)}"
assert r_add[0]['msg'][0] == "mailbox_modified", f"Wrong 'msg' received: {r_add[0]['msg'][0]}, expected: 'mailbox_modified'\n{json.dumps(r_add, indent=2)}"
# Assign created syncjob ID for further tests
model.id = r_add[0]['msg'][2]
# 2. Syncjob add tests, should fail because the syncjob already exists
r_add = model.add()
assert isinstance(r_add, list), f"Expected a array but received: {json.dumps(r_add, indent=2)}"
assert len(r_add) > 0, f"Wrong array received: {json.dumps(r_add, indent=2)}"
assert "type" in r_add[0], f"'type' key missing in response: {json.dumps(r_add, indent=2)}"
assert r_add[0]['type'] == "danger", f"Wrong 'type' received: {r_add[0]['type']}\n{json.dumps(r_add, indent=2)}"
assert "msg" in r_add[0], f"'msg' key missing in response: {json.dumps(r_add, indent=2)}"
assert isinstance(r_add[0]['msg'], list), f"Expected a 'msg' array but received: {json.dumps(r_add, indent=2)}"
assert len(r_add[0]['msg']) > 0 and len(r_add[0]['msg']) <= 2, f"Wrong 'msg' array received: {json.dumps(r_add, indent=2)}"
assert r_add[0]['msg'][0] == "object_exists", f"Wrong 'msg' received: {r_add[0]['msg'][0]}, expected: 'object_exists'\n{json.dumps(r_add, indent=2)}"
# 3. Syncjob get tests
r_get = model.get()
assert isinstance(r_get, list), f"Expected a list but received: {json.dumps(r_get, indent=2)}"
assert "user2" in r_get[0], f"'user2' key missing in response: {json.dumps(r_get, indent=2)}"
assert "host1" in r_get[0], f"'host1' key missing in response: {json.dumps(r_get, indent=2)}"
assert "port1" in r_get[0], f"'port1' key missing in response: {json.dumps(r_get, indent=2)}"
assert "user1" in r_get[0], f"'user1' key missing in response: {json.dumps(r_get, indent=2)}"
assert "enc1" in r_get[0], f"'enc1' key missing in response: {json.dumps(r_get, indent=2)}"
assert r_get[0]['user2'] == model.username, f"Wrong 'user2' received: {r_get[0]['user2']}, expected: {model.username}\n{json.dumps(r_get, indent=2)}"
assert r_get[0]['host1'] == model.host1, f"Wrong 'host1' received: {r_get[0]['host1']}, expected: {model.host1}\n{json.dumps(r_get, indent=2)}"
assert r_get[0]['port1'] == model.port1, f"Wrong 'port1' received: {r_get[0]['port1']}, expected: {model.port1}\n{json.dumps(r_get, indent=2)}"
assert r_get[0]['user1'] == model.user1, f"Wrong 'user1' received: {r_get[0]['user1']}, expected: {model.user1}\n{json.dumps(r_get, indent=2)}"
assert r_get[0]['enc1'] == model.enc1, f"Wrong 'enc1' received: {r_get[0]['enc1']}, expected: {model.enc1}\n{json.dumps(r_get, indent=2)}"
# 4. Syncjob edit tests
model.active = 1
r_edit = model.edit()
assert isinstance(r_edit, list), f"Expected a array but received: {json.dumps(r_edit, indent=2)}"
assert len(r_edit) > 0, f"Wrong array received: {json.dumps(r_edit, indent=2)}"
assert "type" in r_edit[0], f"'type' key missing in response: {json.dumps(r_edit, indent=2)}"
assert r_edit[0]['type'] == "success", f"Wrong 'type' received: {r_edit[0]['type']}\n{json.dumps(r_edit, indent=2)}"
assert "msg" in r_edit[0], f"'msg' key missing in response: {json.dumps(r_edit, indent=2)}"
assert isinstance(r_edit[0]['msg'], list), f"Expected a 'msg' array but received: {json.dumps(r_edit, indent=2)}"
assert len(r_edit[0]['msg']) > 0 and len(r_edit[0]['msg']) <= 2, f"Wrong 'msg' array received: {json.dumps(r_edit, indent=2)}"
assert r_edit[0]['msg'][0] == "mailbox_modified", f"Wrong 'msg' received: {r_edit[0]['msg'][0]}, expected: 'mailbox_modified'\n{json.dumps(r_edit, indent=2)}"
# 5. Syncjob delete tests
r_delete = model.delete()
assert isinstance(r_delete, list), f"Expected a array but received: {json.dumps(r_delete, indent=2)}"
assert len(r_delete) > 0, f"Wrong array received: {json.dumps(r_delete, indent=2)}"
assert "type" in r_delete[0], f"'type' key missing in response: {json.dumps(r_delete, indent=2)}"
assert r_delete[0]['type'] == "success", f"Wrong 'type' received: {r_delete[0]['type']}\n{json.dumps(r_delete, indent=2)}"
assert "msg" in r_delete[0], f"'msg' key missing in response: {json.dumps(r_delete, indent=2)}"
assert isinstance(r_delete[0]['msg'], list), f"Expected a 'msg' array but received: {json.dumps(r_delete, indent=2)}"
assert len(r_delete[0]['msg']) > 0 and len(r_delete[0]['msg']) <= 2, f"Wrong 'msg' array received: {json.dumps(r_delete, indent=2)}"
assert r_delete[0]['msg'][0] == "deleted_syncjob", f"Wrong 'msg' received: {r_delete[0]['msg'][0]}, expected: 'deleted_syncjob'\n{json.dumps(r_delete, indent=2)}"
# delete testing Domain and Mailbox
mbox_model.delete()
domain_model.delete()
if __name__ == "__main__":
print("Running SyncjobModel tests...")
test_model()
print("All tests passed!")

View File

@@ -0,0 +1,8 @@
#!/bin/bash
printf "READY\n";
while read line; do
echo "Processing Event: $line" >&2;
kill -3 $(cat "/var/run/supervisord.pid")
done < /dev/stdin

View File

@@ -0,0 +1,17 @@
[supervisord]
nodaemon=true
user=root
pidfile=/var/run/supervisord.pid
[program:api]
command=python /app/api/main.py
autostart=true
autorestart=true
stdout_logfile=/dev/stdout
stderr_logfile=/dev/stderr
stdout_logfile_maxbytes=0
stderr_logfile_maxbytes=0
[eventlistener:processes]
command=/usr/local/sbin/stop-supervisor.sh
events=PROCESS_STATE_STOPPED, PROCESS_STATE_EXITED, PROCESS_STATE_FATAL

View File

@@ -1,9 +0,0 @@
#!/bin/bash
`openssl req -x509 -newkey rsa:4096 -sha256 -days 3650 -nodes \
-keyout /app/dockerapi_key.pem \
-out /app/dockerapi_cert.pem \
-subj /CN=dockerapi/O=mailcow \
-addext subjectAltName=DNS:dockerapi`
exec "$@"

View File

@@ -25,11 +25,11 @@ sed -i -e 's/\([^\\]\)\$\([^\/]\)/\1\\$\2/g' /etc/rspamd/custom/sa-rules
if [[ "$(cat /etc/rspamd/custom/sa-rules | md5sum | cut -d' ' -f1)" != "${HASH_SA_RULES}" ]]; then
CONTAINER_NAME=rspamd-mailcow
CONTAINER_ID=$(curl --silent --insecure https://dockerapi.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/json | \
CONTAINER_ID=$(curl --silent --insecure https://controller.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/json | \
jq -r ".[] | {name: .Config.Labels[\"com.docker.compose.service\"], project: .Config.Labels[\"com.docker.compose.project\"], id: .Id}" | \
jq -rc "select( .name | tostring | contains(\"${CONTAINER_NAME}\")) | select( .project | tostring | contains(\"${COMPOSE_PROJECT_NAME,,}\")) | .id")
if [[ ! -z ${CONTAINER_ID} ]]; then
curl --silent --insecure -XPOST --connect-timeout 15 --max-time 120 https://dockerapi.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/${CONTAINER_ID}/restart
curl --silent --insecure -XPOST --connect-timeout 15 --max-time 120 https://controller.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/${CONTAINER_ID}/restart
fi
fi

View File

@@ -32,7 +32,7 @@ session.save_path = "tcp://'${REDIS_HOST}':'${REDIS_PORT}'?auth='${REDISPASS}'"
# Check mysql_upgrade (master and slave)
CONTAINER_ID=
until [[ ! -z "${CONTAINER_ID}" ]] && [[ "${CONTAINER_ID}" =~ ^[[:alnum:]]*$ ]]; do
CONTAINER_ID=$(curl --silent --insecure https://dockerapi.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/json | jq -r ".[] | {name: .Config.Labels[\"com.docker.compose.service\"], project: .Config.Labels[\"com.docker.compose.project\"], id: .Id}" 2> /dev/null | jq -rc "select( .name | tostring | contains(\"mysql-mailcow\")) | select( .project | tostring | contains(\"${COMPOSE_PROJECT_NAME,,}\")) | .id" 2> /dev/null)
CONTAINER_ID=$(curl --silent --insecure https://controller.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/json | jq -r ".[] | {name: .Config.Labels[\"com.docker.compose.service\"], project: .Config.Labels[\"com.docker.compose.project\"], id: .Id}" 2> /dev/null | jq -rc "select( .name | tostring | contains(\"mysql-mailcow\")) | select( .project | tostring | contains(\"${COMPOSE_PROJECT_NAME,,}\")) | .id" 2> /dev/null)
echo "Could not get mysql-mailcow container id... trying again"
sleep 2
done
@@ -44,7 +44,7 @@ until [[ ${SQL_UPGRADE_STATUS} == 'success' ]]; do
echo "Tried to upgrade MySQL and failed, giving up after ${SQL_LOOP_C} retries and starting container (oops, not good)"
break
fi
SQL_FULL_UPGRADE_RETURN=$(curl --silent --insecure -XPOST https://dockerapi.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/${CONTAINER_ID}/exec -d '{"cmd":"system", "task":"mysql_upgrade"}' --silent -H 'Content-type: application/json')
SQL_FULL_UPGRADE_RETURN=$(curl --silent --insecure -XPOST https://controller.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/${CONTAINER_ID}/exec -d '{"cmd":"system", "task":"mysql_upgrade"}' --silent -H 'Content-type: application/json')
SQL_UPGRADE_STATUS=$(echo ${SQL_FULL_UPGRADE_RETURN} | jq -r .type)
SQL_LOOP_C=$((SQL_LOOP_C+1))
echo "SQL upgrade iteration #${SQL_LOOP_C}"
@@ -69,12 +69,12 @@ done
# doing post-installation stuff, if SQL was upgraded (master and slave)
if [ ${SQL_CHANGED} -eq 1 ]; then
POSTFIX=$(curl --silent --insecure https://dockerapi.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/json | jq -r ".[] | {name: .Config.Labels[\"com.docker.compose.service\"], project: .Config.Labels[\"com.docker.compose.project\"], id: .Id}" 2> /dev/null | jq -rc "select( .name | tostring | contains(\"postfix-mailcow\")) | select( .project | tostring | contains(\"${COMPOSE_PROJECT_NAME,,}\")) | .id" 2> /dev/null)
POSTFIX=$(curl --silent --insecure https://controller.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/json | jq -r ".[] | {name: .Config.Labels[\"com.docker.compose.service\"], project: .Config.Labels[\"com.docker.compose.project\"], id: .Id}" 2> /dev/null | jq -rc "select( .name | tostring | contains(\"postfix-mailcow\")) | select( .project | tostring | contains(\"${COMPOSE_PROJECT_NAME,,}\")) | .id" 2> /dev/null)
if [[ -z "${POSTFIX}" ]] || ! [[ "${POSTFIX}" =~ ^[[:alnum:]]*$ ]]; then
echo "Could not determine Postfix container ID, skipping Postfix restart."
else
echo "Restarting Postfix"
curl -X POST --silent --insecure https://dockerapi.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/${POSTFIX}/restart | jq -r '.msg'
curl -X POST --silent --insecure https://controller.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/${POSTFIX}/restart | jq -r '.msg'
echo "Sleeping 5 seconds..."
sleep 5
fi
@@ -83,7 +83,7 @@ fi
# Check mysql tz import (master and slave)
TZ_CHECK=$(mariadb --skip-ssl --socket=/var/run/mysqld/mysqld.sock -u ${DBUSER} -p${DBPASS} ${DBNAME} -e "SELECT CONVERT_TZ('2019-11-02 23:33:00','Europe/Berlin','UTC') AS time;" -BN 2> /dev/null)
if [[ -z ${TZ_CHECK} ]] || [[ "${TZ_CHECK}" == "NULL" ]]; then
SQL_FULL_TZINFO_IMPORT_RETURN=$(curl --silent --insecure -XPOST https://dockerapi.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/${CONTAINER_ID}/exec -d '{"cmd":"system", "task":"mysql_tzinfo_to_sql"}' --silent -H 'Content-type: application/json')
SQL_FULL_TZINFO_IMPORT_RETURN=$(curl --silent --insecure -XPOST https://controller.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/${CONTAINER_ID}/exec -d '{"cmd":"system", "task":"mysql_tzinfo_to_sql"}' --silent -H 'Content-type: application/json')
echo "MySQL mysql_tzinfo_to_sql - debug output:"
echo ${SQL_FULL_TZINFO_IMPORT_RETURN}
fi

View File

@@ -195,12 +195,12 @@ get_container_ip() {
else
sleep 0.5
# get long container id for exact match
CONTAINER_ID=($(curl --silent --insecure https://dockerapi.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/json | jq -r ".[] | {name: .Config.Labels[\"com.docker.compose.service\"], project: .Config.Labels[\"com.docker.compose.project\"], id: .Id}" | jq -rc "select( .name | tostring == \"${1}\") | select( .project | tostring | contains(\"${COMPOSE_PROJECT_NAME,,}\")) | .id"))
CONTAINER_ID=($(curl --silent --insecure https://controller.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/json | jq -r ".[] | {name: .Config.Labels[\"com.docker.compose.service\"], project: .Config.Labels[\"com.docker.compose.project\"], id: .Id}" | jq -rc "select( .name | tostring == \"${1}\") | select( .project | tostring | contains(\"${COMPOSE_PROJECT_NAME,,}\")) | .id"))
# returned id can have multiple elements (if scaled), shuffle for random test
CONTAINER_ID=($(printf "%s\n" "${CONTAINER_ID[@]}" | shuf))
if [[ ! -z ${CONTAINER_ID} ]]; then
for matched_container in "${CONTAINER_ID[@]}"; do
CONTAINER_IPS=($(curl --silent --insecure https://dockerapi.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/${matched_container}/json | jq -r '.NetworkSettings.Networks[].IPAddress'))
CONTAINER_IPS=($(curl --silent --insecure https://controller.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/${matched_container}/json | jq -r '.NetworkSettings.Networks[].IPAddress'))
for ip_match in "${CONTAINER_IPS[@]}"; do
# grep will do nothing if one of these vars is empty
[[ -z ${ip_match} ]] && continue
@@ -1033,15 +1033,15 @@ while true; do
done
) &
# Monitor dockerapi
# Monitor controller
(
while true; do
while nc -z dockerapi 443; do
while nc -z controller 443; do
sleep 3
done
log_msg "Cannot find dockerapi-mailcow, waiting to recover..."
log_msg "Cannot find controller-mailcow, waiting to recover..."
kill -STOP ${BACKGROUND_TASKS[*]}
until nc -z dockerapi 443; do
until nc -z controller 443; do
sleep 3
done
kill -CONT ${BACKGROUND_TASKS[*]}
@@ -1101,12 +1101,12 @@ while true; do
elif [[ ${com_pipe_answer} =~ .+-mailcow ]]; then
kill -STOP ${BACKGROUND_TASKS[*]}
sleep 10
CONTAINER_ID=$(curl --silent --insecure https://dockerapi.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/json | jq -r ".[] | {name: .Config.Labels[\"com.docker.compose.service\"], project: .Config.Labels[\"com.docker.compose.project\"], id: .Id}" | jq -rc "select( .name | tostring | contains(\"${com_pipe_answer}\")) | select( .project | tostring | contains(\"${COMPOSE_PROJECT_NAME,,}\")) | .id")
CONTAINER_ID=$(curl --silent --insecure https://controller.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/json | jq -r ".[] | {name: .Config.Labels[\"com.docker.compose.service\"], project: .Config.Labels[\"com.docker.compose.project\"], id: .Id}" | jq -rc "select( .name | tostring | contains(\"${com_pipe_answer}\")) | select( .project | tostring | contains(\"${COMPOSE_PROJECT_NAME,,}\")) | .id")
if [[ ! -z ${CONTAINER_ID} ]]; then
if [[ "${com_pipe_answer}" == "php-fpm-mailcow" ]]; then
HAS_INITDB=$(curl --silent --insecure -XPOST https://dockerapi.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/${CONTAINER_ID}/top | jq '.msg.Processes[] | contains(["php -c /usr/local/etc/php -f /web/inc/init_db.inc.php"])' | grep true)
HAS_INITDB=$(curl --silent --insecure -XPOST https://controller.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/${CONTAINER_ID}/top | jq '.msg.Processes[] | contains(["php -c /usr/local/etc/php -f /web/inc/init_db.inc.php"])' | grep true)
fi
S_RUNNING=$(($(date +%s) - $(curl --silent --insecure https://dockerapi.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/${CONTAINER_ID}/json | jq .State.StartedAt | xargs -n1 date +%s -d)))
S_RUNNING=$(($(date +%s) - $(curl --silent --insecure https://controller.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/${CONTAINER_ID}/json | jq .State.StartedAt | xargs -n1 date +%s -d)))
if [ ${S_RUNNING} -lt 360 ]; then
log_msg "Container is running for less than 360 seconds, skipping action..."
elif [[ ! -z ${HAS_INITDB} ]]; then
@@ -1114,7 +1114,7 @@ while true; do
sleep 60
else
log_msg "Sending restart command to ${CONTAINER_ID}..."
curl --silent --insecure -XPOST https://dockerapi.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/${CONTAINER_ID}/restart
curl --silent --insecure -XPOST https://controller.${COMPOSE_PROJECT_NAME}_mailcow-network/containers/${CONTAINER_ID}/restart
notify_error "${com_pipe_answer}"
log_msg "Wait for restarted container to settle and continue watching..."
sleep 35

View File

@@ -178,6 +178,7 @@ location ^~ /Microsoft-Server-ActiveSync {
auth_request_set $user $upstream_http_x_user;
auth_request_set $auth $upstream_http_x_auth;
auth_request_set $auth_type $upstream_http_x_auth_type;
auth_request_set $real_ip $remote_addr;
proxy_set_header x-webobjects-remote-user "$user";
proxy_set_header Authorization "$auth";
proxy_set_header x-webobjects-auth-type "$auth_type";
@@ -203,6 +204,7 @@ location ^~ /SOGo {
auth_request_set $user $upstream_http_x_user;
auth_request_set $auth $upstream_http_x_auth;
auth_request_set $auth_type $upstream_http_x_auth_type;
auth_request_set $real_ip $remote_addr;
proxy_set_header x-webobjects-remote-user "$user";
proxy_set_header Authorization "$auth";
proxy_set_header x-webobjects-auth-type "$auth_type";
@@ -225,6 +227,7 @@ location ^~ /SOGo {
auth_request_set $user $upstream_http_x_user;
auth_request_set $auth $upstream_http_x_auth;
auth_request_set $auth_type $upstream_http_x_auth_type;
auth_request_set $real_ip $remote_addr;
proxy_set_header x-webobjects-remote-user "$user";
proxy_set_header Authorization "$auth";
proxy_set_header x-webobjects-auth-type "$auth_type";

View File

@@ -5352,9 +5352,9 @@ paths:
started_at: "2019-12-22T21:00:01.622856172Z"
state: running
type: info
dockerapi-mailcow:
container: dockerapi-mailcow
image: "mailcow/dockerapi:1.36"
controller-mailcow:
container: controller-mailcow
image: "mailcow/controller:1.36"
started_at: "2019-12-22T20:59:59.984797808Z"
state: running
type: info

View File

@@ -4,12 +4,12 @@ function docker($action, $service_name = null, $attr1 = null, $attr2 = null, $ex
global $redis;
$curl = curl_init();
curl_setopt($curl, CURLOPT_HTTPHEADER,array('Content-Type: application/json' ));
// We are using our mail certificates for dockerapi, the names will not match, the certs are trusted anyway
// We are using our mail certificates for controller, the names will not match, the certs are trusted anyway
curl_setopt($curl, CURLOPT_SSL_VERIFYHOST, 0);
curl_setopt($curl, CURLOPT_SSL_VERIFYPEER, 0);
switch($action) {
case 'get_id':
curl_setopt($curl, CURLOPT_URL, 'https://dockerapi:443/containers/json');
curl_setopt($curl, CURLOPT_URL, 'https://controller:443/containers/json');
curl_setopt($curl, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($curl, CURLOPT_POST, 0);
curl_setopt($curl, CURLOPT_TIMEOUT, $DOCKER_TIMEOUT);
@@ -35,7 +35,7 @@ function docker($action, $service_name = null, $attr1 = null, $attr2 = null, $ex
return false;
break;
case 'containers':
curl_setopt($curl, CURLOPT_URL, 'https://dockerapi:443/containers/json');
curl_setopt($curl, CURLOPT_URL, 'https://controller:443/containers/json');
curl_setopt($curl, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($curl, CURLOPT_POST, 0);
curl_setopt($curl, CURLOPT_TIMEOUT, $DOCKER_TIMEOUT);
@@ -63,7 +63,7 @@ function docker($action, $service_name = null, $attr1 = null, $attr2 = null, $ex
break;
case 'info':
if (empty($service_name)) {
curl_setopt($curl, CURLOPT_URL, 'https://dockerapi:443/containers/json');
curl_setopt($curl, CURLOPT_URL, 'https://controller:443/containers/json');
curl_setopt($curl, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($curl, CURLOPT_POST, 0);
curl_setopt($curl, CURLOPT_TIMEOUT, $DOCKER_TIMEOUT);
@@ -71,7 +71,7 @@ function docker($action, $service_name = null, $attr1 = null, $attr2 = null, $ex
else {
$container_id = docker('get_id', $service_name);
if (ctype_xdigit($container_id)) {
curl_setopt($curl, CURLOPT_URL, 'https://dockerapi:443/containers/' . $container_id . '/json');
curl_setopt($curl, CURLOPT_URL, 'https://controller:443/containers/' . $container_id . '/json');
}
else {
return false;
@@ -102,7 +102,7 @@ function docker($action, $service_name = null, $attr1 = null, $attr2 = null, $ex
}
}
else {
if (isset($decoded_response['Config']['Labels']['com.docker.compose.project'])
if (isset($decoded_response['Config']['Labels']['com.docker.compose.project'])
&& strtolower($decoded_response['Config']['Labels']['com.docker.compose.project']) == strtolower(getenv('COMPOSE_PROJECT_NAME'))) {
unset($container['Config']['Env']);
$out[$decoded_response['Config']['Labels']['com.docker.compose.service']]['State'] = $decoded_response['State'];
@@ -123,7 +123,7 @@ function docker($action, $service_name = null, $attr1 = null, $attr2 = null, $ex
if (!empty($attr1)) {
$container_id = docker('get_id', $service_name);
if (ctype_xdigit($container_id) && ctype_alnum($attr1)) {
curl_setopt($curl, CURLOPT_URL, 'https://dockerapi:443/containers/' . $container_id . '/' . $attr1);
curl_setopt($curl, CURLOPT_URL, 'https://controller:443/containers/' . $container_id . '/' . $attr1);
curl_setopt($curl, CURLOPT_POST, 1);
curl_setopt($curl, CURLOPT_TIMEOUT, $DOCKER_TIMEOUT);
if (!empty($attr2)) {
@@ -157,7 +157,7 @@ function docker($action, $service_name = null, $attr1 = null, $attr2 = null, $ex
}
$container_id = $service_name;
curl_setopt($curl, CURLOPT_URL, 'https://dockerapi:443/container/' . $container_id . '/stats/update');
curl_setopt($curl, CURLOPT_URL, 'https://controller:443/container/' . $container_id . '/stats/update');
curl_setopt($curl, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($curl, CURLOPT_POST, 1);
curl_setopt($curl, CURLOPT_TIMEOUT, $DOCKER_TIMEOUT);
@@ -175,7 +175,7 @@ function docker($action, $service_name = null, $attr1 = null, $attr2 = null, $ex
return false;
break;
case 'host_stats':
curl_setopt($curl, CURLOPT_URL, 'https://dockerapi:443/host/stats');
curl_setopt($curl, CURLOPT_URL, 'https://controller:443/host/stats');
curl_setopt($curl, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($curl, CURLOPT_POST, 0);
curl_setopt($curl, CURLOPT_TIMEOUT, $DOCKER_TIMEOUT);

View File

@@ -488,6 +488,16 @@ function sys_mail($_data) {
'msg' => 'Mass mail job completed, sent ' . count($rcpts) . ' mails'
);
}
function get_remote_ip($use_x_real_ip = true) {
$remote = $_SERVER['REMOTE_ADDR'];
if ($use_x_real_ip && !empty($_SERVER['HTTP_X_REAL_IP'])) {
$remote = $_SERVER['HTTP_X_REAL_IP'];
}
if (filter_var($remote, FILTER_VALIDATE_IP) === false) {
$remote = '0.0.0.0';
}
return $remote;
}
function logger($_data = false) {
/*
logger() will be called as last function

View File

@@ -468,10 +468,11 @@ function mailbox($_action, $_type, $_data = null, $_extra = null) {
':delete2duplicates' => $delete2duplicates,
':active' => $active,
));
$id = $pdo->lastInsertId();
$_SESSION['return'][] = array(
'type' => 'success',
'log' => array(__FUNCTION__, $_action, $_type, $_data_log, $_attr),
'msg' => array('mailbox_modified', $username)
'msg' => array('mailbox_modified', $username, $id)
);
break;
case 'domain':

View File

@@ -105,11 +105,11 @@ http_response_code(500);
<?php
exit;
}
// Stop when dockerapi is not available
if (fsockopen("tcp://dockerapi", 443, $errno, $errstr) === false) {
// Stop when controller is not available
if (fsockopen("tcp://controller", 443, $errno, $errstr) === false) {
http_response_code(500);
?>
<center style='font-family:sans-serif;'>Connection to dockerapi container failed.<br /><br />The following error was reported:<br/><?=$errno;?> - <?=$errstr;?></center>
<center style='font-family:sans-serif;'>Connection to controller container failed.<br /><br />The following error was reported:<br/><?=$errno;?> - <?=$errstr;?></center>
<?php
exit;
}
@@ -165,14 +165,6 @@ if(!$DEV_MODE) {
set_exception_handler('exception_handler');
}
// TODO: Move function
function get_remote_ip() {
$remote = $_SERVER['REMOTE_ADDR'];
if (filter_var($remote, FILTER_VALIDATE_IP) === false) {
return '0.0.0.0';
}
return $remote;
}
// Load core functions first
require_once $_SERVER['DOCUMENT_ROOT'] . '/inc/functions.inc.php';

View File

@@ -68,7 +68,7 @@ if (!empty($_SERVER['HTTP_X_API_KEY'])) {
}
else {
$redis->publish("F2B_CHANNEL", "mailcow UI: Invalid password for API_USER by " . $_SERVER['REMOTE_ADDR']);
error_log("mailcow UI: Invalid password for " . $user . " by " . $_SERVER['REMOTE_ADDR']);
error_log("mailcow UI: Invalid password for API_USER by " . $_SERVER['REMOTE_ADDR']);
http_response_code(401);
echo json_encode(array(
'type' => 'error',
@@ -80,7 +80,7 @@ if (!empty($_SERVER['HTTP_X_API_KEY'])) {
}
else {
$redis->publish("F2B_CHANNEL", "mailcow UI: Invalid password for API_USER by " . $_SERVER['REMOTE_ADDR']);
error_log("mailcow UI: Invalid password for " . $user . " by " . $_SERVER['REMOTE_ADDR']);
error_log("mailcow UI: Invalid password for API_USER by " . $_SERVER['REMOTE_ADDR']);
http_response_code(401);
echo json_encode(array(
'type' => 'error',
@@ -90,6 +90,16 @@ if (!empty($_SERVER['HTTP_X_API_KEY'])) {
exit();
}
}
else {
$remote = get_remote_ip(false);
$docker_ipv4_network = getenv('IPV4_NETWORK');
if ($remote == "{$docker_ipv4_network}.246") {
$_SESSION['mailcow_cc_username'] = 'Controller';
$_SESSION['mailcow_cc_role'] = 'admin';
$_SESSION['mailcow_cc_api'] = true;
$_SESSION['mailcow_cc_api_access'] = 'rw';
}
}
// Handle logouts
if (isset($_POST["logout"])) {

View File

@@ -23,7 +23,16 @@ if (isset($_SERVER['PHP_AUTH_USER'])) {
elseif (preg_match('/^(\/SOGo|)\/Microsoft-Server-ActiveSync.*/', $original_uri) === 1) {
$is_eas = true;
}
$login_check = check_login($username, $password, array('dav' => $is_dav, 'eas' => $is_eas));
if (empty($password)) {
$remote = get_remote_ip();
$docker_ipv4_network = getenv('IPV4_NETWORK');
if ($remote == "{$docker_ipv4_network}.246") {
$login_check = 'user';
$password = file_get_contents("/etc/sogo-sso/sogo-sso.pass");
}
} else {
$login_check = check_login($username, $password, array('dav' => $is_dav, 'eas' => $is_eas));
}
if ($login_check === 'user') {
header("X-User: $username");
header("X-Auth: Basic ".base64_encode("$username:$password"));
@@ -79,6 +88,7 @@ elseif (isset($_SERVER['HTTP_X_ORIGINAL_URI']) && strcasecmp(substr($_SERVER['HT
if (file_exists($_SERVER['DOCUMENT_ROOT'] . '/inc/vars.local.inc.php')) {
include_once $_SERVER['DOCUMENT_ROOT'] . '/inc/vars.local.inc.php';
}
require_once $_SERVER['DOCUMENT_ROOT'] . '/inc/functions.inc.php';
require_once $_SERVER['DOCUMENT_ROOT'] . '/inc/sessions.inc.php';
// extract email address from "/SOGo/so/user@domain/xy"

View File

@@ -117,7 +117,7 @@ services:
- rspamd
php-fpm-mailcow:
image: ghcr.io/mailcow/phpfpm:1.93
image: ghcr.io/mailcow/phpfpm:nightly-29072025
command: "php-fpm -d date.timezone=${TZ} -d expose_php=0"
depends_on:
- redis-mailcow
@@ -251,7 +251,7 @@ services:
- sogo
dovecot-mailcow:
image: ghcr.io/mailcow/dovecot:2.34
image: ghcr.io/mailcow/dovecot:nightly-29072025
depends_on:
- mysql-mailcow
- netfilter-mailcow
@@ -431,6 +431,7 @@ services:
restart: always
networks:
mailcow-network:
ipv4_address: ${IPV4_NETWORK:-172.22.1}.247
aliases:
- nginx
@@ -440,7 +441,7 @@ services:
condition: service_started
unbound-mailcow:
condition: service_healthy
image: ghcr.io/mailcow/acme:1.93
image: ghcr.io/mailcow/acme:nightly-29072025
dns:
- ${IPV4_NETWORK:-172.22.1}.254
environment:
@@ -497,7 +498,7 @@ services:
- /lib/modules:/lib/modules:ro
watchdog-mailcow:
image: ghcr.io/mailcow/watchdog:2.08
image: ghcr.io/mailcow/watchdog:nightly-29072025
dns:
- ${IPV4_NETWORK:-172.22.1}.254
tmpfs:
@@ -569,25 +570,33 @@ services:
aliases:
- watchdog
dockerapi-mailcow:
image: ghcr.io/mailcow/dockerapi:2.11
controller-mailcow:
image: ghcr.io/mailcow/controller:nightly-29072025
security_opt:
- label=disable
restart: always
dns:
- ${IPV4_NETWORK:-172.22.1}.254
environment:
- DBROOT=${DBROOT}
- MAILCOW_HOSTNAME=${MAILCOW_HOSTNAME}
- HTTPS_PORT=${HTTPS_PORT:-443}
- TZ=${TZ}
- DBNAME=${DBNAME}
- DBUSER=${DBUSER}
- DBPASS=${DBPASS}
- DBROOT=${DBROOT}
- REDIS_SLAVEOF_IP=${REDIS_SLAVEOF_IP:-}
- REDIS_SLAVEOF_PORT=${REDIS_SLAVEOF_PORT:-}
- REDISPASS=${REDISPASS}
volumes:
- /var/run/docker.sock:/var/run/docker.sock:ro
- mysql-socket-vol-1:/var/run/mysqld/:z
- ./data/conf/sogo/sieve.creds:/app/sieve.creds:z
networks:
mailcow-network:
ipv4_address: ${IPV4_NETWORK:-172.22.1}.246
aliases:
- dockerapi
- controller
olefy-mailcow:
image: ghcr.io/mailcow/olefy:1.15

3
mailcow-adm.sh Executable file
View File

@@ -0,0 +1,3 @@
#!/bin/bash
docker compose exec -it controller-mailcow python3 /app/mailcow-adm/mailcow-adm.py "$@"