- No longer unexpected `Unhandled Rejections` during P2P operations (waiting acceptance).
CLI new features
- P2P sync has been implemented.
This commit is contained in:
vorotamoroz
2026-03-14 15:08:31 +09:00
parent bf93bddbdd
commit dfe13b1abd
23 changed files with 1373 additions and 24 deletions

View File

@@ -0,0 +1,149 @@
import type { LiveSyncBaseCore } from "../../../LiveSyncBaseCore";
import { P2P_DEFAULT_SETTINGS, SETTING_KEY_P2P_DEVICE_NAME, type EntryDoc } from "@lib/common/types";
import type { ServiceContext } from "@lib/services/base/ServiceBase";
import { TrysteroReplicator } from "@lib/replication/trystero/TrysteroReplicator";
type CLIP2PPeer = {
peerId: string;
name: string;
};
function delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
export function parseTimeoutSeconds(value: string, commandName: string): number {
const timeoutSec = Number(value);
if (!Number.isFinite(timeoutSec) || timeoutSec < 0) {
throw new Error(`${commandName} requires a non-negative timeout in seconds`);
}
return timeoutSec;
}
function validateP2PSettings(core: LiveSyncBaseCore<ServiceContext, any>) {
const settings = core.services.setting.currentSettings();
if (!settings.P2P_Enabled) {
throw new Error("P2P is disabled in settings (P2P_Enabled=false)");
}
if (!settings.P2P_AppID) {
settings.P2P_AppID = P2P_DEFAULT_SETTINGS.P2P_AppID;
}
// CLI mode is non-interactive.
settings.P2P_IsHeadless = true;
}
async function createReplicator(core: LiveSyncBaseCore<ServiceContext, any>): Promise<TrysteroReplicator> {
validateP2PSettings(core);
const getSettings = () => core.services.setting.currentSettings();
const getDB = () => core.services.database.localDatabase.localDatabase;
const getSimpleStore = () => core.services.keyValueDB.openSimpleStore("p2p-sync");
const getDeviceName = () =>
core.services.config.getSmallConfig(SETTING_KEY_P2P_DEVICE_NAME) || core.services.vault.getVaultName();
const env = {
get settings() {
return getSettings();
},
get db() {
return getDB();
},
get simpleStore() {
return getSimpleStore();
},
get deviceName() {
return getDeviceName();
},
get platform() {
return core.services.API.getPlatform();
},
get confirm() {
return core.services.API.confirm;
},
processReplicatedDocs: async (docs: EntryDoc[]) => {
await core.services.replication.parseSynchroniseResult(docs as any);
},
};
return new TrysteroReplicator(env as any);
}
function getSortedPeers(replicator: TrysteroReplicator): CLIP2PPeer[] {
return [...replicator.knownAdvertisements]
.map((peer) => ({ peerId: peer.peerId, name: peer.name }))
.sort((a, b) => a.peerId.localeCompare(b.peerId));
}
export async function collectPeers(
core: LiveSyncBaseCore<ServiceContext, any>,
timeoutSec: number
): Promise<CLIP2PPeer[]> {
const replicator = await createReplicator(core);
await replicator.open();
try {
await delay(timeoutSec * 1000);
return getSortedPeers(replicator);
} finally {
await replicator.close();
}
}
function resolvePeer(peers: CLIP2PPeer[], peerToken: string): CLIP2PPeer | undefined {
const byId = peers.find((peer) => peer.peerId === peerToken);
if (byId) {
return byId;
}
const byName = peers.filter((peer) => peer.name === peerToken);
if (byName.length > 1) {
throw new Error(`Multiple peers matched by name '${peerToken}'. Use peer-id instead.`);
}
if (byName.length === 1) {
return byName[0];
}
return undefined;
}
export async function syncWithPeer(
core: LiveSyncBaseCore<ServiceContext, any>,
peerToken: string,
timeoutSec: number
): Promise<CLIP2PPeer> {
const replicator = await createReplicator(core);
await replicator.open();
try {
const timeoutMs = timeoutSec * 1000;
const start = Date.now();
let targetPeer: CLIP2PPeer | undefined;
while (Date.now() - start <= timeoutMs) {
const peers = getSortedPeers(replicator);
targetPeer = resolvePeer(peers, peerToken);
if (targetPeer) {
break;
}
await delay(200);
}
if (!targetPeer) {
throw new Error(`Peer '${peerToken}' was not found within ${timeoutSec} seconds`);
}
const pullResult = await replicator.replicateFrom(targetPeer.peerId, false);
if (pullResult && "error" in pullResult && pullResult.error) {
throw pullResult.error;
}
const pushResult = (await replicator.requestSynchroniseToPeer(targetPeer.peerId)) as any;
if (!pushResult || pushResult.ok !== true) {
throw pushResult?.error ?? new Error("P2P sync failed while requesting remote sync");
}
return targetPeer;
} finally {
await replicator.close();
}
}
export async function openP2PHost(core: LiveSyncBaseCore<ServiceContext, any>): Promise<TrysteroReplicator> {
const replicator = await createReplicator(core);
await replicator.open();
return replicator;
}

View File

@@ -0,0 +1,18 @@
import { describe, expect, it } from "vitest";
import { parseTimeoutSeconds } from "./p2p";
describe("p2p command helpers", () => {
it("accepts non-negative timeout", () => {
expect(parseTimeoutSeconds("0", "p2p-peers")).toBe(0);
expect(parseTimeoutSeconds("2.5", "p2p-sync")).toBe(2.5);
});
it("rejects invalid timeout values", () => {
expect(() => parseTimeoutSeconds("-1", "p2p-peers")).toThrow(
"p2p-peers requires a non-negative timeout in seconds"
);
expect(() => parseTimeoutSeconds("abc", "p2p-sync")).toThrow(
"p2p-sync requires a non-negative timeout in seconds"
);
});
});

View File

@@ -6,6 +6,7 @@ import { DEFAULT_SETTINGS, type FilePathWithPrefix, type ObsidianLiveSyncSetting
import { stripAllPrefixes } from "@lib/string_and_binary/path";
import type { CLICommandContext, CLIOptions } from "./types";
import { promptForPassphrase, readStdinAsUtf8, toArrayBuffer, toVaultRelativePath } from "./utils";
import { collectPeers, openP2PHost, parseTimeoutSeconds, syncWithPeer } from "./p2p";
import { performFullScan } from "@lib/serviceFeatures/offlineScanner";
import { UnresolvedErrorManager } from "@lib/services/base/UnresolvedErrorManager";
@@ -23,6 +24,42 @@ export async function runCommand(options: CLIOptions, context: CLICommandContext
return !!result;
}
if (options.command === "p2p-peers") {
if (options.commandArgs.length < 1) {
throw new Error("p2p-peers requires one argument: <timeout>");
}
const timeoutSec = parseTimeoutSeconds(options.commandArgs[0], "p2p-peers");
console.error(`[Command] p2p-peers timeout=${timeoutSec}s`);
const peers = await collectPeers(core as any, timeoutSec);
if (peers.length > 0) {
process.stdout.write(peers.map((peer) => `[peer]\t${peer.peerId}\t${peer.name}`).join("\n") + "\n");
}
return true;
}
if (options.command === "p2p-sync") {
if (options.commandArgs.length < 2) {
throw new Error("p2p-sync requires two arguments: <peer> <timeout>");
}
const peerToken = options.commandArgs[0].trim();
if (!peerToken) {
throw new Error("p2p-sync requires a non-empty <peer>");
}
const timeoutSec = parseTimeoutSeconds(options.commandArgs[1], "p2p-sync");
console.error(`[Command] p2p-sync peer=${peerToken} timeout=${timeoutSec}s`);
const peer = await syncWithPeer(core as any, peerToken, timeoutSec);
console.error(`[Done] P2P sync completed with ${peer.name} (${peer.peerId})`);
return true;
}
if (options.command === "p2p-host") {
console.error("[Command] p2p-host");
await openP2PHost(core as any);
console.error("[Ready] P2P host is running. Press Ctrl+C to stop.");
await new Promise(() => {});
return true;
}
if (options.command === "push") {
if (options.commandArgs.length < 2) {
throw new Error("push requires two arguments: <src> <dst>");

View File

@@ -4,6 +4,9 @@ import { ServiceContext } from "@lib/services/base/ServiceBase";
export type CLICommand =
| "daemon"
| "sync"
| "p2p-peers"
| "p2p-sync"
| "p2p-host"
| "push"
| "pull"
| "pull-rev"
@@ -36,6 +39,9 @@ export interface CLICommandContext {
export const VALID_COMMANDS = new Set([
"sync",
"p2p-peers",
"p2p-sync",
"p2p-host",
"push",
"pull",
"pull-rev",