mirror of
https://github.com/vrtmrz/obsidian-livesync.git
synced 2026-05-23 07:41:34 +00:00
cli: implement daemon startup sequence and CouchDB→local sync
- Add daemon command to help text and --interval/-i flag for polling mode - Capture original sync settings before suspendAllSync() clobbers them - Implement daemon startup: mirror scan → restore settings → applySettings() which triggers the full suspend/resume lifecycle and starts the _changes feed - Guard processSynchroniseResult no-op to non-daemon commands so default handler writes incoming CouchDB changes to the local filesystem - Polling mode: restore settings + clearInterval-safe try/catch error handling - Warn when both liveSync and syncOnStart are false after restore (no-op config) - Fix: only block indefinitely if daemon startup succeeded
This commit is contained in:
312
src/apps/cli/commands/daemonCommand.unit.spec.ts
Normal file
312
src/apps/cli/commands/daemonCommand.unit.spec.ts
Normal file
@@ -0,0 +1,312 @@
|
||||
import { describe, expect, it, vi, beforeEach, afterEach } from "vitest";
|
||||
import { runCommand } from "./runCommand";
|
||||
import type { CLIOptions } from "./types";
|
||||
|
||||
// Mock performFullScan so daemon tests don't require a real CouchDB connection.
|
||||
vi.mock("@lib/serviceFeatures/offlineScanner", () => ({
|
||||
performFullScan: vi.fn(async () => true),
|
||||
}));
|
||||
|
||||
// Mock UnresolvedErrorManager to avoid event-hub side effects.
|
||||
vi.mock("@lib/services/base/UnresolvedErrorManager", () => ({
|
||||
UnresolvedErrorManager: class UnresolvedErrorManager {
|
||||
showError() {}
|
||||
clearError() {}
|
||||
clearErrors() {}
|
||||
},
|
||||
}));
|
||||
|
||||
import * as offlineScanner from "@lib/serviceFeatures/offlineScanner";
|
||||
|
||||
function createCoreMock() {
|
||||
return {
|
||||
services: {
|
||||
control: {
|
||||
activated: Promise.resolve(),
|
||||
applySettings: vi.fn(async () => {}),
|
||||
},
|
||||
setting: {
|
||||
applyPartial: vi.fn(async () => {}),
|
||||
currentSettings: vi.fn(() => ({ liveSync: true, syncOnStart: false })),
|
||||
},
|
||||
replication: {
|
||||
replicate: vi.fn(async () => true),
|
||||
},
|
||||
appLifecycle: {
|
||||
onUnload: {
|
||||
addHandler: vi.fn(),
|
||||
},
|
||||
},
|
||||
},
|
||||
serviceModules: {
|
||||
fileHandler: {
|
||||
dbToStorage: vi.fn(async () => true),
|
||||
storeFileToDB: vi.fn(async () => true),
|
||||
},
|
||||
storageAccess: {
|
||||
readFileAuto: vi.fn(async () => ""),
|
||||
writeFileAuto: vi.fn(async () => {}),
|
||||
},
|
||||
databaseFileAccess: {
|
||||
fetch: vi.fn(async () => undefined),
|
||||
},
|
||||
},
|
||||
} as any;
|
||||
}
|
||||
|
||||
function makeDaemonOptions(interval?: number): CLIOptions {
|
||||
return {
|
||||
command: "daemon",
|
||||
commandArgs: [],
|
||||
databasePath: "/tmp/vault",
|
||||
verbose: false,
|
||||
force: false,
|
||||
interval,
|
||||
};
|
||||
}
|
||||
|
||||
const baseContext = {
|
||||
vaultPath: "/tmp/vault",
|
||||
settingsPath: "/tmp/vault/.livesync/settings.json",
|
||||
originalSyncSettings: {
|
||||
liveSync: true,
|
||||
syncOnStart: false,
|
||||
periodicReplication: false,
|
||||
syncOnSave: false,
|
||||
syncOnEditorSave: false,
|
||||
syncOnFileOpen: false,
|
||||
syncAfterMerge: false,
|
||||
},
|
||||
} as any;
|
||||
|
||||
describe("daemon command", () => {
|
||||
beforeEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
vi.useFakeTimers();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("calls performFullScan during startup", async () => {
|
||||
const core = createCoreMock();
|
||||
vi.mocked(offlineScanner.performFullScan).mockResolvedValue(true);
|
||||
|
||||
await runCommand(makeDaemonOptions(), { ...baseContext, core });
|
||||
|
||||
expect(offlineScanner.performFullScan).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("returns false when performFullScan fails", async () => {
|
||||
const core = createCoreMock();
|
||||
vi.mocked(offlineScanner.performFullScan).mockResolvedValue(false);
|
||||
|
||||
const result = await runCommand(makeDaemonOptions(), { ...baseContext, core });
|
||||
|
||||
expect(result).toBe(false);
|
||||
});
|
||||
|
||||
it("polling mode: calls setTimeout when interval option is set", async () => {
|
||||
const core = createCoreMock();
|
||||
vi.mocked(offlineScanner.performFullScan).mockResolvedValue(true);
|
||||
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout");
|
||||
|
||||
await runCommand(makeDaemonOptions(30), { ...baseContext, core });
|
||||
|
||||
expect(setTimeoutSpy).toHaveBeenCalledTimes(1);
|
||||
// Interval should be in milliseconds (30s → 30000ms)
|
||||
expect(setTimeoutSpy).toHaveBeenCalledWith(expect.any(Function), 30000);
|
||||
});
|
||||
|
||||
it("polling mode: applies settings with suspendFileWatching=false before setting interval", async () => {
|
||||
const core = createCoreMock();
|
||||
vi.mocked(offlineScanner.performFullScan).mockResolvedValue(true);
|
||||
|
||||
await runCommand(makeDaemonOptions(10), { ...baseContext, core });
|
||||
|
||||
expect(core.services.setting.applyPartial).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ suspendFileWatching: false }),
|
||||
true
|
||||
);
|
||||
expect(core.services.control.applySettings).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("liveSync mode: calls applyPartial and applySettings", async () => {
|
||||
const core = createCoreMock();
|
||||
vi.mocked(offlineScanner.performFullScan).mockResolvedValue(true);
|
||||
|
||||
await runCommand(makeDaemonOptions(), { ...baseContext, core });
|
||||
|
||||
expect(core.services.setting.applyPartial).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
...baseContext.originalSyncSettings,
|
||||
suspendFileWatching: false,
|
||||
}),
|
||||
true
|
||||
);
|
||||
expect(core.services.control.applySettings).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("liveSync mode: logs warning when both liveSync and syncOnStart are false", async () => {
|
||||
const core = createCoreMock();
|
||||
core.services.setting.currentSettings = vi.fn(() => ({
|
||||
liveSync: false,
|
||||
syncOnStart: false,
|
||||
}));
|
||||
vi.mocked(offlineScanner.performFullScan).mockResolvedValue(true);
|
||||
const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {});
|
||||
|
||||
const result = await runCommand(makeDaemonOptions(), { ...baseContext, core });
|
||||
|
||||
expect(result).toBe(true);
|
||||
const warningCalls = consoleSpy.mock.calls.filter(
|
||||
(args) => typeof args[0] === "string" && args[0].includes("liveSync and syncOnStart are both disabled")
|
||||
);
|
||||
expect(warningCalls.length).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
it("liveSync mode: no warning when liveSync is true", async () => {
|
||||
const core = createCoreMock();
|
||||
core.services.setting.currentSettings = vi.fn(() => ({
|
||||
liveSync: true,
|
||||
syncOnStart: false,
|
||||
}));
|
||||
vi.mocked(offlineScanner.performFullScan).mockResolvedValue(true);
|
||||
const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {});
|
||||
|
||||
await runCommand(makeDaemonOptions(), { ...baseContext, core });
|
||||
|
||||
const warningCalls = consoleSpy.mock.calls.filter(
|
||||
(args) => typeof args[0] === "string" && args[0].includes("liveSync and syncOnStart are both disabled")
|
||||
);
|
||||
expect(warningCalls.length).toBe(0);
|
||||
});
|
||||
|
||||
it("calls replicate before performFullScan", async () => {
|
||||
const core = createCoreMock();
|
||||
const callOrder: string[] = [];
|
||||
core.services.replication.replicate = vi.fn(async () => {
|
||||
callOrder.push("replicate");
|
||||
return true;
|
||||
});
|
||||
vi.mocked(offlineScanner.performFullScan).mockImplementation(async () => {
|
||||
callOrder.push("performFullScan");
|
||||
return true;
|
||||
});
|
||||
|
||||
await runCommand(makeDaemonOptions(), { ...baseContext, core });
|
||||
|
||||
expect(callOrder).toEqual(["replicate", "performFullScan"]);
|
||||
});
|
||||
|
||||
it("returns false when initial replication fails", async () => {
|
||||
const core = createCoreMock();
|
||||
core.services.replication.replicate = vi.fn(async () => false);
|
||||
vi.mocked(offlineScanner.performFullScan).mockClear();
|
||||
|
||||
const result = await runCommand(makeDaemonOptions(), { ...baseContext, core });
|
||||
|
||||
expect(result).toBe(false);
|
||||
// performFullScan should NOT have been called
|
||||
expect(offlineScanner.performFullScan).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("polling mode: registers onUnload handler that clears timeout", async () => {
|
||||
const core = createCoreMock();
|
||||
vi.mocked(offlineScanner.performFullScan).mockResolvedValue(true);
|
||||
|
||||
await runCommand(makeDaemonOptions(10), { ...baseContext, core });
|
||||
|
||||
// onUnload handler should have been registered
|
||||
expect(core.services.appLifecycle.onUnload.addHandler).toHaveBeenCalledTimes(1);
|
||||
const handler = core.services.appLifecycle.onUnload.addHandler.mock.calls[0][0];
|
||||
|
||||
// Get the timeout ID that was created
|
||||
const clearTimeoutSpy = vi.spyOn(globalThis, "clearTimeout");
|
||||
await handler();
|
||||
expect(clearTimeoutSpy).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("polling backoff: interval escalates on failure, caps at 300000ms, then halves on recovery", async () => {
|
||||
const core = createCoreMock();
|
||||
vi.mocked(offlineScanner.performFullScan).mockResolvedValue(true);
|
||||
vi.spyOn(console, "error").mockImplementation(() => {});
|
||||
|
||||
// startup replicate (call 1) succeeds; poll calls 2–7 fail; call 8 succeeds.
|
||||
let callCount = 0;
|
||||
core.services.replication.replicate = vi.fn(async () => {
|
||||
callCount++;
|
||||
if (callCount === 1) return true; // initial startup replicate
|
||||
if (callCount <= 7) throw new Error("network failure");
|
||||
return true; // recovery
|
||||
});
|
||||
|
||||
const baseMs = 30 * 1000;
|
||||
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout");
|
||||
|
||||
await runCommand(makeDaemonOptions(30), { ...baseContext, core });
|
||||
|
||||
// After runCommand returns the first setTimeout has been scheduled.
|
||||
// setTimeoutSpy.mock.calls[0] is the initial schedule (baseMs).
|
||||
expect(setTimeoutSpy.mock.calls[0][1]).toBe(baseMs);
|
||||
|
||||
// Advance through 6 failure polls. After each failure the next setTimeout
|
||||
// should be scheduled with a larger (or capped) interval.
|
||||
// formula: min(base * 2^n, 300000). base=30000ms.
|
||||
// failure 1: 30000*2=60000, failure 2: 30000*4=120000,
|
||||
// failure 3: 30000*8=240000, failure 4: 30000*16=480000→capped, 5→cap, 6→cap
|
||||
const expectedIntervals = [
|
||||
baseMs * 2, // after failure 1: 60000
|
||||
baseMs * 4, // after failure 2: 120000
|
||||
baseMs * 8, // after failure 3: 240000
|
||||
300_000, // after failure 4 (would be 480000, capped)
|
||||
300_000, // after failure 5 (cap)
|
||||
300_000, // after failure 6 (cap)
|
||||
];
|
||||
|
||||
for (const expected of expectedIntervals) {
|
||||
const prevCallCount = setTimeoutSpy.mock.calls.length;
|
||||
await vi.advanceTimersByTimeAsync(setTimeoutSpy.mock.calls[prevCallCount - 1][1] as number);
|
||||
const newCallCount = setTimeoutSpy.mock.calls.length;
|
||||
expect(newCallCount).toBeGreaterThan(prevCallCount);
|
||||
expect(setTimeoutSpy.mock.calls[newCallCount - 1][1]).toBe(expected);
|
||||
}
|
||||
|
||||
// Now trigger the success poll — interval should halve each time toward base.
|
||||
// After failure 6, consecutiveFailures=6, currentIntervalMs=300000.
|
||||
// On success: consecutiveFailures=5, currentIntervalMs=150000.
|
||||
const prevCallCount = setTimeoutSpy.mock.calls.length;
|
||||
await vi.advanceTimersByTimeAsync(setTimeoutSpy.mock.calls[prevCallCount - 1][1] as number);
|
||||
const afterSuccessCallCount = setTimeoutSpy.mock.calls.length;
|
||||
expect(afterSuccessCallCount).toBeGreaterThan(prevCallCount);
|
||||
// The interval after one success should be halved (300000 / 2 = 150000).
|
||||
expect(setTimeoutSpy.mock.calls[afterSuccessCallCount - 1][1]).toBe(150_000);
|
||||
});
|
||||
|
||||
it("polling error handling: replicate rejection is caught and console.error is called", async () => {
|
||||
const core = createCoreMock();
|
||||
vi.mocked(offlineScanner.performFullScan).mockResolvedValue(true);
|
||||
const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {});
|
||||
|
||||
// Make replicate succeed on the initial call (startup), then fail on the poll.
|
||||
let callCount = 0;
|
||||
core.services.replication.replicate = vi.fn(async () => {
|
||||
callCount++;
|
||||
if (callCount === 1) return true; // startup replicate
|
||||
throw new Error("network failure");
|
||||
});
|
||||
|
||||
const intervalMs = 30 * 1000;
|
||||
await runCommand(makeDaemonOptions(30), { ...baseContext, core });
|
||||
|
||||
// Advance time to trigger the first poll callback and flush its async work.
|
||||
await vi.advanceTimersByTimeAsync(intervalMs);
|
||||
|
||||
// No unhandled rejection — the error was caught internally.
|
||||
const errorCalls = consoleSpy.mock.calls.filter(
|
||||
(args) => typeof args[0] === "string" && args[0].includes("Poll error")
|
||||
);
|
||||
expect(errorCalls.length).toBeGreaterThan(0);
|
||||
});
|
||||
});
|
||||
@@ -15,6 +15,96 @@ export async function runCommand(options: CLIOptions, context: CLICommandContext
|
||||
|
||||
await core.services.control.activated;
|
||||
if (options.command === "daemon") {
|
||||
const log = (msg: unknown) => console.error(`[Daemon] ${msg}`);
|
||||
|
||||
// Skip the config mismatch dialog — the daemon cannot resolve it interactively
|
||||
// and the default "Dismiss" action would block replication. The daemon should
|
||||
// accept whatever configuration the remote has.
|
||||
await core.services.setting.applyPartial({ disableCheckingConfigMismatch: true }, true);
|
||||
|
||||
// 1. Replicate CouchDB → local PouchDB so the mirror scan has content to work with.
|
||||
log("Replicating from CouchDB...");
|
||||
const replResult = await core.services.replication.replicate(true);
|
||||
if (!replResult) {
|
||||
console.error("[Daemon] Initial CouchDB replication failed, cannot continue");
|
||||
return false;
|
||||
}
|
||||
log("CouchDB replication complete");
|
||||
|
||||
// 2. Mirror scan to reconcile PouchDB ↔ local filesystem.
|
||||
const errorManager = new UnresolvedErrorManager(core.services.appLifecycle);
|
||||
log("Running mirror scan...");
|
||||
const scanOk = await performFullScan(core as any, log, errorManager, false, true);
|
||||
if (!scanOk) {
|
||||
console.error("[Daemon] Mirror scan failed, cannot continue");
|
||||
return false;
|
||||
}
|
||||
log("Mirror scan complete");
|
||||
|
||||
// 3. Re-enable sync.
|
||||
const restoreSyncSettings = async () => {
|
||||
await core.services.setting.applyPartial({
|
||||
...context.originalSyncSettings,
|
||||
suspendFileWatching: false,
|
||||
}, true);
|
||||
// applySettings fires the full lifecycle: onSuspending → onResumed.
|
||||
// ModuleReplicatorCouchDB starts continuous replication on onResumed
|
||||
// via fireAndForget.
|
||||
await core.services.control.applySettings();
|
||||
// Lifecycle events (onSuspending) may re-enable suspension flags.
|
||||
// Clear them explicitly after the lifecycle completes. applyPartial
|
||||
// with true is a direct store write — it does not re-trigger lifecycle.
|
||||
await core.services.setting.applyPartial({
|
||||
suspendFileWatching: false,
|
||||
suspendParseReplicationResult: false,
|
||||
}, true);
|
||||
};
|
||||
if (options.interval) {
|
||||
log(`Polling mode: syncing every ${options.interval}s`);
|
||||
await restoreSyncSettings();
|
||||
const baseIntervalMs = options.interval * 1000;
|
||||
let currentIntervalMs = baseIntervalMs;
|
||||
let consecutiveFailures = 0;
|
||||
const maxIntervalMs = 5 * 60 * 1000; // 5 minutes cap
|
||||
|
||||
const poll = async () => {
|
||||
try {
|
||||
await core.services.replication.replicate(true);
|
||||
if (consecutiveFailures > 0) {
|
||||
consecutiveFailures--;
|
||||
currentIntervalMs = Math.max(currentIntervalMs / 2, baseIntervalMs);
|
||||
log(`Replication recovered`);
|
||||
}
|
||||
} catch (err) {
|
||||
consecutiveFailures++;
|
||||
currentIntervalMs = Math.min(baseIntervalMs * Math.pow(2, consecutiveFailures), maxIntervalMs);
|
||||
console.error(`[Daemon] Poll error (${consecutiveFailures} consecutive):`, err);
|
||||
if (consecutiveFailures >= 5) {
|
||||
console.error(`[Daemon] Warning: ${consecutiveFailures} consecutive failures, backing off to ${Math.round(currentIntervalMs / 1000)}s`);
|
||||
}
|
||||
}
|
||||
pollTimer = setTimeout(poll, currentIntervalMs);
|
||||
};
|
||||
let pollTimer: ReturnType<typeof setTimeout> = setTimeout(poll, currentIntervalMs);
|
||||
core.services.appLifecycle.onUnload.addHandler(async () => {
|
||||
clearTimeout(pollTimer);
|
||||
return true;
|
||||
});
|
||||
} else {
|
||||
log("LiveSync mode: restoring sync settings and starting _changes feed");
|
||||
await restoreSyncSettings();
|
||||
// The applySettings() lifecycle fires onResumed → ModuleReplicatorCouchDB which
|
||||
// starts continuous replication via fireAndForget(openReplication). Don't call
|
||||
// openReplication directly — it races with the handler and causes dedup/termination.
|
||||
log("LiveSync active");
|
||||
const currentSettings = core.services.setting.currentSettings();
|
||||
if (!currentSettings.liveSync && !currentSettings.syncOnStart) {
|
||||
console.error("[Daemon] Warning: liveSync and syncOnStart are both disabled in settings. " +
|
||||
"No sync will occur. Set liveSync=true in your settings file for continuous sync, " +
|
||||
"or use --interval for polling mode.");
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { LiveSyncBaseCore } from "../../../LiveSyncBaseCore";
|
||||
import { ServiceContext } from "@lib/services/base/ServiceBase";
|
||||
import type { ObsidianLiveSyncSettings } from "@lib/common/types";
|
||||
|
||||
export type CLICommand =
|
||||
| "daemon"
|
||||
@@ -29,15 +30,18 @@ export interface CLIOptions {
|
||||
force?: boolean;
|
||||
command: CLICommand;
|
||||
commandArgs: string[];
|
||||
interval?: number;
|
||||
}
|
||||
|
||||
export interface CLICommandContext {
|
||||
databasePath: string;
|
||||
core: LiveSyncBaseCore<ServiceContext, any>;
|
||||
settingsPath: string;
|
||||
originalSyncSettings: Pick<ObsidianLiveSyncSettings, "liveSync" | "syncOnStart" | "periodicReplication" | "syncOnSave" | "syncOnEditorSave" | "syncOnFileOpen" | "syncAfterMerge">;
|
||||
}
|
||||
|
||||
export const VALID_COMMANDS = new Set([
|
||||
"daemon",
|
||||
"sync",
|
||||
"p2p-peers",
|
||||
"p2p-sync",
|
||||
|
||||
Reference in New Issue
Block a user