test: add port ready, container cleanup

This commit is contained in:
vorotamoroz
2026-05-22 03:19:48 +00:00
parent cc3c992b1d
commit 3693d6a6b6
4 changed files with 171 additions and 22 deletions

View File

@@ -123,7 +123,9 @@ async function runNodeCommand(args: string[], stdinData?: Uint8Array): Promise<C
}).spawn();
if (TEE_ENABLED) {
Deno.stdout.writeSync(new TextEncoder().encode(`[CLI tee pid=${child.pid}] process: ${formatTeeCommand(cliArgs)}\n`));
Deno.stdout.writeSync(
new TextEncoder().encode(`[CLI tee pid=${child.pid}] process: ${formatTeeCommand(cliArgs)}\n`)
);
}
const stdoutPromise = collectStream(

View File

@@ -14,6 +14,10 @@ type DockerInvoker = {
let dockerInvokerPromise: Promise<DockerInvoker> | null = null;
const DOCKER_TEE = Deno.env.get("LIVESYNC_DOCKER_TEE") === "1" || Deno.env.get("LIVESYNC_TEST_TEE") === "1";
const trackedContainers = new Set<string>();
const CLEANUP_SIGNALS: Deno.Signal[] = ["SIGINT", "SIGTERM"];
let signalCleanupHandlersInstalled = false;
let signalCleanupInProgress = false;
// ---------------------------------------------------------------------------
// Low-level docker wrapper
@@ -183,6 +187,55 @@ async function dockerOrFail(...args: string[]): Promise<string> {
return r.stdout;
}
async function stopAndRemoveContainer(container: string): Promise<void> {
await docker("stop", container).catch(() => {});
await docker("rm", container).catch(() => {});
}
async function cleanupTrackedContainers(reason: string): Promise<void> {
const names = [...trackedContainers];
if (names.length === 0) return;
console.warn(`[WARN] cleaning up tracked containers on ${reason}: ${names.join(", ")}`);
for (const container of names.reverse()) {
await stopAndRemoveContainer(container);
trackedContainers.delete(container);
}
}
async function handleSignalCleanup(signal: Deno.Signal): Promise<void> {
if (signalCleanupInProgress) return;
signalCleanupInProgress = true;
try {
await cleanupTrackedContainers(`signal ${signal}`);
} finally {
Deno.exit(signal === "SIGINT" ? 130 : 143);
}
}
function ensureSignalCleanupHandlers(): void {
if (signalCleanupHandlersInstalled) return;
signalCleanupHandlersInstalled = true;
for (const signal of CLEANUP_SIGNALS) {
try {
Deno.addSignalListener(signal, () => {
void handleSignalCleanup(signal);
});
} catch {
// Unsupported signal on this platform.
}
}
}
function trackContainer(container: string): void {
ensureSignalCleanupHandlers();
trackedContainers.add(container);
}
function untrackContainer(container: string): void {
trackedContainers.delete(container);
}
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
@@ -259,8 +312,8 @@ const MINIO_IMAGE = "minio/minio";
const MINIO_MC_IMAGE = "minio/mc";
export async function stopCouchdb(): Promise<void> {
await docker("stop", COUCHDB_CONTAINER);
await docker("rm", COUCHDB_CONTAINER);
await stopAndRemoveContainer(COUCHDB_CONTAINER);
untrackContainer(COUCHDB_CONTAINER);
}
/**
@@ -289,6 +342,7 @@ export async function startCouchdb(couchdbUri: string, user: string, password: s
"COUCHDB_SINGLE_NODE=y",
COUCHDB_IMAGE
);
trackContainer(COUCHDB_CONTAINER);
console.log("[INFO] initialising CouchDB");
await initCouchdb(couchdbUri, user, password);
@@ -389,8 +443,8 @@ function shQuote(value: string): string {
}
export async function stopMinio(): Promise<void> {
await docker("stop", MINIO_CONTAINER);
await docker("rm", MINIO_CONTAINER);
await stopAndRemoveContainer(MINIO_CONTAINER);
untrackContainer(MINIO_CONTAINER);
}
async function initMinioBucket(
@@ -470,6 +524,7 @@ export async function startMinio(
"--console-address",
":9001"
);
trackContainer(MINIO_CONTAINER);
console.log(`[INFO] initialising MinIO test bucket: ${bucket}`);
let initialised = false;
@@ -517,8 +572,8 @@ EOF
exec /app/strfry --config /tmp/strfry.conf relay`;
export async function stopP2pRelay(): Promise<void> {
await docker("stop", P2P_RELAY_CONTAINER);
await docker("rm", P2P_RELAY_CONTAINER);
await stopAndRemoveContainer(P2P_RELAY_CONTAINER);
untrackContainer(P2P_RELAY_CONTAINER);
}
/**
@@ -547,6 +602,7 @@ export async function startP2pRelay(): Promise<void> {
"-lc",
STRFRY_BOOTSTRAP_SH
);
trackContainer(P2P_RELAY_CONTAINER);
}
export function isLocalP2pRelay(relayUrl: string): boolean {

View File

@@ -0,0 +1,49 @@
type WaitForPortOptions = {
timeoutMs?: number;
intervalMs?: number;
connectTimeoutMs?: number;
};
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
async function connectWithTimeout(hostname: string, port: number, timeoutMs: number): Promise<void> {
let timer: number | undefined;
try {
const connPromise = Deno.connect({ hostname, port });
const timeoutPromise = new Promise<never>((_, reject) => {
timer = setTimeout(() => reject(new Error(`connect timeout after ${timeoutMs}ms`)), timeoutMs);
});
const conn = await Promise.race([connPromise, timeoutPromise]);
conn.close();
} finally {
if (timer !== undefined) {
clearTimeout(timer);
}
}
}
export async function waitForPort(hostname: string, port: number, options: WaitForPortOptions = {}): Promise<void> {
const timeoutMs = options.timeoutMs ?? 15000;
const intervalMs = options.intervalMs ?? 250;
const connectTimeoutMs = options.connectTimeoutMs ?? 1000;
const started = Date.now();
let lastError: unknown;
while (Date.now() - started < timeoutMs) {
try {
await connectWithTimeout(hostname, port, connectTimeoutMs);
return;
} catch (error) {
lastError = error;
await sleep(intervalMs);
}
}
throw new Error(
`Port ${hostname}:${port} did not become ready within ${timeoutMs}ms` +
(lastError ? ` (last error: ${String(lastError)})` : "")
);
}

View File

@@ -1,11 +1,26 @@
import { runCli } from "./cli.ts";
import { isLocalP2pRelay, startP2pRelay, stopP2pRelay } from "./docker.ts";
import { waitForPort } from "./net.ts";
export type PeerEntry = {
id: string;
name: string;
};
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
function parseRelayEndpoint(relay: string): { hostname: string; port: number } {
const url = new URL(relay);
const port = url.port ? Number(url.port) : url.protocol === "ws:" ? 80 : url.protocol === "wss:" ? 443 : NaN;
if (!Number.isFinite(port)) {
throw new Error(`Unsupported relay URL: ${relay}`);
}
const hostname = url.hostname === "localhost" ? "127.0.0.1" : url.hostname;
return { hostname, port };
}
export function parsePeerLines(output: string): PeerEntry[] {
return output
.split(/\r?\n/)
@@ -20,28 +35,55 @@ export async function discoverPeer(
timeoutSeconds: number,
targetPeer?: string
): Promise<PeerEntry> {
const result = await runCli(vaultDir, "--settings", settingsFile, "p2p-peers", String(timeoutSeconds));
if (result.code !== 0) {
throw new Error(`p2p-peers failed\n${result.combined}`);
}
const peers = parsePeerLines(result.stdout);
if (targetPeer) {
const matched = peers.find((peer) => peer.id === targetPeer || peer.name === targetPeer);
if (matched) return matched;
}
if (peers.length === 0) {
const fallback = result.combined.match(/Advertisement from\s+([^\s]+)/);
if (fallback?.[1]) {
return { id: fallback[1], name: fallback[1] };
const retries = Math.max(0, Number(Deno.env.get("LIVESYNC_P2P_PEERS_RETRY") ?? "3"));
let lastCombined = "";
for (let attempt = 0; attempt <= retries; attempt++) {
const result = await runCli(vaultDir, "--settings", settingsFile, "p2p-peers", String(timeoutSeconds));
lastCombined = result.combined;
if (result.code === 0) {
const peers = parsePeerLines(result.stdout);
if (targetPeer) {
const matched = peers.find((peer) => peer.id === targetPeer || peer.name === targetPeer);
if (matched) return matched;
}
if (peers.length > 0) {
return peers[0];
}
const fallback = result.combined.match(/Advertisement from\s+([^\s]+)/);
if (fallback?.[1]) {
return { id: fallback[1], name: fallback[1] };
}
}
throw new Error(`No peers discovered\n${result.combined}`);
if (attempt < retries) {
const waitMs = 400 * (attempt + 1);
console.warn(
`[WARN] p2p-peers returned no usable peers, retrying (${attempt + 1}/${retries}) in ${waitMs}ms`
);
await sleep(waitMs);
continue;
}
throw new Error(
result.code !== 0 ? `p2p-peers failed\n${result.combined}` : `No peers discovered\n${result.combined}`
);
}
return peers[0];
throw new Error(`No peers discovered\n${lastCombined}`);
}
export async function maybeStartLocalRelay(relay: string): Promise<boolean> {
if (!isLocalP2pRelay(relay)) return false;
await startP2pRelay();
const endpoint = parseRelayEndpoint(relay);
await waitForPort(endpoint.hostname, endpoint.port, {
timeoutMs: Number(Deno.env.get("LIVESYNC_P2P_RELAY_READY_TIMEOUT_MS") ?? "15000"),
intervalMs: Number(Deno.env.get("LIVESYNC_P2P_RELAY_READY_INTERVAL_MS") ?? "250"),
connectTimeoutMs: Number(Deno.env.get("LIVESYNC_P2P_RELAY_CONNECT_TIMEOUT_MS") ?? "1000"),
});
return true;
}