mirror of
https://github.com/mailcow/mailcow-dockerized.git
synced 2026-01-09 06:59:18 +00:00
207 lines
10 KiB
Python
207 lines
10 KiB
Python
import os
import shlex

from modules.Docker import Docker
|
|
|
|
class Dovecot:
    """Maintenance helpers that run commands inside the Dovecot container.

    Every operation goes through ``self.docker.exec_command`` against the
    ``dovecot-mailcow`` container. The result of that call is treated as a
    mapping with a ``"status"`` key (``"success"`` on success) and an
    ``"output"`` key holding the command output.
    """

    _CONTAINER = "dovecot-mailcow"
    _PRIVATE_KEY = "/mail_crypt/ecprivkey.pem"
    _PUBLIC_KEY = "/mail_crypt/ecpubkey.pem"
    # Regex handed to ``find`` to select the mail files to process.
    _MAIL_FILE_REGEX = ".*S=.*W=.*"
    # First seven bytes of a file that is already encrypted.
    _CRYPT_MAGIC = "CRYPTED"
    _INDEX_SUFFIX = "_index"

    def __init__(self):
        self.docker = Docker()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _exec(self, argv):
        """Run *argv* (a list of arguments) inside the Dovecot container."""
        return self.docker.exec_command(self._CONTAINER, argv)

    def _bash(self, script):
        """Run a shell *script* inside the container via ``bash -c``.

        Callers must quote every interpolated path with ``shlex.quote``.
        """
        return self._exec(["bash", "-c", script])

    def _ensure_output_root(self, output_dir):
        """Create *output_dir* owned by vmail; return True on success."""
        quoted = shlex.quote(output_dir)
        result = self._bash(f"mkdir -p {quoted} && chown vmail:vmail {quoted}")
        if result.get("status") != "success":
            print(f"Error creating output directory: {result.get('output')}")
            return False
        return True

    def _find_mail_files(self, source_dir):
        """Return the mail files below *source_dir*, or None if ``find`` failed."""
        result = self._exec([
            "find", source_dir, "-type", "f",
            "-regextype", "egrep", "-regex", self._MAIL_FILE_REGEX,
        ])
        if result.get("status") != "success":
            print(f"Error finding files: {result.get('output')}")
            return None
        return [line for line in (result.get("output") or "").splitlines() if line]

    def _is_encrypted(self, file):
        """Return True/False for the encryption state of *file*.

        Returns None when the file header could not be read, so callers can
        skip the file instead of guessing.
        """
        result = self._exec(["head", "-c7", file])
        if result.get("status") != "success":
            return None
        return (result.get("output") or "").strip() == self._CRYPT_MAGIC

    def _prepare_output_file(self, file, source_dir, output_dir, known_dirs):
        """Mirror the directory of *file* below *output_dir*.

        Directories are created one level at a time so that each new level
        can be chowned to vmail. *known_dirs* caches directories already
        handled during this run to avoid repeated container calls.

        :return: Target path for *file*, or None if a directory could not
                 be created (the file must then be skipped).
        """
        relative_path = os.path.relpath(file, source_dir)
        current_path = output_dir
        for part in os.path.dirname(relative_path).split(os.sep):
            if not part:
                # File sits directly in source_dir; nothing to create.
                continue
            current_path = os.path.join(current_path, part)
            if current_path in known_dirs:
                continue
            quoted = shlex.quote(current_path)
            # Succeeds when the directory already exists.
            result = self._bash(
                f"[ -d {quoted} ] || {{ mkdir {quoted} && chown vmail:vmail {quoted}; }}"
            )
            if result.get("status") != "success":
                print(f"Error creating directory {current_path}: {result.get('output')}")
                return None
            known_dirs.add(current_path)
        return os.path.join(output_dir, relative_path)

    def _decrypt_file(self, file, output_file):
        """Decrypt *file* into *output_file*; return True on success.

        The plaintext is first written to a dot-prefixed temporary file next
        to the target and only renamed over the target once it is non-empty.
        Redirecting straight into *output_file* would truncate the source
        before doveadm reads it whenever both paths are the same.
        """
        tmp_file = os.path.join(
            os.path.dirname(output_file),
            f".{os.path.basename(output_file)}.decrypting",
        )
        tmp_q = shlex.quote(tmp_file)
        out_q = shlex.quote(output_file)
        fs_config = (
            f"lz4:1:crypt:private_key_path={self._PRIVATE_KEY}"
            f":public_key_path={self._PUBLIC_KEY}:posix:prefix=/"
        )

        decrypt_result = self._bash(
            f"doveadm fs get compress {fs_config} {shlex.quote(file)} > {tmp_q}"
        )
        if decrypt_result.get("status") != "success":
            print(f"Error decrypting {file}: {decrypt_result.get('output')}")
            self._bash(f"rm -f {tmp_q}")
            return False

        # Verify the file size, set permissions, then move into place.
        # On any failure only the temporary file is removed.
        finalize_result = self._bash(
            f"[ -s {tmp_q} ] && chmod 600 {tmp_q} && chown vmail:vmail {tmp_q} "
            f"&& mv -f {tmp_q} {out_q} || {{ rm -f {tmp_q}; exit 1; }}"
        )
        if finalize_result.get("status") != "success":
            print(f"Error setting permissions for {output_file}: {finalize_result.get('output')}\n")
            return False

        print(f"Decrypted {file}")
        return True

    def _encrypt_file(self, file, output_file):
        """Encrypt *file* into *output_file*; return True on success."""
        fs_config = (
            f"private_key_path={self._PRIVATE_KEY}"
            f":public_key_path={self._PUBLIC_KEY}:posix:prefix=/"
        )
        encrypt_result = self._exec(
            ["doveadm", "fs", "put", "crypt", fs_config, file, output_file]
        )
        if encrypt_result.get("status") != "success":
            print(f"Error encrypting {file}: {encrypt_result.get('output')}")
            return False
        print(f"Encrypted {file}")

        # Set permissions
        out_q = shlex.quote(output_file)
        permissions_result = self._bash(f"chmod 600 {out_q} && chown 5000:5000 {out_q}")
        if permissions_result.get("status") != "success":
            print(f"Error setting permissions for {output_file}: {permissions_result.get('output')}\n")
            return False
        return True

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def decryptMaildir(self, source_dir="/var/vmail/", output_dir=None):
        """
        Decrypt files in /var/vmail using doveadm if they are encrypted.

        :param source_dir: Directory inside the Dovecot container to scan.
        :param output_dir: Directory inside the Dovecot container to store
                           decrypted files. Default: overwrite in place.
        :return: "Done" once the run finished (per-file errors are printed),
                 or None if the output directory or file listing failed.
        """
        if output_dir and not self._ensure_output_root(output_dir):
            return

        try:
            files = self._find_mail_files(source_dir)
            if files is None:
                return

            known_dirs = set()
            for file in files:
                if self._is_encrypted(file) is not True:
                    continue
                if output_dir:
                    # Preserve the directory structure in the output directory
                    output_file = self._prepare_output_file(
                        file, source_dir, output_dir, known_dirs
                    )
                    if output_file is None:
                        continue
                else:
                    # Overwrite the original file
                    output_file = file
                self._decrypt_file(file, output_file)
        except Exception as e:
            # Best-effort batch job: report and fall through to "Done".
            print(f"Error during decryption: {e}")

        return "Done"

    def encryptMaildir(self, source_dir="/var/vmail/", output_dir=None):
        """
        Encrypt files in /var/vmail using doveadm if they are not already encrypted.

        :param source_dir: Directory inside the Dovecot container to encrypt files.
        :param output_dir: Directory inside the Dovecot container to store
                           encrypted files. Default: overwrite in place.
        :return: "Done" once the run finished (per-file errors are printed),
                 or None if the output directory or file listing failed.
        """
        if output_dir and not self._ensure_output_root(output_dir):
            return

        try:
            files = self._find_mail_files(source_dir)
            if files is None:
                return

            known_dirs = set()
            for file in files:
                # Skip encrypted files and files whose header is unreadable.
                if self._is_encrypted(file) is not False:
                    continue
                if output_dir:
                    # Preserve the directory structure in the output directory
                    output_file = self._prepare_output_file(
                        file, source_dir, output_dir, known_dirs
                    )
                    if output_file is None:
                        continue
                else:
                    # Overwrite the original file
                    output_file = file
                self._encrypt_file(file, output_file)
        except Exception as e:
            # Best-effort batch job: report and fall through to "Done".
            print(f"Error during encryption: {e}")

        return "Done"

    def listDeletedMaildirs(self, source_dir="/var/vmail/_garbage"):
        """
        List deleted maildirs in the specified garbage directory.

        Entries are expected to be named ``<time>_<name>``, with an optional
        companion ``<time>_<name>_index`` entry.

        :param source_dir: Directory to search for deleted maildirs.
        :return: List of dicts with keys ``item`` (line number in the
                 ``ls -la`` output, used as identifier by restoreMaildir),
                 ``time``, ``name`` and ``has_index``. Empty list on error.
        """
        try:
            result = self._bash(f"ls -la {shlex.quote(source_dir)}")
            if result.get("status") != "success":
                print(f"Error listing deleted maildirs: {result.get('output')}")
                return []

            # Keyed by the full folder name so that two deletions of the
            # same mailbox at different times stay separate entries.
            maildirs = {}
            for idx, line in enumerate((result.get("output") or "").splitlines()):
                parts = line.split()
                if not parts or "_" not in parts[-1]:
                    continue
                folder_name = parts[-1]
                timestamp, name = folder_name.split("_", 1)

                if name.endswith(self._INDEX_SUFFIX):
                    main_folder = folder_name[:-len(self._INDEX_SUFFIX)]
                    if main_folder in maildirs:
                        maildirs[main_folder]["has_index"] = True
                else:
                    maildirs[folder_name] = {
                        "item": idx,
                        "time": timestamp,
                        "name": name,
                        "has_index": False,
                    }

            return list(maildirs.values())

        except Exception as e:
            print(f"Error during listing deleted maildirs: {e}")
            return []

    def restoreMaildir(self, username, item, source_dir="/var/vmail/_garbage"):
        """
        Restore a maildir item for a specific user from the deleted maildirs.

        :param username: Username (``local@domain``) to restore the item to.
        :param item: ``item`` identifier as returned by listDeletedMaildirs.
        :param source_dir: Directory containing deleted maildirs.
        :return: "Done" on success, otherwise a dict with ``status`` set to
                 "error" and a ``message``.
        """
        username_parts = username.split("@") if isinstance(username, str) else []
        # The parts become path components, so reject anything that could
        # escape /var/vmail.
        if (
            len(username_parts) < 2
            or any(not part or "/" in part or part in (".", "..") for part in username_parts[:2])
        ):
            return {"status": "error", "message": "Invalid username."}
        local_part, domain = username_parts[0], username_parts[1]

        try:
            item_id = int(item)
        except (TypeError, ValueError):
            return {"status": "error", "message": "Maildir not found."}

        maildir = next(
            (mdir for mdir in self.listDeletedMaildirs(source_dir) if mdir["item"] == item_id),
            None,
        )
        if maildir is None:
            return {"status": "error", "message": "Maildir not found."}

        garbage_path = f"{source_dir}/{maildir['time']}_{maildir['name']}"

        # argv lists bypass the shell, so no quoting is needed here.
        result = self._exec(["mv", garbage_path, f"/var/vmail/{domain}/{local_part}"])
        if result.get("status") != "success":
            return {"status": "error", "message": "Failed to restore maildir."}

        # Only move the index when the listing showed one; otherwise a
        # successful restore would be reported as a failure.
        if maildir.get("has_index"):
            result = self._exec([
                "mv",
                f"{garbage_path}{self._INDEX_SUFFIX}",
                f"/var/vmail_index/{username}",
            ])
            if result.get("status") != "success":
                return {"status": "error", "message": "Failed to restore maildir index."}

        return "Done"
|