Refactor: Refactor P2P Replicator

This commit is contained in:
vorotamoroz
2026-03-15 03:33:03 +09:00
parent 33338506cf
commit 653cf8dfbe
8 changed files with 101 additions and 307 deletions

View File

@@ -1,7 +1,7 @@
import type { LiveSyncBaseCore } from "../../../LiveSyncBaseCore";
import { P2P_DEFAULT_SETTINGS, SETTING_KEY_P2P_DEVICE_NAME, type EntryDoc } from "@lib/common/types";
import { P2P_DEFAULT_SETTINGS } from "@lib/common/types";
import type { ServiceContext } from "@lib/services/base/ServiceBase";
import { TrysteroReplicator } from "@lib/replication/trystero/TrysteroReplicator";
import { LiveSyncTrysteroReplicator } from "@lib/replication/trystero/LiveSyncTrysteroReplicator";
type CLIP2PPeer = {
peerId: string;
@@ -32,42 +32,12 @@ function validateP2PSettings(core: LiveSyncBaseCore<ServiceContext, any>) {
settings.P2P_IsHeadless = true;
}
async function createReplicator(core: LiveSyncBaseCore<ServiceContext, any>): Promise<TrysteroReplicator> {
function createReplicator(core: LiveSyncBaseCore<ServiceContext, any>): LiveSyncTrysteroReplicator {
validateP2PSettings(core);
const getSettings = () => core.services.setting.currentSettings();
const getDB = () => core.services.database.localDatabase.localDatabase;
const getSimpleStore = () => core.services.keyValueDB.openSimpleStore("p2p-sync");
const getDeviceName = () =>
core.services.config.getSmallConfig(SETTING_KEY_P2P_DEVICE_NAME) || core.services.vault.getVaultName();
const env = {
get settings() {
return getSettings();
},
get db() {
return getDB();
},
get simpleStore() {
return getSimpleStore();
},
get deviceName() {
return getDeviceName();
},
get platform() {
return core.services.API.getPlatform();
},
get confirm() {
return core.services.API.confirm;
},
processReplicatedDocs: async (docs: EntryDoc[]) => {
await core.services.replication.parseSynchroniseResult(docs as any);
},
};
return new TrysteroReplicator(env as any);
return new LiveSyncTrysteroReplicator({ services: core.services });
}
function getSortedPeers(replicator: TrysteroReplicator): CLIP2PPeer[] {
function getSortedPeers(replicator: LiveSyncTrysteroReplicator): CLIP2PPeer[] {
return [...replicator.knownAdvertisements]
.map((peer) => ({ peerId: peer.peerId, name: peer.name }))
.sort((a, b) => a.peerId.localeCompare(b.peerId));
@@ -77,7 +47,7 @@ export async function collectPeers(
core: LiveSyncBaseCore<ServiceContext, any>,
timeoutSec: number
): Promise<CLIP2PPeer[]> {
const replicator = await createReplicator(core);
const replicator = createReplicator(core);
await replicator.open();
try {
await delay(timeoutSec * 1000);
@@ -107,7 +77,7 @@ export async function syncWithPeer(
peerToken: string,
timeoutSec: number
): Promise<CLIP2PPeer> {
const replicator = await createReplicator(core);
const replicator = createReplicator(core);
await replicator.open();
try {
const timeoutMs = timeoutSec * 1000;
@@ -142,8 +112,8 @@ export async function syncWithPeer(
}
}
export async function openP2PHost(core: LiveSyncBaseCore<ServiceContext, any>): Promise<TrysteroReplicator> {
const replicator = await createReplicator(core);
export async function openP2PHost(core: LiveSyncBaseCore<ServiceContext, any>): Promise<LiveSyncTrysteroReplicator> {
const replicator = createReplicator(core);
await replicator.open();
return replicator;
}

View File

@@ -1,10 +1,8 @@
import { PouchDB } from "@lib/pouchdb/pouchdb-browser";
import {
type EntryDoc,
type LOG_LEVEL,
type ObsidianLiveSyncSettings,
type P2PSyncSetting,
LOG_LEVEL_NOTICE,
LOG_LEVEL_VERBOSE,
P2P_DEFAULT_SETTINGS,
REMOTE_P2P,
@@ -12,35 +10,33 @@ import {
import { eventHub } from "@lib/hub/hub";
import type { Confirm } from "@lib/interfaces/Confirm";
import { LOG_LEVEL_INFO, Logger } from "@lib/common/logger";
import { LOG_LEVEL_NOTICE, Logger } from "@lib/common/logger";
import { storeP2PStatusLine } from "./CommandsShim";
import {
EVENT_P2P_PEER_SHOW_EXTRA_MENU,
type CommandShim,
type PeerStatus,
type PluginShim,
} from "@lib/replication/trystero/P2PReplicatorPaneCommon";
import {
closeP2PReplicator,
openP2PReplicator,
P2PLogCollector,
type P2PReplicatorBase,
useP2PReplicator,
} from "@lib/replication/trystero/P2PReplicatorCore";
import type { SimpleStore } from "octagonal-wheels/databases/SimpleStoreBase";
import { reactiveSource } from "octagonal-wheels/dataobject/reactive_v2";
import { EVENT_SETTING_SAVED } from "@lib/events/coreEvents";
import { unique } from "octagonal-wheels/collection";
import { BrowserServiceHub } from "@lib/services/BrowserServices";
import { TrysteroReplicator } from "@lib/replication/trystero/TrysteroReplicator";
import { SETTING_KEY_P2P_DEVICE_NAME } from "@lib/common/types";
import { ServiceContext } from "@lib/services/base/ServiceBase";
import type { InjectableServiceHub } from "@lib/services/InjectableServices";
import { Menu } from "@lib/services/implements/browser/Menu";
import type { InjectableVaultServiceCompat } from "@lib/services/implements/injectable/InjectableVaultService";
import { SimpleStoreIDBv2 } from "octagonal-wheels/databases/SimpleStoreIDBv2";
import type { InjectableAPIService } from "@/lib/src/services/implements/injectable/InjectableAPIService";
import type { BrowserAPIService } from "@/lib/src/services/implements/browser/BrowserAPIService";
import type { InjectableSettingService } from "@/lib/src/services/implements/injectable/InjectableSettingService";
import {
LiveSyncTrysteroReplicator,
} from "@lib/replication/trystero/LiveSyncTrysteroReplicator";
function addToList(item: string, list: string) {
return unique(
@@ -60,12 +56,10 @@ function removeFromList(item: string, list: string) {
.join(",");
}
export class P2PReplicatorShim implements P2PReplicatorBase, CommandShim {
export class P2PReplicatorShim implements P2PReplicatorBase {
storeP2PStatusLine = reactiveSource("");
plugin!: PluginShim;
// environment!: IEnvironment;
confirm!: Confirm;
// simpleStoreAPI!: ISimpleStoreAPI;
db?: PouchDB.Database<EntryDoc>;
services: InjectableServiceHub<ServiceContext>;
@@ -76,12 +70,26 @@ export class P2PReplicatorShim implements P2PReplicatorBase, CommandShim {
return this.db;
}
_simpleStore!: SimpleStore<any>;
async closeDB() {
if (this.db) {
await this.db.close();
this.db = undefined;
}
}
private _liveSyncReplicator?: LiveSyncTrysteroReplicator;
p2pLogCollector!: P2PLogCollector;
private _initP2PReplicator() {
const { replicator, p2pLogCollector, storeP2PStatusLine: p2pStatusLine } = useP2PReplicator({ services: this.services } as any);
this._liveSyncReplicator = replicator;
this.p2pLogCollector = p2pLogCollector;
p2pLogCollector.p2pReplicationLine.onChanged((line) => {
storeP2PStatusLine.set(line.value);
});
}
constructor() {
const browserServiceHub = new BrowserServiceHub<ServiceContext>();
this.services = browserServiceHub;
@@ -89,7 +97,6 @@ export class P2PReplicatorShim implements P2PReplicatorBase, CommandShim {
(this.services.API as BrowserAPIService<ServiceContext>).getSystemVaultName.setHandler(
() => "p2p-livesync-web-peer"
);
// this.services.API.addLog.setHandler(Logger);
const repStore = SimpleStoreIDBv2.open<any>("p2p-livesync-web-peer");
this._simpleStore = repStore;
let _settings = { ...P2P_DEFAULT_SETTINGS, additionalSuffixOfDatabaseName: "" } as ObsidianLiveSyncSettings;
@@ -103,14 +110,13 @@ export class P2PReplicatorShim implements P2PReplicatorBase, CommandShim {
return settings;
});
}
get settings() {
return this.services.setting.currentSettings() as P2PSyncSetting;
}
async init() {
// const { simpleStoreAPI } = await getWrappedSynchromesh();
// this.confirm = confirm;
this.confirm = this.services.UI.confirm;
// this.environment = environment;
if (this.db) {
try {
@@ -123,30 +129,16 @@ export class P2PReplicatorShim implements P2PReplicatorBase, CommandShim {
await this.services.setting.loadSettings();
this.plugin = {
// saveSettings: async () => {
// await repStore.set("settings", _settings);
// eventHub.emitEvent(EVENT_SETTING_SAVED, _settings);
// },
// get settings() {
// return _settings;
// },
// set settings(newSettings: P2PSyncSetting) {
// _settings = { ..._settings, ...newSettings };
// },
// rebuilder: null,
// core: {
// settings: this.services.setting.settings,
// },
services: this.services,
core: {
services: this.services,
},
// $$scheduleAppReload: () => {},
// $$getVaultName: () => "p2p-livesync-web-peer",
};
// const deviceName = this.getDeviceName();
const database_name = this.settings.P2P_AppID + "-" + this.settings.P2P_roomID + "p2p-livesync-web-peer";
this.db = new PouchDB<EntryDoc>(database_name);
this._initP2PReplicator();
setTimeout(() => {
if (this.settings.P2P_AutoStart && this.settings.P2P_Enabled) {
void this.open();
@@ -155,7 +147,7 @@ export class P2PReplicatorShim implements P2PReplicatorBase, CommandShim {
return this;
}
_log(msg: any, level?: LOG_LEVEL): void {
_log(msg: any, level?: any): void {
Logger(msg, level);
}
_notice(msg: string, key?: string): void {
@@ -167,14 +159,10 @@ export class P2PReplicatorShim implements P2PReplicatorBase, CommandShim {
simpleStore(): SimpleStore<any> {
return this._simpleStore;
}
handleReplicatedDocuments(docs: EntryDoc[]): Promise<boolean> {
// No op. This is a client and does not need to process the docs
handleReplicatedDocuments(_docs: EntryDoc[]): Promise<boolean> {
return Promise.resolve(true);
}
getPluginShim() {
return {};
}
getConfig(key: string) {
const vaultName = this.services.vault.getVaultName();
const dbKey = `${vaultName}-${key}`;
@@ -189,9 +177,7 @@ export class P2PReplicatorShim implements P2PReplicatorBase, CommandShim {
getDeviceName(): string {
return this.getConfig(SETTING_KEY_P2P_DEVICE_NAME) ?? this.plugin.services.vault.getVaultName();
}
getPlatform(): string {
return "pseudo-replicator";
}
m?: Menu;
afterConstructor(): void {
eventHub.onEvent(EVENT_P2P_PEER_SHOW_EXTRA_MENU, ({ peer, event }) => {
@@ -202,12 +188,6 @@ export class P2PReplicatorShim implements P2PReplicatorBase, CommandShim {
.addItem((item) => item.setTitle("📥 Only Fetch").onClick(() => this.replicateFrom(peer)))
.addItem((item) => item.setTitle("📤 Only Send").onClick(() => this.replicateTo(peer)))
.addSeparator()
// .addItem((item) => {
// item.setTitle("🔧 Get Configuration").onClick(async () => {
// await this.getRemoteConfig(peer);
// });
// })
// .addSeparator()
.addItem((item) => {
const mark = peer.syncOnConnect ? "checkmark" : null;
item.setTitle("Toggle Sync on connect")
@@ -234,97 +214,43 @@ export class P2PReplicatorShim implements P2PReplicatorBase, CommandShim {
});
void this.m.showAtPosition({ x: event.x, y: event.y });
});
this.p2pLogCollector.p2pReplicationLine.onChanged((line) => {
storeP2PStatusLine.set(line.value);
});
}
_replicatorInstance?: TrysteroReplicator;
p2pLogCollector = new P2PLogCollector();
async open() {
await openP2PReplicator(this);
await this._liveSyncReplicator?.open();
}
async close() {
await closeP2PReplicator(this);
await this._liveSyncReplicator?.close();
}
enableBroadcastCastings() {
return this?._replicatorInstance?.enableBroadcastChanges();
return this._liveSyncReplicator?.enableBroadcastChanges();
}
disableBroadcastCastings() {
return this?._replicatorInstance?.disableBroadcastChanges();
}
async initialiseP2PReplicator(): Promise<TrysteroReplicator> {
await this.init();
try {
if (this._replicatorInstance) {
await this._replicatorInstance.close();
this._replicatorInstance = undefined;
}
if (!this.settings.P2P_AppID) {
this.settings.P2P_AppID = P2P_DEFAULT_SETTINGS.P2P_AppID;
}
const getInitialDeviceName = () =>
this.getConfig(SETTING_KEY_P2P_DEVICE_NAME) || this.services.vault.getVaultName();
const getSettings = () => this.settings;
const store = () => this.simpleStore();
const getDB = () => this.getDB();
const getConfirm = () => this.confirm;
const getPlatform = () => this.getPlatform();
const env = {
get db() {
return getDB();
},
get confirm() {
return getConfirm();
},
get deviceName() {
return getInitialDeviceName();
},
get platform() {
return getPlatform();
},
get settings() {
return getSettings();
},
processReplicatedDocs: async (docs: EntryDoc[]): Promise<void> => {
await this.handleReplicatedDocuments(docs);
// No op. This is a client and does not need to process the docs
},
get simpleStore() {
return store();
},
};
this._replicatorInstance = new TrysteroReplicator(env);
return this._replicatorInstance;
} catch (e) {
this._log(
e instanceof Error ? e.message : "Something occurred on Initialising P2P Replicator",
LOG_LEVEL_INFO
);
this._log(e, LOG_LEVEL_VERBOSE);
throw e;
}
return this._liveSyncReplicator?.disableBroadcastChanges();
}
get replicator() {
return this._replicatorInstance!;
return this._liveSyncReplicator;
}
async replicateFrom(peer: PeerStatus) {
await this.replicator.replicateFrom(peer.peerId);
const r = this._liveSyncReplicator;
if (!r) return;
await r.replicateFrom(peer.peerId);
}
async replicateTo(peer: PeerStatus) {
await this.replicator.requestSynchroniseToPeer(peer.peerId);
await this._liveSyncReplicator?.requestSynchroniseToPeer(peer.peerId);
}
async getRemoteConfig(peer: PeerStatus) {
Logger(
`Requesting remote config for ${peer.name}. Please input the passphrase on the remote device`,
LOG_LEVEL_NOTICE
);
const remoteConfig = await this.replicator.getRemoteConfig(peer.peerId);
const remoteConfig = await this._liveSyncReplicator?.getRemoteConfig(peer.peerId);
if (remoteConfig) {
Logger(`Remote config for ${peer.name} is retrieved successfully`);
const DROP = "Yes, and drop local database";
@@ -344,9 +270,7 @@ export class P2PReplicatorShim implements P2PReplicatorBase, CommandShim {
if (remoteConfig.remoteType !== REMOTE_P2P) {
const yn2 = await this.confirm.askYesNoDialog(
`Do you want to set the remote type to "P2P Sync" to rebuild by "P2P replication"?`,
{
title: "Rebuild from remote device",
}
{ title: "Rebuild from remote device" }
);
if (yn2 === "yes") {
remoteConfig.remoteType = REMOTE_P2P;
@@ -355,9 +279,7 @@ export class P2PReplicatorShim implements P2PReplicatorBase, CommandShim {
}
}
await this.services.setting.applyPartial(remoteConfig, true);
if (yn === DROP) {
// await this.plugin.rebuilder.scheduleFetch();
} else {
if (yn !== DROP) {
await this.plugin.core.services.appLifecycle.scheduleRestart();
}
} else {
@@ -381,8 +303,6 @@ export class P2PReplicatorShim implements P2PReplicatorBase, CommandShim {
[targetSetting]: currentSettingAll ? currentSettingAll[targetSetting] : "",
};
if (peer[prop]) {
// this.plugin.settings[targetSetting] = removeFromList(peer.name, this.plugin.settings[targetSetting]);
// await this.plugin.saveSettings();
currentSetting[targetSetting] = removeFromList(peer.name, currentSetting[targetSetting]);
} else {
currentSetting[targetSetting] = addToList(peer.name, currentSetting[targetSetting]);