mirror of
https://github.com/mailcow/mailcow-dockerized.git
synced 2026-06-11 17:10:28 +00:00
[Agent] Replace dockerapi container with Redis-based control bus
This commit is contained in:
@@ -0,0 +1,34 @@
|
||||
# Builder image for mailcow-agent. Each service Dockerfile pulls the static
|
||||
# binary from here via:
|
||||
#
|
||||
# COPY --from=ghcr.io/mailcow/agent:VERSION /out/mailcow-agent /usr/local/bin/mailcow-agent
|
||||
#
|
||||
# For local development: build this image first.
|
||||
#
|
||||
# docker build -t ghcr.io/mailcow/agent:dev data/Dockerfiles/agent/
|
||||
#
|
||||
# CI publishes a versioned tag and the service Dockerfiles pin against it via
|
||||
# ARG AGENT_IMAGE.
|
||||
|
||||
FROM golang:1.22-alpine AS build
|
||||
|
||||
ENV CGO_ENABLED=0 \
|
||||
GOOS=linux
|
||||
|
||||
WORKDIR /src
|
||||
|
||||
COPY go.mod go.sum* ./
|
||||
RUN go mod download
|
||||
|
||||
COPY . .
|
||||
|
||||
RUN mkdir -p /out \
|
||||
&& go build -trimpath -ldflags="-s -w" \
|
||||
-o /out/mailcow-agent ./cmd/mailcow-agent \
|
||||
&& cp mailcow-agent-cli /out/mailcow-agent-cli \
|
||||
&& chmod +x /out/mailcow-agent-cli
|
||||
|
||||
# Final stage: tiny image whose only purpose is to be a COPY --from source.
|
||||
FROM scratch
|
||||
COPY --from=build /out/mailcow-agent /out/mailcow-agent
|
||||
COPY --from=build /out/mailcow-agent-cli /out/mailcow-agent-cli
|
||||
@@ -0,0 +1,16 @@
|
||||
# mailcow-agent
|
||||
|
||||
Each mailcow service container (postfix, dovecot, …) runs `mailcow-agent` as
|
||||
ENTRYPOINT. It supervises the original service main process and exposes its
|
||||
control commands over a Redis Pub/Sub bus:
|
||||
|
||||
- `mailcow.control.<service>` — request channel (Backend → Agent)
|
||||
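- `mailcow.reply.<request_id>` (LIST) — per-request reply list: the agent RPUSHes its response here (the key expires after 60 s) and the backend collects it with BLPOP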
|
||||
- `mailcow.events.<topic>` — broadcast events
|
||||
- `mailcow.nodes.<service>` (ZSET) + `mailcow.node.<service>.<node_id>` (HASH) — heartbeat registry
|
||||
- `mailcow.stats.<service>.<node_id>` (HASH) — per-node cpu/memory stats
|
||||
|
||||
Service behaviour is selected via `MAILCOW_AGENT_SERVICE=<service>`. The main
|
||||
process command is configured via `MAILCOW_AGENT_MAIN_CMD` (string, executed via
|
||||
`sh -c` so existing entrypoints/supervisord commands keep working).
|
||||
|
||||
@@ -0,0 +1,278 @@
|
||||
// Per-container control-bus subscriber. Subscribes to mailcow.control.<service>
|
||||
// on Redis, runs handlers from the per-service command table, publishes
|
||||
// heartbeats and stats. Optionally supervises a child process.
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
|
||||
"github.com/mailcow/mailcow-dockerized/agent/internal/bus"
|
||||
"github.com/mailcow/mailcow-dockerized/agent/internal/commands"
|
||||
"github.com/mailcow/mailcow-dockerized/agent/internal/proc"
|
||||
"github.com/mailcow/mailcow-dockerized/agent/internal/registry"
|
||||
"github.com/mailcow/mailcow-dockerized/agent/internal/services"
|
||||
"github.com/mailcow/mailcow-dockerized/agent/internal/stats"
|
||||
)
|
||||
|
||||
const agentVersion = "0.1.0"
|
||||
|
||||
// atomicSignal shares the last caught terminal signal between the handler
|
||||
// goroutine and main() so it can be forwarded to the supervised child.
|
||||
type atomicSignal struct {
	v atomic.Int32 // numeric value of the most recently stored signal
}

// Store records s as the current signal.
func (a *atomicSignal) Store(s syscall.Signal) {
	a.v.Store(int32(s))
}

// Load returns the most recently stored signal as an os.Signal.
func (a *atomicSignal) Load() os.Signal {
	num := a.v.Load()
	return syscall.Signal(num)
}
|
||||
|
||||
// healthState holds the latest health probe result. Written by the probe loop,
|
||||
// read by the heartbeat loop.
|
||||
type healthState struct {
	mu     sync.RWMutex // guards the three fields below
	ok     bool         // result of the latest probe
	detail string       // error text of the latest failed probe, "" on success
	at     time.Time    // when the latest result was recorded
}

// Set stores a new probe result and stamps it with the current time.
func (h *healthState) Set(ok bool, detail string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.ok, h.detail, h.at = ok, detail, time.Now()
}

// Snapshot returns a consistent copy of the latest probe result.
func (h *healthState) Snapshot() (ok bool, detail string, at time.Time) {
	h.mu.RLock()
	ok, detail, at = h.ok, h.detail, h.at
	h.mu.RUnlock()
	return ok, detail, at
}
|
||||
|
||||
func main() {
|
||||
service := strings.TrimSpace(os.Getenv("MAILCOW_AGENT_SERVICE"))
|
||||
if service == "" {
|
||||
fmt.Fprintf(os.Stderr, "mailcow-agent: MAILCOW_AGENT_SERVICE is required. Known: %v\n", services.Known())
|
||||
os.Exit(2)
|
||||
}
|
||||
|
||||
// `mailcow-agent healthcheck` runs the probe once and exits 0/1
|
||||
if len(os.Args) > 1 && os.Args[1] == "healthcheck" {
|
||||
runHealthcheckOnce(service)
|
||||
}
|
||||
|
||||
nodeID := strings.TrimSpace(os.Getenv("MAILCOW_AGENT_NODE_ID"))
|
||||
if nodeID == "" {
|
||||
h, err := os.Hostname()
|
||||
if err != nil {
|
||||
log.Fatalf("mailcow-agent: hostname: %v", err)
|
||||
}
|
||||
nodeID = h
|
||||
}
|
||||
|
||||
mainCmd := strings.TrimSpace(os.Getenv("MAILCOW_AGENT_MAIN_CMD"))
|
||||
// host-agent has no supervised child; everything else runs one.
|
||||
wantsSupervisor := service != "host" && mainCmd != ""
|
||||
|
||||
rdb, err := newRedis()
|
||||
if err != nil {
|
||||
log.Fatalf("mailcow-agent: redis: %v", err)
|
||||
}
|
||||
defer rdb.Close()
|
||||
|
||||
// Start the supervised process before serving bus requests — restart/stop
|
||||
// handlers assume something is already running.
|
||||
var sup *proc.Supervisor
|
||||
if wantsSupervisor {
|
||||
sup = proc.New(mainCmd)
|
||||
if err := sup.Start(); err != nil {
|
||||
log.Fatalf("mailcow-agent: start main: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
table, err := services.Build(service, sup)
|
||||
if err != nil {
|
||||
log.Fatalf("mailcow-agent: %v", err)
|
||||
}
|
||||
|
||||
// We handle signals ourselves so we can (a) suppress the Go-runtime stack
|
||||
// dump on SIGQUIT (php-fpm-alpine's STOPSIGNAL) and (b) remember which
|
||||
// signal arrived to forward it to the child on shutdown.
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
sigCh := make(chan os.Signal, 1)
|
||||
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT,
|
||||
syscall.SIGHUP, syscall.SIGUSR1, syscall.SIGUSR2)
|
||||
defer signal.Stop(sigCh)
|
||||
|
||||
stopSig := atomicSignal{}
|
||||
stopSig.Store(syscall.SIGTERM)
|
||||
|
||||
go func() {
|
||||
for sig := range sigCh {
|
||||
switch sig {
|
||||
case syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT:
|
||||
stopSig.Store(sig.(syscall.Signal))
|
||||
log.Printf("mailcow-agent: caught %s, beginning graceful shutdown", sig)
|
||||
cancel()
|
||||
return
|
||||
case syscall.SIGHUP, syscall.SIGUSR1, syscall.SIGUSR2:
|
||||
if sup != nil {
|
||||
sup.SignalChild(sig)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Initial state is "ok" so the service isn't flagged unhealthy before the
|
||||
// first probe has run.
|
||||
health := &healthState{ok: true, detail: "", at: time.Now()}
|
||||
if table.HealthProbe != nil {
|
||||
go runHealthLoop(ctx, table.HealthProbe, health, 10*time.Second)
|
||||
}
|
||||
|
||||
hb := registry.Heartbeat{
|
||||
Service: service,
|
||||
NodeID: nodeID,
|
||||
Version: agentVersion,
|
||||
StartedAt: time.Now(),
|
||||
Image: os.Getenv("MAILCOW_AGENT_IMAGE"),
|
||||
Health: health,
|
||||
}
|
||||
go registry.Loop(ctx, rdb, hb, 10*time.Second)
|
||||
|
||||
// cgroup stats for this container. Host metrics come from exec.host-stats.
|
||||
pub := stats.NewPublisher(rdb, service, nodeID)
|
||||
go pub.Run(ctx, 10*time.Second)
|
||||
|
||||
srv := bus.NewServer(rdb, table, nodeID)
|
||||
busErrCh := make(chan error, 1)
|
||||
go func() { busErrCh <- srv.Run(ctx) }()
|
||||
|
||||
log.Printf("mailcow-agent: service=%s node=%s ready (commands=%d)", service, nodeID, len(table.Handlers))
|
||||
|
||||
// Exit only on outside termination or fatal bus error. A crashed/stopped
|
||||
// child should not tear down the container — the operator may want to
|
||||
// issue `start` over the bus afterwards.
|
||||
exitCode := 0
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Println("mailcow-agent: shutdown signal received")
|
||||
case err := <-busErrCh:
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
log.Printf("mailcow-agent: bus loop exited: %v", err)
|
||||
exitCode = 1
|
||||
}
|
||||
}
|
||||
|
||||
// Graceful shutdown bounded at 35s.
|
||||
shutCtx, shutCancel := context.WithTimeout(context.Background(), 35*time.Second)
|
||||
defer shutCancel()
|
||||
_ = srv.Shutdown(shutCtx)
|
||||
_ = registry.Deregister(shutCtx, rdb, service, nodeID)
|
||||
if sup != nil {
|
||||
// Forward the exact signal we received so the child sees the same
|
||||
// shutdown semantics it would without us in front (e.g. SIGQUIT →
|
||||
// php-fpm graceful drain).
|
||||
if err := sup.StopWithSignal(shutCtx, stopSig.Load()); err != nil {
|
||||
log.Printf("mailcow-agent: stop main: %v", err)
|
||||
}
|
||||
}
|
||||
os.Exit(exitCode)
|
||||
}
|
||||
|
||||
// runHealthcheckOnce runs the local probe with a tight deadline and exits 0/1.
|
||||
// Used by the `healthcheck` sub-command path.
|
||||
func runHealthcheckOnce(service string) {
|
||||
table, err := services.Build(service, nil)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "mailcow-agent healthcheck:", err)
|
||||
os.Exit(2)
|
||||
}
|
||||
if table.HealthProbe == nil {
|
||||
// Services without a probe are considered healthy.
|
||||
os.Exit(0)
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
if err := table.HealthProbe(ctx); err != nil {
|
||||
fmt.Fprintln(os.Stderr, "unhealthy:", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
// runHealthLoop ticks the probe and updates state. Same probe path as the
|
||||
// healthcheck sub-command.
|
||||
func runHealthLoop(ctx context.Context, probe commands.HealthProbe, state *healthState, interval time.Duration) {
|
||||
t := time.NewTicker(interval)
|
||||
defer t.Stop()
|
||||
check := func() {
|
||||
pctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer cancel()
|
||||
if err := probe(pctx); err != nil {
|
||||
state.Set(false, err.Error())
|
||||
} else {
|
||||
state.Set(true, "")
|
||||
}
|
||||
}
|
||||
check()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
check()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func newRedis() (*redis.Client, error) {
|
||||
addr := os.Getenv("REDIS_SLAVEOF_IP")
|
||||
port := os.Getenv("REDIS_SLAVEOF_PORT")
|
||||
if addr == "" {
|
||||
addr = "redis-mailcow"
|
||||
port = "6379"
|
||||
}
|
||||
if port == "" {
|
||||
port = "6379"
|
||||
}
|
||||
pass := os.Getenv("REDISPASS")
|
||||
cli := redis.NewClient(&redis.Options{
|
||||
Addr: addr + ":" + port,
|
||||
Password: pass,
|
||||
DB: 0,
|
||||
MaxRetries: -1,
|
||||
MinRetryBackoff: 200 * time.Millisecond,
|
||||
MaxRetryBackoff: 5 * time.Second,
|
||||
})
|
||||
// Wait up to 2 minutes for Redis to come up before giving up
|
||||
deadline := time.Now().Add(2 * time.Minute)
|
||||
var lastErr error
|
||||
for attempt := 1; time.Now().Before(deadline); attempt++ {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
err := cli.Ping(ctx).Err()
|
||||
cancel()
|
||||
if err == nil {
|
||||
return cli, nil
|
||||
}
|
||||
lastErr = err
|
||||
wait := time.Duration(attempt) * time.Second
|
||||
if wait > 10*time.Second {
|
||||
wait = 10 * time.Second
|
||||
}
|
||||
log.Printf("mailcow-agent: waiting for redis %s (attempt %d): %v", addr, attempt, err)
|
||||
time.Sleep(wait)
|
||||
}
|
||||
return nil, fmt.Errorf("ping %s after 2m: %w", addr, lastErr)
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
module github.com/mailcow/mailcow-dockerized/agent
|
||||
|
||||
go 1.22
|
||||
|
||||
require (
|
||||
github.com/oklog/ulid/v2 v2.1.0
|
||||
github.com/redis/go-redis/v9 v9.7.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
)
|
||||
@@ -0,0 +1,6 @@
|
||||
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
|
||||
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
||||
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
|
||||
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
|
||||
@@ -0,0 +1,175 @@
|
||||
// Package bus implements the Redis Pub/Sub control bus: subscribing to the
|
||||
// service's control channel, dispatching envelopes to a commands.Table, and
|
||||
// publishing responses back to env.ReplyTo.
|
||||
package bus
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
|
||||
"github.com/mailcow/mailcow-dockerized/agent/internal/commands"
|
||||
"github.com/mailcow/mailcow-dockerized/agent/internal/envelope"
|
||||
)
|
||||
|
||||
// ControlChannel assembles the per-service control channel.
|
||||
func ControlChannel(service string) string {
	const prefix = "mailcow.control."
	return prefix + service
}
|
||||
|
||||
// Server subscribes to a control channel and dispatches commands.
|
||||
type Server struct {
|
||||
rdb *redis.Client
|
||||
table *commands.Table
|
||||
nodeID string
|
||||
dedupe *lru
|
||||
stop chan struct{}
|
||||
stopped sync.Once
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewServer wires a fresh server. nodeID is stamped into every Response and is
|
||||
// what the backend's fan-in aggregator uses to attribute results.
|
||||
func NewServer(rdb *redis.Client, table *commands.Table, nodeID string) *Server {
|
||||
return &Server{
|
||||
rdb: rdb,
|
||||
table: table,
|
||||
nodeID: nodeID,
|
||||
dedupe: newLRU(1024),
|
||||
stop: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Run blocks, subscribing to ControlChannel(service) and dispatching incoming
|
||||
// envelopes concurrently. It returns when ctx is done or Shutdown is called.
|
||||
func (s *Server) Run(ctx context.Context) error {
|
||||
channel := ControlChannel(s.table.Service)
|
||||
sub := s.rdb.Subscribe(ctx, channel)
|
||||
defer sub.Close()
|
||||
if _, err := sub.Receive(ctx); err != nil {
|
||||
return fmt.Errorf("bus: subscribe %s: %w", channel, err)
|
||||
}
|
||||
msgs := sub.Channel()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
s.wg.Wait()
|
||||
return ctx.Err()
|
||||
case <-s.stop:
|
||||
s.wg.Wait()
|
||||
return nil
|
||||
case m, ok := <-msgs:
|
||||
if !ok {
|
||||
s.wg.Wait()
|
||||
return errors.New("bus: subscription channel closed")
|
||||
}
|
||||
s.wg.Add(1)
|
||||
go func(payload string) {
|
||||
defer s.wg.Done()
|
||||
s.dispatch(ctx, payload)
|
||||
}(m.Payload)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown signals Run to stop and waits for in-flight handlers (bounded by
|
||||
// ctx).
|
||||
func (s *Server) Shutdown(ctx context.Context) error {
|
||||
s.stopped.Do(func() { close(s.stop) })
|
||||
done := make(chan struct{})
|
||||
go func() { s.wg.Wait(); close(done) }()
|
||||
select {
|
||||
case <-done:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) dispatch(parent context.Context, payload string) {
|
||||
var req envelope.Request
|
||||
if err := json.Unmarshal([]byte(payload), &req); err != nil {
|
||||
// Malformed envelope: no RequestID/ReplyTo we can trust — drop.
|
||||
return
|
||||
}
|
||||
if req.RequestID != "" && !s.dedupe.add(req.RequestID) {
|
||||
// Duplicate (retry of an idempotent command): silently absorb.
|
||||
return
|
||||
}
|
||||
// Per-node targeting: if args.target_node is set and doesn't match us,
|
||||
// drop silently. The intended replica picks it up and replies.
|
||||
if target, ok := req.Args["target_node"].(string); ok && target != "" && target != s.nodeID {
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := handlerContext(parent, req.Deadline)
|
||||
defer cancel()
|
||||
|
||||
start := time.Now()
|
||||
resp := envelope.Response{RequestID: req.RequestID, OK: true, Node: s.nodeID}
|
||||
|
||||
if h := s.table.Lookup(req.Cmd); h == nil {
|
||||
resp.OK = false
|
||||
resp.Error = fmt.Sprintf("no handler for cmd %q", req.Cmd)
|
||||
resp.ErrorCode = envelope.ErrCodeUnsupportedCommand
|
||||
} else {
|
||||
result, err := runWithRecover(ctx, h, req.Args)
|
||||
switch {
|
||||
case err == nil:
|
||||
resp.Result = result
|
||||
case errors.Is(err, commands.ErrNotFound):
|
||||
resp.OK = false
|
||||
resp.Error = err.Error()
|
||||
resp.ErrorCode = envelope.ErrCodeNotFound
|
||||
case errors.Is(err, commands.ErrValidation):
|
||||
resp.OK = false
|
||||
resp.Error = err.Error()
|
||||
resp.ErrorCode = envelope.ErrCodeValidation
|
||||
case errors.Is(err, context.DeadlineExceeded), errors.Is(ctx.Err(), context.DeadlineExceeded):
|
||||
resp.OK = false
|
||||
resp.Error = err.Error()
|
||||
resp.ErrorCode = envelope.ErrCodeTimeout
|
||||
default:
|
||||
resp.OK = false
|
||||
resp.Error = err.Error()
|
||||
resp.ErrorCode = envelope.ErrCodeInternal
|
||||
}
|
||||
}
|
||||
resp.DurationMS = time.Since(start).Milliseconds()
|
||||
|
||||
if req.ReplyTo == "" {
|
||||
return
|
||||
}
|
||||
data, err := json.Marshal(resp)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// Replies go through a List (RPUSH + EXPIRE), not Pub/Sub. This sidesteps
|
||||
// the "subscribe-before-publish" race and lets the backend fan-in via
|
||||
// BLPOP — important because PhpRedis's subscribe() blocks, so we can't
|
||||
// publish on the same connection after subscribing. Use parent ctx so a
|
||||
// per-handler deadline can't stop us from delivering the timeout response.
|
||||
pipe := s.rdb.Pipeline()
|
||||
pipe.RPush(parent, req.ReplyTo, data)
|
||||
pipe.Expire(parent, req.ReplyTo, 60*time.Second)
|
||||
_, _ = pipe.Exec(parent)
|
||||
}
|
||||
|
||||
func runWithRecover(ctx context.Context, h commands.Handler, args map[string]any) (out any, err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
err = fmt.Errorf("handler panic: %v", r)
|
||||
}
|
||||
}()
|
||||
return h(ctx, args)
|
||||
}
|
||||
|
||||
// handlerContext derives the context a handler runs under: parent bounded by
// deadline when one is set, otherwise a plain cancellable child of parent.
func handlerContext(parent context.Context, deadline time.Time) (context.Context, context.CancelFunc) {
	if !deadline.IsZero() {
		return context.WithDeadline(parent, deadline)
	}
	return context.WithCancel(parent)
}
|
||||
@@ -0,0 +1,38 @@
|
||||
package bus
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// lru is a tiny request-id deduplication cache. The bus treats Pub/Sub
|
||||
// retries (same request_id) as no-ops. Not a security boundary — only a
|
||||
// best-effort guard against accidental double-execution.
|
||||
type lru struct {
	mu   sync.Mutex               // guards idx and list
	cap  int                      // maximum number of remembered ids
	idx  map[string]*list.Element // id → its element in list
	list *list.List               // ids ordered newest (front) to oldest (back)
}

// newLRU returns an empty cache that remembers at most capacity ids.
func newLRU(capacity int) *lru {
	cache := &lru{cap: capacity, list: list.New()}
	cache.idx = make(map[string]*list.Element, capacity)
	return cache
}

// add reports whether id was unseen. A new id is remembered (evicting the
// oldest entries once the cache is over capacity) and true is returned; a
// known id leaves the cache untouched and returns false, telling the caller
// to skip the duplicate.
func (l *lru) add(id string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()

	if _, seen := l.idx[id]; seen {
		return false
	}
	l.idx[id] = l.list.PushFront(id)

	for l.list.Len() > l.cap {
		oldest := l.list.Back()
		evicted := l.list.Remove(oldest).(string)
		delete(l.idx, evicted)
	}
	return true
}
|
||||
@@ -0,0 +1,83 @@
|
||||
// Package commands defines the per-service handler table. The bus dispatcher
|
||||
// looks up handlers by name and wraps the result in an envelope.Response.
|
||||
package commands
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
)
|
||||
|
||||
// ErrNotFound signals that the target (queue id, mailbox, …) doesn't live on
|
||||
// this node. For broadcast operations the aggregator still counts success if
|
||||
// any other node returns ok.
|
||||
var ErrNotFound = errors.New("not_found")
|
||||
|
||||
// ErrValidation indicates a missing or malformed argument.
|
||||
var ErrValidation = errors.New("validation")
|
||||
|
||||
// Handler executes a single command for a service.
|
||||
type Handler func(ctx context.Context, args map[string]any) (any, error)
|
||||
|
||||
// HealthProbe returns nil if the supervised service is healthy, error otherwise.
|
||||
// Shared between the `healthcheck` sub-command and the agent's heartbeat loop.
|
||||
type HealthProbe func(ctx context.Context) error
|
||||
|
||||
// Table is the per-service command registry built once at startup.
|
||||
type Table struct {
|
||||
Service string
|
||||
Handlers map[string]Handler
|
||||
HealthProbe HealthProbe
|
||||
}
|
||||
|
||||
// New constructs an empty table for a service.
|
||||
func New(service string) *Table {
|
||||
return &Table{Service: service, Handlers: make(map[string]Handler)}
|
||||
}
|
||||
|
||||
// Register adds a handler. Duplicate registration panics — wiring bugs should
|
||||
// be loud.
|
||||
func (t *Table) Register(cmd string, h Handler) {
|
||||
if _, dup := t.Handlers[cmd]; dup {
|
||||
panic("commands: duplicate handler " + t.Service + "/" + cmd)
|
||||
}
|
||||
t.Handlers[cmd] = h
|
||||
}
|
||||
|
||||
// Lookup returns the handler for cmd or nil.
|
||||
func (t *Table) Lookup(cmd string) Handler {
|
||||
return t.Handlers[cmd]
|
||||
}
|
||||
|
||||
// ArgString extracts a required string argument.
|
||||
func ArgString(args map[string]any, key string) (string, error) {
|
||||
v, ok := args[key]
|
||||
if !ok {
|
||||
return "", errArg(key, "missing")
|
||||
}
|
||||
s, ok := v.(string)
|
||||
if !ok || s == "" {
|
||||
return "", errArg(key, "must be non-empty string")
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// ArgStringOpt returns an optional string argument with a default.
|
||||
func ArgStringOpt(args map[string]any, key, def string) string {
	// A missing key, a non-string value and an empty string all leave s == "".
	s, _ := args[key].(string)
	if s == "" {
		return def
	}
	return s
}
|
||||
|
||||
func errArg(key, reason string) error {
|
||||
return &validationError{key: key, reason: reason}
|
||||
}
|
||||
|
||||
type validationError struct{ key, reason string }
|
||||
|
||||
func (e *validationError) Error() string { return "arg " + e.key + ": " + e.reason }
|
||||
func (e *validationError) Is(target error) bool {
|
||||
return target == ErrValidation
|
||||
}
|
||||
@@ -0,0 +1,60 @@
|
||||
package commands
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"os/exec"
|
||||
)
|
||||
|
||||
// RunOptions configures a single Run invocation.
|
||||
type RunOptions struct {
	// Stdin, if non-nil, is written to the process stdin.
	Stdin []byte
	// OutputCap limits how many bytes of stdout and of stderr are kept (each
	// stream separately, truncated at the end). 0 means unlimited. The agent
	// uses ~1 MiB for cat-queue, smaller for status-style commands.
	OutputCap int
}

// RunResult is what every shell-style command returns.
type RunResult struct {
	Stdout   string `json:"stdout,omitempty"`
	Stderr   string `json:"stderr,omitempty"`
	ExitCode int    `json:"exit_code"`
}

// cappedBuffer is an io.Writer that keeps at most limit bytes (limit <= 0
// means unlimited) and discards the rest, remembering that it did so. It lets
// Run bound memory while the command is still producing output instead of
// trimming after everything has been buffered.
type cappedBuffer struct {
	buf       bytes.Buffer
	limit     int
	truncated bool
}

// Write stores as much of p as still fits and always reports len(p) so the
// process is never blocked or failed by the cap.
func (c *cappedBuffer) Write(p []byte) (int, error) {
	if c.limit <= 0 {
		return c.buf.Write(p)
	}
	room := c.limit - c.buf.Len()
	if len(p) <= room {
		return c.buf.Write(p)
	}
	c.truncated = true
	if room > 0 {
		c.buf.Write(p[:room])
	}
	return len(p), nil
}

// String returns the captured bytes, with a marker appended if any output
// was discarded.
func (c *cappedBuffer) String() string {
	if c.truncated {
		return c.buf.String() + "\n…(truncated)"
	}
	return c.buf.String()
}

// Run executes argv[0] argv[1:] under ctx (the bus deadline) and captures
// stdout and stderr, each limited by opts.OutputCap.
//
// A non-zero exit status is not an error: it is reported in ExitCode so
// callers can map e.g. "queue id not found" exit codes to ErrNotFound
// themselves. An error is returned when argv is empty, when the command
// cannot be started, or when the command failed after ctx was done — that
// last case wraps ctx.Err(), so a deadline kill is visible to errors.Is
// instead of looking like an ordinary exit with code -1. Except for empty
// argv the RunResult is always non-nil, even alongside an error.
func Run(ctx context.Context, opts RunOptions, argv ...string) (*RunResult, error) {
	if len(argv) == 0 {
		return nil, fmt.Errorf("commands.Run: empty argv")
	}

	stdout := cappedBuffer{limit: opts.OutputCap}
	stderr := cappedBuffer{limit: opts.OutputCap}

	cmd := exec.CommandContext(ctx, argv[0], argv[1:]...)
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr
	if opts.Stdin != nil {
		cmd.Stdin = bytes.NewReader(opts.Stdin)
	}
	err := cmd.Run()

	res := &RunResult{Stdout: stdout.String(), Stderr: stderr.String()}
	if exitErr, ok := err.(*exec.ExitError); ok {
		res.ExitCode = exitErr.ExitCode()
		err = nil
		// exec kills the process when ctx ends, and that kill is reported as
		// an ExitError too. Surface the context error so a timeout is not
		// mistaken for the command's own failure.
		if ctxErr := ctx.Err(); ctxErr != nil {
			err = fmt.Errorf("commands.Run: %s: %w", argv[0], ctxErr)
		}
	}
	return res, err
}
|
||||
@@ -0,0 +1,34 @@
|
||||
// Package envelope defines the wire format for the mailcow-agent control bus.
|
||||
package envelope
|
||||
|
||||
import "time"
|
||||
|
||||
// Request is what the backend publishes on mailcow.control.<service>.
|
||||
type Request struct {
|
||||
Cmd string `json:"cmd"`
|
||||
RequestID string `json:"request_id"`
|
||||
Args map[string]any `json:"args,omitempty"`
|
||||
ReplyTo string `json:"reply_to,omitempty"`
|
||||
Deadline time.Time `json:"deadline,omitempty"`
|
||||
IssuedBy string `json:"issued_by,omitempty"`
|
||||
}
|
||||
|
||||
// Response is what the agent pushes (RPUSH) onto the reply_to list.
|
||||
type Response struct {
|
||||
RequestID string `json:"request_id"`
|
||||
OK bool `json:"ok"`
|
||||
Result any `json:"result,omitempty"`
|
||||
Error string `json:"error,omitempty"`
|
||||
ErrorCode string `json:"error_code,omitempty"`
|
||||
DurationMS int64 `json:"duration_ms"`
|
||||
Node string `json:"node,omitempty"`
|
||||
}
|
||||
|
||||
// Error codes returned in Response.ErrorCode. Keep in sync with the V2 schema.
|
||||
const (
|
||||
ErrCodeValidation = "validation"
|
||||
ErrCodeNotFound = "not_found"
|
||||
ErrCodeTimeout = "timeout"
|
||||
ErrCodeUnsupportedCommand = "unsupported_command"
|
||||
ErrCodeInternal = "internal"
|
||||
)
|
||||
@@ -0,0 +1,253 @@
|
||||
// Package proc supervises the service's main process — postfix, dovecot,
|
||||
// nginx, … — as a child of the agent. It exposes the high-level lifecycle
|
||||
// verbs (reload/restart/stop/start) used by the per-service command tables.
|
||||
//
|
||||
// "reload" → SIGHUP
|
||||
// "restart" → SIGTERM, wait, exec again
|
||||
// "stop" → SIGTERM, leave stopped
|
||||
// "start" → exec again (only if currently stopped)
|
||||
package proc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Supervisor wraps a single child process.
|
||||
type Supervisor struct {
	cmdLine    string        // shell command (passed to `sh -c …`)
	stopSignal os.Signal     // signal Stop sends by default
	stopGrace  time.Duration // how long a stop waits before escalating to SIGKILL

	mu       sync.Mutex    // guards the fields below
	cmd      *exec.Cmd     // most recently started child, nil before the first Start
	stopped  bool          // set once a stop was requested, cleared by Start
	exitedCh chan struct{} // closed when the current child has exited
}

// New returns a Supervisor for cmdLine that stops its child with SIGTERM and
// a 30s grace period. cmdLine is run through `sh -c`, so existing
// docker-entrypoint.sh scripts keep working without quoting headaches.
// Nothing is started until Start is called.
func New(cmdLine string) *Supervisor {
	sup := new(Supervisor)
	sup.cmdLine = cmdLine
	sup.stopSignal = syscall.SIGTERM
	sup.stopGrace = 30 * time.Second
	return sup
}
|
||||
|
||||
// Start launches the child process. Returns an error if it cannot be spawned.
|
||||
// The agent's main() also blocks on Wait() to surface exit status.
|
||||
func (s *Supervisor) Start() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.cmd != nil && s.cmd.Process != nil && !s.stopped {
|
||||
return errors.New("proc: already running")
|
||||
}
|
||||
// `exec ` prefix tells the shell to replace itself with the command
|
||||
// instead of forking and waiting. Without it, sh stays alive as the
|
||||
// parent of the real service process, signals from us land on the
|
||||
// shell instead of on the service, and SIGHUP for config reloads
|
||||
// silently does nothing. With the prefix the supervised PID *is* the
|
||||
// service after the script's own `exec "$@"` chains through.
|
||||
cmd := exec.Command("/bin/sh", "-c", "exec "+s.cmdLine)
|
||||
cmd.Stdout = os.Stdout
|
||||
cmd.Stderr = os.Stderr
|
||||
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
|
||||
if err := cmd.Start(); err != nil {
|
||||
return fmt.Errorf("proc: start: %w", err)
|
||||
}
|
||||
s.cmd = cmd
|
||||
s.stopped = false
|
||||
s.exitedCh = make(chan struct{})
|
||||
go func() {
|
||||
_ = cmd.Wait()
|
||||
close(s.exitedCh)
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Wait blocks until the child exits and returns its exit code.
|
||||
func (s *Supervisor) Wait() int {
|
||||
s.mu.Lock()
|
||||
exited := s.exitedCh
|
||||
cmd := s.cmd
|
||||
s.mu.Unlock()
|
||||
if exited == nil {
|
||||
return -1
|
||||
}
|
||||
<-exited
|
||||
if cmd == nil || cmd.ProcessState == nil {
|
||||
return -1
|
||||
}
|
||||
return cmd.ProcessState.ExitCode()
|
||||
}
|
||||
|
||||
// SignalChild forwards a single signal to the supervised child without
|
||||
// changing the supervisor's lifecycle state. Used to relay SIGHUP/USR1/USR2
|
||||
// from the agent's signal handler to the service so operators can still
|
||||
// `docker compose kill -s HUP postfix-mailcow` and see the expected effect.
|
||||
func (s *Supervisor) SignalChild(sig os.Signal) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.cmd == nil || s.cmd.Process == nil || s.stopped {
|
||||
return errors.New("proc: not running")
|
||||
}
|
||||
return s.cmd.Process.Signal(sig)
|
||||
}
|
||||
|
||||
// Reload sends SIGHUP. Returns nil if the signal was delivered.
|
||||
func (s *Supervisor) Reload() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.cmd == nil || s.cmd.Process == nil || s.stopped {
|
||||
return errors.New("proc: not running")
|
||||
}
|
||||
return s.cmd.Process.Signal(syscall.SIGHUP)
|
||||
}
|
||||
|
||||
// Stop sends the configured stop signal and waits for the process to exit
|
||||
// (bounded by stopGrace). Marks the supervisor as stopped — Start must be
|
||||
// called again to relaunch.
|
||||
func (s *Supervisor) Stop(ctx context.Context) error {
|
||||
return s.StopWithSignal(ctx, s.stopSignal)
|
||||
}
|
||||
|
||||
// StopWithSignal is like Stop but lets the caller override the stop signal.
|
||||
// Used by main() to forward whatever signal Docker sent us (SIGTERM for
|
||||
// most containers, SIGQUIT for php-fpm-alpine which uses SIGQUIT for
|
||||
// graceful shutdown) so the child gets the same signal semantics it would
|
||||
// receive without the agent in front of it.
|
||||
func (s *Supervisor) StopWithSignal(ctx context.Context, sig os.Signal) error {
|
||||
s.mu.Lock()
|
||||
cmd := s.cmd
|
||||
exited := s.exitedCh
|
||||
if cmd == nil || cmd.Process == nil {
|
||||
s.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
s.stopped = true
|
||||
s.mu.Unlock()
|
||||
|
||||
sysSig, ok := sig.(syscall.Signal)
|
||||
if !ok {
|
||||
sysSig = syscall.SIGTERM
|
||||
}
|
||||
pgid, err := syscall.Getpgid(cmd.Process.Pid)
|
||||
if err == nil {
|
||||
_ = syscall.Kill(-pgid, sysSig)
|
||||
} else {
|
||||
_ = cmd.Process.Signal(sysSig)
|
||||
}
|
||||
|
||||
timer := time.NewTimer(s.stopGrace)
|
||||
defer timer.Stop()
|
||||
select {
|
||||
case <-exited:
|
||||
return nil
|
||||
case <-timer.C:
|
||||
// Last resort: SIGKILL the whole process group.
|
||||
if pgid, err := syscall.Getpgid(cmd.Process.Pid); err == nil {
|
||||
_ = syscall.Kill(-pgid, syscall.SIGKILL)
|
||||
} else {
|
||||
_ = cmd.Process.Kill()
|
||||
}
|
||||
<-exited
|
||||
return errors.New("proc: forced kill after grace period")
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// Restart performs Stop+Start using the supervisor's default stop signal.
|
||||
// Different from a Docker-initiated shutdown: here it's an explicit "restart
|
||||
// this service" command, so we want the standard SIGTERM semantics.
|
||||
func (s *Supervisor) Restart(ctx context.Context) error {
|
||||
if err := s.Stop(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
return s.Start()
|
||||
}
|
||||
|
||||
// IsRunning reports whether the supervised child is currently alive (started
|
||||
// and not yet exited or stopped).
|
||||
func (s *Supervisor) IsRunning() bool {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.stopped || s.cmd == nil || s.cmd.Process == nil {
|
||||
return false
|
||||
}
|
||||
// exitedCh is closed when the child exits. Non-blocking read.
|
||||
select {
|
||||
case <-s.exitedCh:
|
||||
return false
|
||||
default:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// WaitStable blocks for `settle` and returns nil if the supervised child is
|
||||
// still running at the end, otherwise an error describing the exit. Used by
|
||||
// the `restart` command to give the operator real "did it come back up"
|
||||
// feedback instead of an immediate OK.
|
||||
func (s *Supervisor) WaitStable(ctx context.Context, settle time.Duration) error {
|
||||
s.mu.Lock()
|
||||
exited := s.exitedCh
|
||||
s.mu.Unlock()
|
||||
if exited == nil {
|
||||
return errors.New("proc: not running")
|
||||
}
|
||||
select {
|
||||
case <-exited:
|
||||
// Child died within the settle window.
|
||||
s.mu.Lock()
|
||||
cmd := s.cmd
|
||||
s.mu.Unlock()
|
||||
code := -1
|
||||
if cmd != nil && cmd.ProcessState != nil {
|
||||
code = cmd.ProcessState.ExitCode()
|
||||
}
|
||||
return fmt.Errorf("proc: child exited within settle window (code=%d)", code)
|
||||
case <-time.After(settle):
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// Forward installs a signal forwarder: SIGINT/SIGTERM/SIGHUP/SIGUSR1/SIGUSR2
|
||||
// received by the agent are propagated to the child. Returns a cancel func
|
||||
// to release the handler.
|
||||
func (s *Supervisor) Forward(signals ...os.Signal) func() {
|
||||
ch := make(chan os.Signal, len(signals)+1)
|
||||
signalNotify(ch, signals...)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case sig := <-ch:
|
||||
s.mu.Lock()
|
||||
cmd := s.cmd
|
||||
s.mu.Unlock()
|
||||
if cmd != nil && cmd.Process != nil {
|
||||
_ = cmd.Process.Signal(sig)
|
||||
}
|
||||
if sig == syscall.SIGTERM || sig == syscall.SIGINT {
|
||||
// On terminal signals propagate and let main exit.
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
return func() {
|
||||
close(done)
|
||||
signalStop(ch)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
package proc
|
||||
|
||||
import (
|
||||
"os"
|
||||
"os/signal"
|
||||
)
|
||||
|
||||
// signalNotify stands in for signal.Notify so tests can replace it.
var signalNotify = signal.Notify

// signalStop stands in for signal.Stop so tests can replace it.
var signalStop = signal.Stop

// Reference os so its import stays in use in this file.
var _ = os.Stdout
|
||||
@@ -0,0 +1,97 @@
|
||||
// Package registry publishes per-node heartbeats to Redis so the backend can
|
||||
// enumerate live containers. Two keys per service:
|
||||
//
|
||||
// ZSET mailcow.nodes.<service> score=unix_ts member=node_id
|
||||
// HASH mailcow.node.<service>.<node_id> { version, started_at, image, health* }
|
||||
//
|
||||
// The per-node HASH expires after 30s and the ZSET after 5 minutes; both TTLs
// are renewed on every heartbeat (every 10s). Deregister clears them on
// graceful shutdown.
|
||||
package registry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// HealthSnapshotter exposes the most recent health probe result so that each
// heartbeat can carry it. Implemented by main.healthState.
type HealthSnapshotter interface {
	// Snapshot returns whether the last probe passed, a detail string for
	// it, and the time it was taken.
	Snapshot() (ok bool, detail string, at time.Time)
}
|
||||
|
||||
// Heartbeat carries the metadata published with every refresh.
|
||||
type Heartbeat struct {
|
||||
Service string
|
||||
NodeID string
|
||||
Version string
|
||||
StartedAt time.Time
|
||||
Image string
|
||||
Health HealthSnapshotter // optional; nil → omit health fields
|
||||
}
|
||||
|
||||
// nodesKey returns the name of the ZSET that lists the nodes of service.
func nodesKey(service string) string {
	const prefix = "mailcow.nodes."
	return prefix + service
}
|
||||
// nodeKey returns the name of the HASH holding the details of one node.
func nodeKey(service, node string) string {
	return fmt.Sprintf("mailcow.node.%s.%s", service, node)
}
|
||||
|
||||
// Publish writes one heartbeat tick. Callers run this in a loop.
|
||||
func Publish(ctx context.Context, rdb *redis.Client, h Heartbeat) error {
|
||||
now := time.Now().Unix()
|
||||
fields := map[string]any{
|
||||
"version": h.Version,
|
||||
"started_at": h.StartedAt.UTC().Format(time.RFC3339),
|
||||
"image": h.Image,
|
||||
"node_id": h.NodeID,
|
||||
"service": h.Service,
|
||||
"updated_at": strconv.FormatInt(now, 10),
|
||||
}
|
||||
if h.Health != nil {
|
||||
ok, detail, at := h.Health.Snapshot()
|
||||
if ok {
|
||||
fields["health"] = "ok"
|
||||
} else {
|
||||
fields["health"] = "fail"
|
||||
}
|
||||
fields["health_detail"] = detail
|
||||
fields["health_at"] = strconv.FormatInt(at.Unix(), 10)
|
||||
}
|
||||
pipe := rdb.Pipeline()
|
||||
pipe.ZAdd(ctx, nodesKey(h.Service), redis.Z{Score: float64(now), Member: h.NodeID})
|
||||
pipe.Expire(ctx, nodesKey(h.Service), 5*time.Minute)
|
||||
pipe.HSet(ctx, nodeKey(h.Service, h.NodeID), fields)
|
||||
pipe.Expire(ctx, nodeKey(h.Service, h.NodeID), 30*time.Second)
|
||||
_, err := pipe.Exec(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("registry: heartbeat exec: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Deregister removes the node from the ZSET and deletes its detail hash.
|
||||
// Called on graceful shutdown so the dashboard reflects intentional stops
|
||||
// immediately rather than waiting for TTL.
|
||||
func Deregister(ctx context.Context, rdb *redis.Client, service, nodeID string) error {
|
||||
pipe := rdb.Pipeline()
|
||||
pipe.ZRem(ctx, nodesKey(service), nodeID)
|
||||
pipe.Del(ctx, nodeKey(service, nodeID))
|
||||
_, err := pipe.Exec(ctx)
|
||||
return err
|
||||
}
|
||||
|
||||
// Loop runs Publish on a ticker until ctx is done. It is the typical caller.
|
||||
func Loop(ctx context.Context, rdb *redis.Client, h Heartbeat, interval time.Duration) {
|
||||
// Publish once immediately so the dashboard sees us right away.
|
||||
_ = Publish(ctx, rdb, h)
|
||||
t := time.NewTicker(interval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
_ = Publish(ctx, rdb, h)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,9 @@
|
||||
package services
|
||||
|
||||
import "time"
|
||||
|
||||
// nowStamp formats the current UTC time as a compact, sortable stamp with
// one-second resolution (for example 20240131T235959Z). It is used as a
// suffix for moved/garbage maildirs so repeated cleanups get distinct names.
func nowStamp() string {
	const layout = "20060102T150405Z"
	return time.Now().UTC().Format(layout)
}
|
||||
@@ -0,0 +1,294 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/mailcow/mailcow-dockerized/agent/internal/commands"
|
||||
"github.com/mailcow/mailcow-dockerized/agent/internal/proc"
|
||||
)
|
||||
|
||||
// init registers the dovecot command-table builder under its service name.
func init() {
	Register("dovecot", buildDovecot)
}
|
||||
|
||||
// vmailRoot is the directory that maildir paths in this file are joined onto.
const vmailRoot = "/var/vmail"
|
||||
|
||||
// dovecotHealthProbe connects to the local IMAP port and checks that the
// server greeting starts with "* OK". Dialling and reading are each bounded
// by three seconds and additionally honour ctx: a cancelled context aborts
// the dial, and an earlier ctx deadline shortens the read deadline.
func dovecotHealthProbe(ctx context.Context) error {
	const (
		addr    = "127.0.0.1:143"
		timeout = 3 * time.Second
		want    = "* OK"
	)

	dialer := net.Dialer{Timeout: timeout}
	conn, err := dialer.DialContext(ctx, "tcp", addr)
	if err != nil {
		return err
	}
	defer conn.Close()

	deadline := time.Now().Add(timeout)
	if d, ok := ctx.Deadline(); ok && d.Before(deadline) {
		deadline = d
	}
	_ = conn.SetReadDeadline(deadline)

	// A single Read may legally return fewer bytes than the prefix we need,
	// so keep reading until enough of the greeting has arrived.
	buf := make([]byte, 64)
	n := 0
	for n < len(want) {
		m, rerr := conn.Read(buf[n:])
		n += m
		if rerr != nil {
			if n < len(want) {
				return fmt.Errorf("read greeting: %w", rerr)
			}
			break
		}
	}

	greeting := string(buf[:n])
	if !strings.HasPrefix(greeting, want) {
		return fmt.Errorf("unexpected greeting: %s", strings.TrimSpace(greeting))
	}
	return nil
}
|
||||
|
||||
func buildDovecot(sup *proc.Supervisor) *commands.Table {
|
||||
t := commands.New("dovecot")
|
||||
t.HealthProbe = dovecotHealthProbe
|
||||
|
||||
// `dovecot reload` re-reads config without restarting the master process.
|
||||
t.Register("reload", func(ctx context.Context, _ map[string]any) (any, error) {
|
||||
r, err := commands.Run(ctx, commands.RunOptions{}, "dovecot", "reload")
|
||||
return nil, asError(r, err)
|
||||
})
|
||||
addLifecycleExceptReload(t, sup)
|
||||
|
||||
t.Register("exec.fts-rescan", func(ctx context.Context, args map[string]any) (any, error) {
|
||||
user := commands.ArgStringOpt(args, "user", "")
|
||||
argv := []string{"doveadm", "fts", "rescan"}
|
||||
if user != "" {
|
||||
argv = append(argv, "-u", user)
|
||||
} else {
|
||||
argv = append(argv, "-A")
|
||||
}
|
||||
r, err := commands.Run(ctx, commands.RunOptions{}, argv...)
|
||||
return nil, asError(r, err)
|
||||
})
|
||||
|
||||
t.Register("exec.sieve-list", func(ctx context.Context, args map[string]any) (any, error) {
|
||||
user, err := commands.ArgString(args, "user")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r, err := commands.Run(ctx, commands.RunOptions{}, "doveadm", "sieve", "list", "-u", user)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if r.ExitCode != 0 {
|
||||
return nil, &runError{msg: strings.TrimSpace(r.Stderr)}
|
||||
}
|
||||
scripts := splitNonEmpty(r.Stdout)
|
||||
return map[string]any{"scripts": scripts}, nil
|
||||
})
|
||||
|
||||
t.Register("exec.sieve-print", func(ctx context.Context, args map[string]any) (any, error) {
|
||||
user, err := commands.ArgString(args, "user")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
script, err := commands.ArgString(args, "script")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r, err := commands.Run(ctx, commands.RunOptions{OutputCap: 1 << 20}, "doveadm", "sieve", "get", "-u", user, script)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if r.ExitCode != 0 {
|
||||
return nil, &runError{msg: strings.TrimSpace(r.Stderr)}
|
||||
}
|
||||
return map[string]any{"body": r.Stdout}, nil
|
||||
})
|
||||
|
||||
t.Register("exec.acl-get", func(ctx context.Context, args map[string]any) (any, error) {
|
||||
user, err := commands.ArgString(args, "user")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// First enumerate mailboxes, then collect ACLs per mailbox.
|
||||
boxes, err := commands.Run(ctx, commands.RunOptions{}, "doveadm", "mailbox", "list", "-u", user)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if boxes.ExitCode != 0 {
|
||||
return nil, &runError{msg: strings.TrimSpace(boxes.Stderr)}
|
||||
}
|
||||
out := []map[string]any{}
|
||||
for _, mbx := range splitNonEmpty(boxes.Stdout) {
|
||||
r, err := commands.Run(ctx, commands.RunOptions{}, "doveadm", "acl", "get", "-u", user, mbx)
|
||||
if err != nil || r.ExitCode != 0 {
|
||||
continue
|
||||
}
|
||||
for _, line := range strings.Split(strings.TrimSpace(r.Stdout), "\n") {
|
||||
line = strings.TrimSpace(line)
|
||||
if line == "" || strings.HasPrefix(line, "ID") {
|
||||
continue
|
||||
}
|
||||
fields := strings.Fields(line)
|
||||
if len(fields) >= 2 {
|
||||
out = append(out, map[string]any{
|
||||
"mailbox": mbx,
|
||||
"identifier": fields[0],
|
||||
"rights": strings.Join(fields[1:], " "),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
return map[string]any{"acls": out}, nil
|
||||
})
|
||||
|
||||
t.Register("exec.acl-set", func(ctx context.Context, args map[string]any) (any, error) {
|
||||
user, err := commands.ArgString(args, "user")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mailbox, err := commands.ArgString(args, "mailbox")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
identifier, err := commands.ArgString(args, "identifier")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rights, err := commands.ArgString(args, "rights")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r, err := commands.Run(ctx, commands.RunOptions{}, "doveadm", "acl", "set", "-u", user, mailbox, identifier, rights)
|
||||
return nil, asError(r, err)
|
||||
})
|
||||
|
||||
t.Register("exec.acl-delete", func(ctx context.Context, args map[string]any) (any, error) {
|
||||
user, err := commands.ArgString(args, "user")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mailbox, err := commands.ArgString(args, "mailbox")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
identifier, err := commands.ArgString(args, "identifier")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r, err := commands.Run(ctx, commands.RunOptions{}, "doveadm", "acl", "delete", "-u", user, mailbox, identifier)
|
||||
return nil, asError(r, err)
|
||||
})
|
||||
|
||||
t.Register("exec.maildir-cleanup", func(ctx context.Context, args map[string]any) (any, error) {
|
||||
maildir, err := commands.ArgString(args, "maildir")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := assertSafeMaildirPath(maildir); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
src := filepath.Join(vmailRoot, maildir)
|
||||
dst := filepath.Join(vmailRoot, "_garbage", maildir+"_"+nowStamp())
|
||||
if _, err := os.Stat(src); os.IsNotExist(err) {
|
||||
return nil, commands.ErrNotFound
|
||||
}
|
||||
if err := os.MkdirAll(filepath.Dir(dst), 0o770); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, os.Rename(src, dst)
|
||||
})
|
||||
|
||||
t.Register("exec.df", func(ctx context.Context, args map[string]any) (any, error) {
|
||||
dir := commands.ArgStringOpt(args, "dir", "/var/vmail")
|
||||
var st syscall.Statfs_t
|
||||
if err := syscall.Statfs(dir, &st); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
size := uint64(st.Blocks) * uint64(st.Bsize)
|
||||
free := uint64(st.Bavail) * uint64(st.Bsize)
|
||||
used := size - free
|
||||
pct := 0
|
||||
if size > 0 {
|
||||
pct = int(float64(used) / float64(size) * 100)
|
||||
}
|
||||
// Format: Filesystem,Size,Used,Avail,Use%,Mounted-on
|
||||
return fmt.Sprintf("%s,%s,%s,%s,%d%%,%s",
|
||||
"local", humanBytes(size), humanBytes(used), humanBytes(free), pct, dir), nil
|
||||
})
|
||||
|
||||
t.Register("exec.maildir-move", func(ctx context.Context, args map[string]any) (any, error) {
|
||||
from, err := commands.ArgString(args, "from")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
to, err := commands.ArgString(args, "to")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := assertSafeMaildirPath(from); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := assertSafeMaildirPath(to); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
src := filepath.Join(vmailRoot, from)
|
||||
dst := filepath.Join(vmailRoot, to)
|
||||
if _, err := os.Stat(src); os.IsNotExist(err) {
|
||||
return nil, commands.ErrNotFound
|
||||
}
|
||||
if err := os.MkdirAll(filepath.Dir(dst), 0o770); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, os.Rename(src, dst)
|
||||
})
|
||||
|
||||
return t
|
||||
}
|
||||
|
||||
// addLifecycleExceptReload wires restart/stop/start without overriding reload,
|
||||
// which postfix/dovecot define themselves (canonical CLI command).
|
||||
func addLifecycleExceptReload(t *commands.Table, sup *proc.Supervisor) {
|
||||
if sup == nil {
|
||||
return
|
||||
}
|
||||
t.Register("restart", func(ctx context.Context, _ map[string]any) (any, error) {
|
||||
return nil, sup.Restart(ctx)
|
||||
})
|
||||
t.Register("stop", func(ctx context.Context, _ map[string]any) (any, error) {
|
||||
return nil, sup.Stop(ctx)
|
||||
})
|
||||
t.Register("start", func(ctx context.Context, _ map[string]any) (any, error) {
|
||||
return nil, sup.Start()
|
||||
})
|
||||
}
|
||||
|
||||
// splitNonEmpty breaks s into lines, trims surrounding whitespace from each
// and drops the ones that end up empty. The result is never nil.
func splitNonEmpty(s string) []string {
	lines := []string{}
	rest := s
	for rest != "" {
		var raw string
		raw, rest, _ = strings.Cut(rest, "\n")
		if trimmed := strings.TrimSpace(raw); trimmed != "" {
			lines = append(lines, trimmed)
		}
	}
	return lines
}
|
||||
|
||||
// assertSafeMaildirPath blocks path traversal and absolute paths — relative
|
||||
// names under /var/vmail only.
|
||||
func assertSafeMaildirPath(p string) error {
|
||||
if p == "" || strings.HasPrefix(p, "/") || strings.Contains(p, "..") {
|
||||
return &validationErr{msg: "unsafe maildir path"}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type validationErr struct{ msg string }
|
||||
|
||||
func (e *validationErr) Error() string { return e.msg }
|
||||
func (e *validationErr) Is(target error) bool { return target == commands.ErrValidation }
|
||||
|
||||
// humanBytes formats a byte count with 1000-based units in `df -H` style:
// plain "NB" below 1000, otherwise one decimal place followed by K, M, G, T,
// P or E.
func humanBytes(n uint64) string {
	const base = 1000
	if n < base {
		return fmt.Sprintf("%dB", n)
	}

	const suffixes = "KMGTPE"
	scale, idx := uint64(base), 0
	// Grow the divisor until the integer quotient drops below one unit.
	for n/scale >= base {
		scale *= base
		idx++
	}
	return fmt.Sprintf("%.1f%c", float64(n)/float64(scale), suffixes[idx])
}
|
||||
@@ -0,0 +1,37 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/mailcow/mailcow-dockerized/agent/internal/commands"
|
||||
"github.com/mailcow/mailcow-dockerized/agent/internal/proc"
|
||||
)
|
||||
|
||||
// Services without any exec.* commands of their own — lifecycle only.
|
||||
func init() {
|
||||
Register("clamd", genericBuilder("clamd", tcpProbe("127.0.0.1:3310", 2*time.Second)))
|
||||
Register("olefy", genericBuilder("olefy", tcpProbe("127.0.0.1:10055", 2*time.Second)))
|
||||
Register("postfix-tlspol", genericBuilder("postfix-tlspol", tcpProbe("127.0.0.1:8642", 2*time.Second)))
|
||||
Register("php-fpm", genericBuilder("php-fpm", tcpProbe("127.0.0.1:9001", 2*time.Second)))
|
||||
Register("acme", genericBuilder("acme", nil))
|
||||
Register("watchdog", genericBuilder("watchdog", nil))
|
||||
Register("netfilter", genericBuilder("netfilter", nil))
|
||||
Register("ofelia", genericBuilder("ofelia", nil))
|
||||
Register("dovecot-fts", genericBuilder("dovecot-fts", nil))
|
||||
}
|
||||
|
||||
func genericBuilder(name string, probe commands.HealthProbe) Builder {
|
||||
return func(sup *proc.Supervisor) *commands.Table {
|
||||
t := commands.New(name)
|
||||
t.HealthProbe = probe
|
||||
addLifecycle(t, sup)
|
||||
return t
|
||||
}
|
||||
}
|
||||
|
||||
func tcpProbe(addr string, timeout time.Duration) commands.HealthProbe {
|
||||
return func(ctx context.Context) error {
|
||||
return probeTCP(addr, timeout)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,79 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/mailcow/mailcow-dockerized/agent/internal/commands"
|
||||
)
|
||||
|
||||
// runError reports a command that exited non-zero for a reason other than
// "target not found". The bus maps it to ErrCodeInternal.
type runError struct {
	msg string
}

// Error returns the stored message.
func (e *runError) Error() string {
	return e.msg
}
|
||||
|
||||
// asError converts a (RunResult, err) pair from commands.Run into a single
|
||||
// error: pre-exec error → return as-is; non-zero exit → wrap stderr.
|
||||
func asError(r *commands.RunResult, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if r.ExitCode != 0 {
|
||||
msg := strings.TrimSpace(r.Stderr)
|
||||
if msg == "" {
|
||||
msg = "command exited " + itoa(r.ExitCode)
|
||||
}
|
||||
return &runError{msg: msg}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// asNotFoundOrError is the variant for queue/mailbox operations that may fail
|
||||
// because the target doesn't live on this node. Maps known stderr fragments
|
||||
// to commands.ErrNotFound so broadcast aggregation works.
|
||||
func asNotFoundOrError(r *commands.RunResult, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if r.ExitCode == 0 {
|
||||
return nil
|
||||
}
|
||||
if matchesAny(r.Stderr, notFoundFragments) {
|
||||
return commands.ErrNotFound
|
||||
}
|
||||
return &runError{msg: strings.TrimSpace(r.Stderr)}
|
||||
}
|
||||
|
||||
// matchesAny reports whether haystack contains at least one of fragments as
// a substring. The comparison is case-sensitive.
func matchesAny(haystack string, fragments []string) bool {
	for i := 0; i < len(fragments); i++ {
		if strings.Index(haystack, fragments[i]) >= 0 {
			return true
		}
	}
	return false
}
|
||||
|
||||
// itoa renders i in base 10 without pulling in strconv. The digits are
// produced from the unsigned magnitude, so the most negative int (whose
// negation does not fit in an int) is formatted correctly too.
func itoa(i int) string {
	if i == 0 {
		return "0"
	}
	neg := i < 0
	mag := uint64(i)
	if neg {
		// Two's-complement negation in uint64 yields |i| even for the
		// minimum int.
		mag = -mag
	}
	var b [20]byte // 19 digits plus sign is the longest possible output
	n := len(b)
	for mag > 0 {
		n--
		b[n] = byte('0' + mag%10)
		mag /= 10
	}
	if neg {
		n--
		b[n] = '-'
	}
	return string(b[n:])
}
|
||||
@@ -0,0 +1,236 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/mailcow/mailcow-dockerized/agent/internal/commands"
|
||||
"github.com/mailcow/mailcow-dockerized/agent/internal/proc"
|
||||
)
|
||||
|
||||
// init registers the host command-table builder under its service name.
func init() {
	Register("host", buildHost)
}
|
||||
|
||||
// hostProcRoot is the directory where the host-agent container has the
// host's /proc mounted. When it does not exist, lookups fall back to the
// local /proc (the container's own view) so callers still get numbers.
var hostProcRoot = "/host/proc"

// resolveProc maps a path below /proc (p starts with "/") to hostProcRoot
// when that directory can be stat'ed, and to the local /proc otherwise.
func resolveProc(p string) string {
	root := "/proc"
	if _, err := os.Stat(hostProcRoot); err == nil {
		root = hostProcRoot
	}
	return root + p
}
|
||||
|
||||
func buildHost(_ *proc.Supervisor) *commands.Table {
|
||||
t := commands.New("host")
|
||||
// No lifecycle — the host-agent container has no main process to manage.
|
||||
|
||||
t.Register("exec.df", func(ctx context.Context, args map[string]any) (any, error) {
|
||||
path := commands.ArgStringOpt(args, "path", "/")
|
||||
var stat syscall.Statfs_t
|
||||
if err := syscall.Statfs(path, &stat); err != nil {
|
||||
return nil, fmt.Errorf("statfs %s: %w", path, err)
|
||||
}
|
||||
size := int64(stat.Blocks) * int64(stat.Bsize)
|
||||
free := int64(stat.Bavail) * int64(stat.Bsize)
|
||||
used := size - free
|
||||
return map[string]any{
|
||||
"path": path,
|
||||
"size": size,
|
||||
"used": used,
|
||||
"available": free,
|
||||
}, nil
|
||||
})
|
||||
|
||||
t.Register("exec.host-stats", func(ctx context.Context, _ map[string]any) (any, error) {
|
||||
return readHostStats()
|
||||
})
|
||||
|
||||
return t
|
||||
}
|
||||
|
||||
func readHostStats() (map[string]any, error) {
|
||||
out := map[string]any{
|
||||
"system_time": time.Now().Format("2006-01-02 15:04:05"),
|
||||
"architecture": readArchitecture(),
|
||||
}
|
||||
|
||||
if uptime, err := readUptime(); err == nil {
|
||||
out["uptime"] = int64(uptime)
|
||||
} else {
|
||||
out["uptime"] = int64(0)
|
||||
}
|
||||
|
||||
cores := readCPUCores()
|
||||
cpuUsage, _ := sampleHostCPU(500 * time.Millisecond)
|
||||
out["cpu"] = map[string]any{
|
||||
"cores": cores,
|
||||
"usage": cpuUsage,
|
||||
}
|
||||
|
||||
memTotal, memUsage := readMemoryTotalAndUsagePct()
|
||||
out["memory"] = map[string]any{
|
||||
"total": memTotal, // bytes
|
||||
"usage": memUsage, // percent 0..100
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// readArchitecture returns the host's machine architecture (e.g. "x86_64",
|
||||
// "aarch64"). Falls back to a single dash if syscall.Uname fails.
|
||||
func readArchitecture() string {
|
||||
var u syscall.Utsname
|
||||
if err := syscall.Uname(&u); err != nil {
|
||||
return "-"
|
||||
}
|
||||
return charsToString(u.Machine[:])
|
||||
}
|
||||
|
||||
// charsToString converts a NUL-terminated int8 buffer into a Go string,
// stopping at the first zero byte or at the end of the slice.
func charsToString(b []int8) string {
	end := len(b)
	for i, c := range b {
		if c == 0 {
			end = i
			break
		}
	}
	buf := make([]byte, end)
	for i := range buf {
		buf[i] = byte(b[i])
	}
	return string(buf)
}
|
||||
|
||||
// readCPUCores counts `^processor` lines in /proc/cpuinfo. On a container
|
||||
// with /host/proc bind-mounted this gives the host's logical CPU count,
|
||||
// not the container's cgroup limits.
|
||||
func readCPUCores() int {
|
||||
f, err := os.Open(resolveProc("/cpuinfo"))
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
defer f.Close()
|
||||
n := 0
|
||||
sc := bufio.NewScanner(f)
|
||||
for sc.Scan() {
|
||||
if strings.HasPrefix(sc.Text(), "processor") {
|
||||
n++
|
||||
}
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// readMemoryTotalAndUsagePct reads /proc/meminfo and returns (total_bytes,
|
||||
// usage_pct_0_100). "Usage" is computed as (Total - Available)/Total which
|
||||
// matches what tools like `free` show as "used".
|
||||
func readMemoryTotalAndUsagePct() (int64, int) {
|
||||
f, err := os.Open(resolveProc("/meminfo"))
|
||||
if err != nil {
|
||||
return 0, 0
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
var total, available int64
|
||||
sc := bufio.NewScanner(f)
|
||||
for sc.Scan() {
|
||||
fields := strings.Fields(sc.Text())
|
||||
if len(fields) < 2 {
|
||||
continue
|
||||
}
|
||||
switch fields[0] {
|
||||
case "MemTotal:":
|
||||
total = parseInt64(fields[1]) * 1024
|
||||
case "MemAvailable:":
|
||||
available = parseInt64(fields[1]) * 1024
|
||||
}
|
||||
}
|
||||
if total <= 0 {
|
||||
return 0, 0
|
||||
}
|
||||
used := total - available
|
||||
if available <= 0 {
|
||||
used = total
|
||||
}
|
||||
pct := int(float64(used) / float64(total) * 100.0)
|
||||
if pct < 0 {
|
||||
pct = 0
|
||||
}
|
||||
if pct > 100 {
|
||||
pct = 100
|
||||
}
|
||||
return total, pct
|
||||
}
|
||||
|
||||
func readUptime() (float64, error) {
|
||||
b, err := os.ReadFile(resolveProc("/uptime"))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
fields := strings.Fields(string(b))
|
||||
if len(fields) < 1 {
|
||||
return 0, fmt.Errorf("malformed uptime")
|
||||
}
|
||||
return strconv.ParseFloat(fields[0], 64)
|
||||
}
|
||||
|
||||
// sampleHostCPU returns CPU utilization (0..100) sampled over `window`.
|
||||
func sampleHostCPU(window time.Duration) (float64, error) {
|
||||
a, err := readCPULine()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
time.Sleep(window)
|
||||
b, err := readCPULine()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
totalA, totalB := sum(a), sum(b)
|
||||
idleA, idleB := a[3], b[3]
|
||||
dTotal, dIdle := totalB-totalA, idleB-idleA
|
||||
if dTotal == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
return 100.0 * float64(dTotal-dIdle) / float64(dTotal), nil
|
||||
}
|
||||
|
||||
func readCPULine() ([]int64, error) {
|
||||
f, err := os.Open(resolveProc("/stat"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer f.Close()
|
||||
sc := bufio.NewScanner(f)
|
||||
if !sc.Scan() {
|
||||
return nil, fmt.Errorf("empty /proc/stat")
|
||||
}
|
||||
fields := strings.Fields(sc.Text())
|
||||
if len(fields) < 5 || fields[0] != "cpu" {
|
||||
return nil, fmt.Errorf("unexpected /proc/stat first line")
|
||||
}
|
||||
out := make([]int64, 0, len(fields)-1)
|
||||
for _, f := range fields[1:] {
|
||||
n, err := strconv.ParseInt(f, 10, 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, n)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// sum adds up all elements of xs; an empty or nil slice sums to zero.
func sum(xs []int64) int64 {
	total := int64(0)
	for i := 0; i < len(xs); i++ {
		total += xs[i]
	}
	return total
}
|
||||
|
||||
// parseInt64 parses s as a base-10 int64 and discards the parse error, so it
// returns whatever value strconv.ParseInt produced alongside that error
// (0 for malformed input).
func parseInt64(s string) (n int64) {
	n, _ = strconv.ParseInt(s, 10, 64)
	return n
}
|
||||
@@ -0,0 +1,39 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/mailcow/mailcow-dockerized/agent/internal/commands"
|
||||
"github.com/mailcow/mailcow-dockerized/agent/internal/proc"
|
||||
)
|
||||
|
||||
// init registers the nginx command-table builder under its service name.
func init() {
	Register("nginx", buildNginx)
}
|
||||
|
||||
func nginxHealthProbe(ctx context.Context) error {
|
||||
if err := probeShell(ctx, 3*time.Second, "nginx", "-t"); err != nil {
|
||||
return err
|
||||
}
|
||||
return probeTCP("127.0.0.1:8081", 2*time.Second)
|
||||
}
|
||||
|
||||
func buildNginx(sup *proc.Supervisor) *commands.Table {
|
||||
t := commands.New("nginx")
|
||||
t.HealthProbe = nginxHealthProbe
|
||||
t.Register("reload", func(ctx context.Context, _ map[string]any) (any, error) {
|
||||
r, err := commands.Run(ctx, commands.RunOptions{}, "nginx", "-s", "reload")
|
||||
return nil, asError(r, err)
|
||||
})
|
||||
addLifecycleExceptReload(t, sup)
|
||||
t.Register("exec.test-config", func(ctx context.Context, _ map[string]any) (any, error) {
|
||||
r, err := commands.Run(ctx, commands.RunOptions{}, "nginx", "-t")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return map[string]any{
|
||||
"ok": r.ExitCode == 0,
|
||||
"output": r.Stderr + r.Stdout,
|
||||
}, nil
|
||||
})
|
||||
return t
|
||||
}
|
||||
@@ -0,0 +1,142 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/mailcow/mailcow-dockerized/agent/internal/commands"
|
||||
"github.com/mailcow/mailcow-dockerized/agent/internal/proc"
|
||||
)
|
||||
|
||||
// init registers the postfix command-table builder under its service name.
func init() {
	Register("postfix", buildPostfix)
}
|
||||
|
||||
// notFoundFragments lists stderr substrings that postsuper/postqueue emit
// when the requested queue id does not live on this node. Handlers for
// broadcast commands turn a match into commands.ErrNotFound so the backend
// can count partial success. Matching is a case-sensitive substring test.
var notFoundFragments = []string{
	"No such file or directory",
	"no such file",
	"unknown",
}
|
||||
|
||||
func postfixHealthProbe(ctx context.Context) error {
|
||||
if err := probeSMTPGreeting("127.0.0.1:25", 3*time.Second); err != nil {
|
||||
return err
|
||||
}
|
||||
return probeShell(ctx, 5*time.Second, "postfix", "status")
|
||||
}
|
||||
|
||||
func buildPostfix(sup *proc.Supervisor) *commands.Table {
|
||||
t := commands.New("postfix")
|
||||
t.HealthProbe = postfixHealthProbe
|
||||
|
||||
// Override generic reload — `postfix reload` is the canonical operation,
|
||||
// not SIGHUP-to-supervisord (which would just rotate logs).
|
||||
t.Register("reload", func(ctx context.Context, _ map[string]any) (any, error) {
|
||||
r, err := commands.Run(ctx, commands.RunOptions{}, "postfix", "reload")
|
||||
return nil, asError(r, err)
|
||||
})
|
||||
// Lifecycle: stop/start/restart still go through the supervisor.
|
||||
if sup != nil {
|
||||
t.Register("restart", func(ctx context.Context, _ map[string]any) (any, error) {
|
||||
return nil, sup.Restart(ctx)
|
||||
})
|
||||
t.Register("stop", func(ctx context.Context, _ map[string]any) (any, error) {
|
||||
return nil, sup.Stop(ctx)
|
||||
})
|
||||
t.Register("start", func(ctx context.Context, _ map[string]any) (any, error) {
|
||||
return nil, sup.Start()
|
||||
})
|
||||
}
|
||||
|
||||
t.Register("exec.mailq", func(ctx context.Context, _ map[string]any) (any, error) {
|
||||
r, err := commands.Run(ctx, commands.RunOptions{OutputCap: 8 << 20}, "postqueue", "-j")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if r.ExitCode != 0 {
|
||||
return nil, &runError{msg: "postqueue failed: " + r.Stderr}
|
||||
}
|
||||
// postqueue -j prints one JSON object per line.
|
||||
entries := make([]map[string]any, 0)
|
||||
for _, line := range strings.Split(strings.TrimSpace(r.Stdout), "\n") {
|
||||
line = strings.TrimSpace(line)
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
var obj map[string]any
|
||||
if err := json.Unmarshal([]byte(line), &obj); err == nil {
|
||||
entries = append(entries, obj)
|
||||
}
|
||||
}
|
||||
return map[string]any{"queue": entries}, nil
|
||||
})
|
||||
|
||||
t.Register("exec.flush-queue", func(ctx context.Context, _ map[string]any) (any, error) {
|
||||
r, err := commands.Run(ctx, commands.RunOptions{}, "postqueue", "-f")
|
||||
return nil, asError(r, err)
|
||||
})
|
||||
|
||||
t.Register("exec.delete-from-queue", func(ctx context.Context, args map[string]any) (any, error) {
|
||||
qid, err := commands.ArgString(args, "queue_id")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r, err := commands.Run(ctx, commands.RunOptions{}, "postsuper", "-d", qid)
|
||||
return nil, asNotFoundOrError(r, err)
|
||||
})
|
||||
|
||||
t.Register("exec.hold-queue", func(ctx context.Context, args map[string]any) (any, error) {
|
||||
qid, err := commands.ArgString(args, "queue_id")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r, err := commands.Run(ctx, commands.RunOptions{}, "postsuper", "-h", qid)
|
||||
return nil, asNotFoundOrError(r, err)
|
||||
})
|
||||
|
||||
t.Register("exec.unhold-queue", func(ctx context.Context, args map[string]any) (any, error) {
|
||||
qid, err := commands.ArgString(args, "queue_id")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r, err := commands.Run(ctx, commands.RunOptions{}, "postsuper", "-H", qid)
|
||||
return nil, asNotFoundOrError(r, err)
|
||||
})
|
||||
|
||||
t.Register("exec.deliver-now", func(ctx context.Context, args map[string]any) (any, error) {
|
||||
qid, err := commands.ArgString(args, "queue_id")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r, err := commands.Run(ctx, commands.RunOptions{}, "postqueue", "-i", qid)
|
||||
return nil, asNotFoundOrError(r, err)
|
||||
})
|
||||
|
||||
t.Register("exec.cat-queue", func(ctx context.Context, args map[string]any) (any, error) {
|
||||
qid, err := commands.ArgString(args, "queue_id")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r, err := commands.Run(ctx, commands.RunOptions{OutputCap: 2 << 20}, "postcat", "-q", qid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if r.ExitCode != 0 {
|
||||
if matchesAny(r.Stderr, notFoundFragments) {
|
||||
return nil, commands.ErrNotFound
|
||||
}
|
||||
return nil, &runError{msg: "postcat failed: " + r.Stderr}
|
||||
}
|
||||
return map[string]any{"body": r.Stdout}, nil
|
||||
})
|
||||
|
||||
t.Register("exec.super-delete", func(ctx context.Context, _ map[string]any) (any, error) {
|
||||
r, err := commands.Run(ctx, commands.RunOptions{}, "postsuper", "-d", "ALL")
|
||||
return nil, asError(r, err)
|
||||
})
|
||||
|
||||
return t
|
||||
}
|
||||
@@ -0,0 +1,81 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/mailcow/mailcow-dockerized/agent/internal/commands"
|
||||
)
|
||||
|
||||
// probeTCP reports whether addr accepts a TCP connection within timeout. A
// successful connection is closed right away and nil is returned; otherwise
// the dial error is handed back unchanged.
func probeTCP(addr string, timeout time.Duration) error {
	dialer := net.Dialer{Timeout: timeout}
	c, dialErr := dialer.Dial("tcp", addr)
	if dialErr == nil {
		_ = c.Close() // a close error on a throwaway probe socket is not actionable
		return nil
	}
	return dialErr
}
|
||||
|
||||
// probeSMTPGreeting connects to addr and reads the first line of the SMTP
// greeting. The service is considered healthy when that line carries reply
// code 220, i.e. it is exactly "220" or starts with "220 " / "220-".
//
// timeout bounds the whole probe (dial plus read), not each step separately.
// After a valid greeting a best-effort QUIT is sent so the server sees a
// clean session end instead of a dropped connection.
//
// Returns the dial error, a wrapped read error, or an "unexpected greeting"
// error quoting the line that was received.
func probeSMTPGreeting(addr string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	conn, err := net.DialTimeout("tcp", addr, timeout)
	if err != nil {
		return err
	}
	defer conn.Close()
	// One deadline for read and write; whatever the dial consumed is
	// subtracted from the budget left for the greeting.
	_ = conn.SetDeadline(deadline)

	line, err := bufio.NewReader(conn).ReadString('\n')
	if err != nil {
		return fmt.Errorf("read greeting: %w", err)
	}
	line = strings.TrimSpace(line)
	// A reply code is exactly three digits followed by a space, a hyphen
	// (multi-line reply) or nothing; "2200 ..." must not pass as 220.
	codeOK := strings.HasPrefix(line, "220") &&
		(len(line) == 3 || line[3] == ' ' || line[3] == '-')
	if !codeOK {
		return fmt.Errorf("unexpected greeting: %s", line)
	}
	// Best effort: the probe result does not depend on QUIT being delivered.
	_, _ = conn.Write([]byte("QUIT\r\n"))
	return nil
}
|
||||
|
||||
// probeHTTP performs a GET on url with a deadline of timeout (derived from
// ctx) using http.DefaultClient. It returns nil for any 2xx status, the
// transport error if the request fails, and an "http <status>" error for
// every other status.
func probeHTTP(ctx context.Context, url string, timeout time.Duration) error {
	reqCtx, stop := context.WithTimeout(ctx, timeout)
	defer stop()

	request, err := http.NewRequestWithContext(reqCtx, http.MethodGet, url, nil)
	if err != nil {
		return err
	}

	response, err := http.DefaultClient.Do(request)
	if err != nil {
		return err
	}
	defer response.Body.Close()

	if code := response.StatusCode; code >= 200 && code < 300 {
		return nil
	}
	return fmt.Errorf("http %s", response.Status)
}
|
||||
|
||||
// probeShell runs argv with a timeout and returns nil if exit code is 0.
|
||||
func probeShell(ctx context.Context, timeout time.Duration, argv ...string) error {
|
||||
cctx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
r, err := commands.Run(cctx, commands.RunOptions{}, argv...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if r.ExitCode != 0 {
|
||||
msg := strings.TrimSpace(r.Stderr)
|
||||
if msg == "" {
|
||||
msg = fmt.Sprintf("exit %d", r.ExitCode)
|
||||
}
|
||||
return errors.New(msg)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,86 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/mailcow/mailcow-dockerized/agent/internal/commands"
|
||||
"github.com/mailcow/mailcow-dockerized/agent/internal/proc"
|
||||
)
|
||||
|
||||
// init registers the rspamd command-table builder under the service name
// "rspamd".
func init() {
	Register("rspamd", buildRspamd)
}
|
||||
|
||||
func rspamdHealthProbe(ctx context.Context) error {
|
||||
return probeHTTP(ctx, "http://127.0.0.1:11334/ping", 3*time.Second)
|
||||
}
|
||||
|
||||
// rspamdWorkerPasswordPath is the override file rspamd reads on startup for
// the controller's enable_password; exec.set-worker-password rewrites it.
const (
	rspamdWorkerPasswordPath = "/etc/rspamd/override.d/worker-controller-password.inc"
)
|
||||
|
||||
func buildRspamd(sup *proc.Supervisor) *commands.Table {
|
||||
t := commands.New("rspamd")
|
||||
t.HealthProbe = rspamdHealthProbe
|
||||
addLifecycle(t, sup)
|
||||
|
||||
t.Register("exec.set-worker-password", func(ctx context.Context, args map[string]any) (any, error) {
|
||||
password, err := commands.ArgString(args, "password")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// rspamadm pw -e -p <pw> writes the hashed value to stdout.
|
||||
r, err := commands.Run(ctx, commands.RunOptions{}, "rspamadm", "pw", "-e", "-p", password)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if r.ExitCode != 0 {
|
||||
return nil, &runError{msg: "rspamadm pw failed: " + strings.TrimSpace(r.Stderr)}
|
||||
}
|
||||
hash := strings.TrimSpace(r.Stdout)
|
||||
// rspamd distinguishes `password` (read-only access to the controller)
|
||||
// from `enable_password` (write access — restart, settings, learn).
|
||||
content := "enable_password = \"" + hash + "\";\n"
|
||||
if err := os.MkdirAll(filepath.Dir(rspamdWorkerPasswordPath), 0o755); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := os.WriteFile(rspamdWorkerPasswordPath, []byte(content), 0o644); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Must do a full re-fork of workers (SIGHUP to rspamd master), not
|
||||
// `rspamadm control reload`
|
||||
if sup != nil {
|
||||
return nil, sup.Reload()
|
||||
}
|
||||
return nil, nil
|
||||
})
|
||||
|
||||
t.Register("exec.relearn-spam", func(ctx context.Context, args map[string]any) (any, error) {
|
||||
path, err := commands.ArgString(args, "file")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r, err := commands.Run(ctx, commands.RunOptions{Stdin: data}, "rspamc", "learn_spam")
|
||||
return nil, asError(r, err)
|
||||
})
|
||||
|
||||
t.Register("exec.relearn-ham", func(ctx context.Context, args map[string]any) (any, error) {
|
||||
path, err := commands.ArgString(args, "file")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r, err := commands.Run(ctx, commands.RunOptions{Stdin: data}, "rspamc", "learn_ham")
|
||||
return nil, asError(r, err)
|
||||
})
|
||||
|
||||
return t
|
||||
}
|
||||
@@ -0,0 +1,87 @@
|
||||
// Package services registers per-service command tables. The agent selects
|
||||
// the right table at startup via MAILCOW_AGENT_SERVICE.
|
||||
//
|
||||
// A service "builder" receives a Supervisor for lifecycle commands; services
|
||||
// that don't supervise a main process (currently just "host") pass nil and
|
||||
// the generic lifecycle commands are skipped.
|
||||
package services
|
||||
|
||||
import (
	"context"
	"fmt"
	"sort"
	"time"

	"github.com/mailcow/mailcow-dockerized/agent/internal/commands"
	"github.com/mailcow/mailcow-dockerized/agent/internal/proc"
)
|
||||
|
||||
// Builder constructs a command table for a service. sup may be nil for
|
||||
// services without a supervised main process.
|
||||
type Builder func(sup *proc.Supervisor) *commands.Table
|
||||
|
||||
var registry = map[string]Builder{}
|
||||
|
||||
// Register installs a builder for a service name. Called from init() in each
|
||||
// per-service file.
|
||||
func Register(service string, b Builder) {
|
||||
if _, dup := registry[service]; dup {
|
||||
panic("services: duplicate registration for " + service)
|
||||
}
|
||||
registry[service] = b
|
||||
}
|
||||
|
||||
// Build returns the table for service, or an error if no builder exists.
|
||||
func Build(service string, sup *proc.Supervisor) (*commands.Table, error) {
|
||||
b, ok := registry[service]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("services: unknown service %q (set MAILCOW_AGENT_SERVICE correctly)", service)
|
||||
}
|
||||
return b(sup), nil
|
||||
}
|
||||
|
||||
// Known returns the list of registered service names (sorted-ish, depends on
|
||||
// map iteration — for help output only).
|
||||
func Known() []string {
|
||||
out := make([]string, 0, len(registry))
|
||||
for k := range registry {
|
||||
out = append(out, k)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// restartSettle is the window we wait after a Start to check that the new
// child did not crash straight away. It gives the operator a real answer to
// "did the service come back up?" rather than an instant OK that would hide a
// flapping service.
const restartSettle time.Duration = 3 * time.Second
|
||||
|
||||
// addLifecycle wires reload/restart/stop/start onto t backed by sup. Services
|
||||
// override these (e.g. postfix overrides reload to run `postfix reload`).
|
||||
func addLifecycle(t *commands.Table, sup *proc.Supervisor) {
|
||||
if sup == nil {
|
||||
return
|
||||
}
|
||||
t.Register("reload", func(ctx context.Context, _ map[string]any) (any, error) {
|
||||
return nil, sup.Reload()
|
||||
})
|
||||
t.Register("restart", func(ctx context.Context, _ map[string]any) (any, error) {
|
||||
if err := sup.Restart(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := sup.WaitStable(ctx, restartSettle); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return map[string]any{"status": "restarted", "settled_ms": int(restartSettle / time.Millisecond)}, nil
|
||||
})
|
||||
t.Register("stop", func(ctx context.Context, _ map[string]any) (any, error) {
|
||||
return nil, sup.Stop(ctx)
|
||||
})
|
||||
t.Register("start", func(ctx context.Context, _ map[string]any) (any, error) {
|
||||
if err := sup.Start(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := sup.WaitStable(ctx, restartSettle); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return map[string]any{"status": "started", "settled_ms": int(restartSettle / time.Millisecond)}, nil
|
||||
})
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/mailcow/mailcow-dockerized/agent/internal/commands"
|
||||
"github.com/mailcow/mailcow-dockerized/agent/internal/proc"
|
||||
)
|
||||
|
||||
// init registers the sogo command-table builder under the service name
// "sogo".
func init() {
	Register("sogo", buildSogo)
}
|
||||
|
||||
func sogoHealthProbe(ctx context.Context) error {
|
||||
return probeHTTP(ctx, "http://127.0.0.1:20000/SOGo.index/", 3*time.Second)
|
||||
}
|
||||
|
||||
func buildSogo(sup *proc.Supervisor) *commands.Table {
|
||||
t := commands.New("sogo")
|
||||
t.HealthProbe = sogoHealthProbe
|
||||
addLifecycle(t, sup)
|
||||
|
||||
t.Register("exec.rename-user", func(ctx context.Context, args map[string]any) (any, error) {
|
||||
oldName, err := commands.ArgString(args, "old")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
newName, err := commands.ArgString(args, "new")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r, err := commands.Run(ctx, commands.RunOptions{}, "sogo-tool", "rename-user", oldName, newName)
|
||||
return nil, asError(r, err)
|
||||
})
|
||||
|
||||
return t
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/mailcow/mailcow-dockerized/agent/internal/commands"
|
||||
"github.com/mailcow/mailcow-dockerized/agent/internal/proc"
|
||||
)
|
||||
|
||||
// init registers the unbound command-table builder under the service name
// "unbound".
func init() {
	Register("unbound", buildUnbound)
}
|
||||
|
||||
func unboundHealthProbe(ctx context.Context) error {
|
||||
return probeShell(ctx, 3*time.Second, "dig", "+time=2", "+tries=1", "@127.0.0.1", "mailcow.email", "A")
|
||||
}
|
||||
|
||||
func buildUnbound(sup *proc.Supervisor) *commands.Table {
|
||||
t := commands.New("unbound")
|
||||
t.HealthProbe = unboundHealthProbe
|
||||
addLifecycle(t, sup)
|
||||
t.Register("exec.flush-cache", func(ctx context.Context, _ map[string]any) (any, error) {
|
||||
r, err := commands.Run(ctx, commands.RunOptions{}, "unbound-control", "flush_zone", ".")
|
||||
return nil, asError(r, err)
|
||||
})
|
||||
return t
|
||||
}
|
||||
@@ -0,0 +1,155 @@
|
||||
// Package stats reads cgroup CPU + memory usage and publishes them to
|
||||
//
|
||||
// HASH mailcow.stats.<service>.<node_id>
|
||||
//
|
||||
// with a 30s TTL. Supports both cgroup v1 and v2. The numbers are intentionally
|
||||
// approximate — they replace what dockerapi exposed via /containers/<id>/stats.
|
||||
package stats
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// Sample is a single stats observation.
type Sample struct {
	// CPUPercent is the share of one host CPU consumed since the previous
	// sample (range 0..100*numCPU).
	CPUPercent float64
	// MemoryBytes is the current memory usage as read from the cgroup.
	MemoryBytes int64
	// MemoryLimit is the cgroup memory limit; -1 when the limit file reads
	// "max".
	MemoryLimit int64
	// Timestamp is when the sample was taken.
	Timestamp time.Time
}
|
||||
|
||||
// statsKey builds the Redis key of the stats hash for one node of a service:
// "mailcow.stats.<service>.<node>".
func statsKey(service, node string) string {
	return strings.Join([]string{"mailcow.stats", service, node}, ".")
}
|
||||
|
||||
// Publisher reads cgroup metrics and pushes them to Redis on a ticker.
|
||||
type Publisher struct {
|
||||
rdb *redis.Client
|
||||
service string
|
||||
node string
|
||||
|
||||
// previous CPU sample to derive a delta-based percent
|
||||
prevCPUNanos int64
|
||||
prevAt time.Time
|
||||
}
|
||||
|
||||
// NewPublisher constructs a publisher. Caller drives it via Run.
|
||||
func NewPublisher(rdb *redis.Client, service, node string) *Publisher {
|
||||
return &Publisher{rdb: rdb, service: service, node: node}
|
||||
}
|
||||
|
||||
// Run blocks on a ticker until ctx is done.
|
||||
func (p *Publisher) Run(ctx context.Context, interval time.Duration) {
|
||||
t := time.NewTicker(interval)
|
||||
defer t.Stop()
|
||||
// Prime the CPU sample so the first publish has a real delta.
|
||||
if cpu, ok := readCPUNanos(); ok {
|
||||
p.prevCPUNanos = cpu
|
||||
p.prevAt = time.Now()
|
||||
}
|
||||
// Immediate first publish so the dashboard never sees a node without a
|
||||
// stats hash. CPU is 0 in this first sample (no prev delta yet); memory
|
||||
// is already accurate.
|
||||
_ = p.publish(ctx, p.sample())
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
_ = p.publish(ctx, p.sample())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Publisher) sample() Sample {
|
||||
s := Sample{Timestamp: time.Now()}
|
||||
if cpu, ok := readCPUNanos(); ok {
|
||||
if !p.prevAt.IsZero() {
|
||||
dCPU := cpu - p.prevCPUNanos
|
||||
dT := s.Timestamp.Sub(p.prevAt).Nanoseconds()
|
||||
if dT > 0 && dCPU >= 0 {
|
||||
s.CPUPercent = (float64(dCPU) / float64(dT)) * 100.0
|
||||
}
|
||||
}
|
||||
p.prevCPUNanos = cpu
|
||||
p.prevAt = s.Timestamp
|
||||
}
|
||||
if mem, limit, ok := readMemory(); ok {
|
||||
s.MemoryBytes = mem
|
||||
s.MemoryLimit = limit
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (p *Publisher) publish(ctx context.Context, s Sample) error {
|
||||
pipe := p.rdb.Pipeline()
|
||||
pipe.HSet(ctx, statsKey(p.service, p.node), map[string]any{
|
||||
"cpu_percent": strconv.FormatFloat(s.CPUPercent, 'f', 2, 64),
|
||||
"memory_bytes": s.MemoryBytes,
|
||||
"memory_limit": s.MemoryLimit,
|
||||
"timestamp": s.Timestamp.Unix(),
|
||||
"node_id": p.node,
|
||||
"service": p.service,
|
||||
})
|
||||
pipe.Expire(ctx, statsKey(p.service, p.node), 30*time.Second)
|
||||
_, err := pipe.Exec(ctx)
|
||||
return err
|
||||
}
|
||||
|
||||
// --- cgroup readers --------------------------------------------------------
|
||||
|
||||
// readCPUNanos returns the total CPU time, in nanoseconds, consumed by the
// current cgroup. It first looks for the "usage_usec" line of the cgroup v2
// cpu.stat file and, failing that, reads the cgroup v1 cpuacct.usage file.
// The boolean is false when neither source yields a number.
func readCPUNanos() (int64, bool) {
	const v2Key = "usage_usec "

	if raw, err := os.ReadFile("/sys/fs/cgroup/cpu.stat"); err == nil {
		// v2: lines like "usage_usec 12345"
		for _, line := range strings.Split(string(raw), "\n") {
			if !strings.HasPrefix(line, v2Key) {
				continue
			}
			usec, perr := strconv.ParseInt(line[len(v2Key):], 10, 64)
			if perr != nil {
				continue
			}
			return usec * 1000, true // µs → ns
		}
	}

	raw, err := os.ReadFile("/sys/fs/cgroup/cpuacct/cpuacct.usage")
	if err != nil {
		return 0, false
	}
	nanos, err := strconv.ParseInt(strings.TrimSpace(string(raw)), 10, 64)
	if err != nil {
		return 0, false
	}
	return nanos, true
}
|
||||
|
||||
// readMemory returns current usage and limit in bytes.
|
||||
func readMemory() (int64, int64, bool) {
|
||||
// v2
|
||||
if cur, err := readInt("/sys/fs/cgroup/memory.current"); err == nil {
|
||||
limit, _ := readInt("/sys/fs/cgroup/memory.max")
|
||||
return cur, limit, true
|
||||
}
|
||||
// v1
|
||||
if cur, err := readInt("/sys/fs/cgroup/memory/memory.usage_in_bytes"); err == nil {
|
||||
limit, _ := readInt("/sys/fs/cgroup/memory/memory.limit_in_bytes")
|
||||
return cur, limit, true
|
||||
}
|
||||
return 0, 0, false
|
||||
}
|
||||
|
||||
// readInt reads path and parses its whitespace-trimmed contents as a base-10
// int64. The literal "max" is reported as -1. Read and parse errors are
// returned to the caller.
func readInt(path string) (int64, error) {
	raw, err := os.ReadFile(path)
	if err != nil {
		return 0, err
	}
	switch text := strings.TrimSpace(string(raw)); text {
	case "max":
		return -1, nil
	default:
		return strconv.ParseInt(text, 10, 64)
	}
}
|
||||
@@ -0,0 +1,58 @@
|
||||
#!/bin/sh
# mailcow-agent-cli — publish a control-bus command from inside a service
# container, optionally collecting one reply. Same wire protocol as the Go
# agent (see internal/envelope/envelope.go).
#
# Usage:
#   mailcow-agent-cli send <service> <cmd> [json-args]
#       Fire-and-forget. Prints the number of subscribers reached.
#   mailcow-agent-cli call <service> <cmd> [json-args] [timeout-seconds]
#       Publish + wait for one reply on its private reply list. Prints the
#       reply envelope JSON on stdout. Exits 1 (message on stderr) when
#       redis-cli fails or no reply arrives within the timeout.
#
# json-args defaults to {} and timeout-seconds to 10. <cmd> and json-args are
# inserted into the envelope verbatim, so they must already be JSON-safe.
#
# Environment: REDIS_SLAVEOF_IP / REDIS_SLAVEOF_PORT select the Redis server
# (default redis-mailcow:6379); REDISPASS, when set, is used to authenticate.
#
# Exit codes: 2 on usage errors.
#
# Requires the `redis-cli` binary to be present in the calling container.

set -e

op="${1:-}"
svc="${2:-}"
cmd="${3:-}"
args="${4:-{\}}"
tmo="${5:-10}"

if [ -z "$op" ] || [ -z "$svc" ] || [ -z "$cmd" ]; then
  echo "usage: $0 send|call <service> <cmd> [json-args] [timeout-seconds]" >&2
  exit 2
fi

redis_host="${REDIS_SLAVEOF_IP:-redis-mailcow}"
redis_port="${REDIS_SLAVEOF_PORT:-6379}"

# rcli runs redis-cli against the configured server. The password is handed
# over through REDISCLI_AUTH instead of `-a`, so it never shows up in the
# process list.
rcli() {
  if [ -n "${REDISPASS:-}" ]; then
    REDISCLI_AUTH="$REDISPASS" redis-cli -h "$redis_host" -p "$redis_port" "$@"
  else
    redis-cli -h "$redis_host" -p "$redis_port" "$@"
  fi
}

rid="$(date +%s%N)$$"
issued_by="$(hostname 2>/dev/null || echo unknown)"

case "$op" in
  send)
    payload="{\"cmd\":\"${cmd}\",\"request_id\":\"${rid}\",\"args\":${args},\"issued_by\":\"${issued_by}\"}"
    rcli PUBLISH "mailcow.control.${svc}" "$payload"
    ;;
  call)
    reply="mailcow.reply.${rid}"
    payload="{\"cmd\":\"${cmd}\",\"request_id\":\"${rid}\",\"args\":${args},\"reply_to\":\"${reply}\",\"issued_by\":\"${issued_by}\"}"
    rcli PUBLISH "mailcow.control.${svc}" "$payload" >/dev/null
    # Capture the reply first so a redis-cli failure or a timeout can be told
    # apart from a real answer instead of silently printing nothing.
    if ! out="$(rcli BLPOP "$reply" "$tmo" 2>/dev/null)"; then
      echo "$0: redis-cli BLPOP failed" >&2
      exit 1
    fi
    if [ -z "$out" ]; then
      echo "$0: no reply within ${tmo}s" >&2
      exit 1
    fi
    # BLPOP returns two lines: the list name then the value. Print only the value.
    printf '%s\n' "$out" | tail -n1
    ;;
  *)
    echo "usage: $0 send|call <service> <cmd> [json-args] [timeout-seconds]" >&2
    exit 2
    ;;
esac
|
||||
Reference in New Issue
Block a user