diff --git a/manifest.json b/manifest.json index 923b38f..cea5902 100644 --- a/manifest.json +++ b/manifest.json @@ -1,7 +1,7 @@ { "id": "obsidian-livesync", "name": "Self-hosted LiveSync", - "version": "0.4.0", + "version": "0.4.1", "minAppVersion": "0.9.12", "description": "Community implementation of self-hosted livesync. Reflect your vault changes to some other devices immediately. Please make sure to disable other synchronize solutions to avoid content corruption or duplication.", "author": "vorotamoroz", diff --git a/package-lock.json b/package-lock.json index f8624a5..0e80d45 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "obsidian-livesync", - "version": "0.4.0", + "version": "0.4.1", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "obsidian-livesync", - "version": "0.4.0", + "version": "0.4.1", "license": "MIT", "dependencies": { "diff-match-patch": "^1.0.5", diff --git a/package.json b/package.json index bc3348c..e1d99d2 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "obsidian-livesync", - "version": "0.4.0", + "version": "0.4.1", "description": "Reflect your vault changes to some other devices immediately. Please make sure to disable other synchronize solutions to avoid content corruption or duplication.", "main": "main.js", "scripts": { diff --git a/src/LocalPouchDB.ts b/src/LocalPouchDB.ts index 7465e65..b45ca4e 100644 --- a/src/LocalPouchDB.ts +++ b/src/LocalPouchDB.ts @@ -25,7 +25,7 @@ import { } from "./types"; import { resolveWithIgnoreKnownError, delay, path2id, runWithLock } from "./utils"; import { Logger } from "./logger"; -import { checkRemoteVersion, connectRemoteCouchDB } from "./utils_couchdb"; +import { checkRemoteVersion, connectRemoteCouchDB, getLastPostFailedBySize } from "./utils_couchdb"; import { decrypt, encrypt } from "./e2ee"; export class LocalPouchDB { @@ -763,6 +763,12 @@ export class LocalPouchDB { } const syncOptionBase: PouchDB.Replication.SyncOptions = { + pull: { + checkpoint: "target", + }, + push: { + checkpoint: "source", + }, batches_limit: setting.batches_limit, batch_size: setting.batch_size, }; @@ -770,7 +776,7 @@ export class LocalPouchDB { const db = dbret.db; const totalCount = (await this.localDatabase.info()).doc_count; //replicate once - const replicate = this.localDatabase.replicate.to(db, syncOptionBase); + const replicate = this.localDatabase.replicate.to(db, { checkpoint: "source", ...syncOptionBase }); replicate .on("active", () => { this.syncStatus = "CONNECTED"; @@ -806,7 +812,7 @@ export class LocalPouchDB { }); } - async checkReplicationConnectivity(setting: ObsidianLiveSyncSettings, keepAlive: boolean) { + async checkReplicationConnectivity(setting: ObsidianLiveSyncSettings, keepAlive: boolean, skipCheck: boolean) { if (!this.isReady) { Logger("Database is not ready."); return false; @@ -832,40 +838,41 @@ export class LocalPouchDB { return false; } - if (!(await checkRemoteVersion(dbret.db, this.migrate.bind(this), VER))) { - Logger("Remote database is newer or corrupted, make sure to latest version of self-hosted-livesync installed", LOG_LEVEL.NOTICE); - return false; + if (!skipCheck) { + if (!(await checkRemoteVersion(dbret.db, this.migrate.bind(this), VER))) { + Logger("Remote database is newer or corrupted, make sure to latest version of self-hosted-livesync installed", LOG_LEVEL.NOTICE); + return false; + } + + const defMilestonePoint: EntryMilestoneInfo = { + _id: MILSTONE_DOCID, + type: "milestoneinfo", + created: (new Date() as any) / 1, + locked: false, + accepted_nodes: [this.nodeid], + }; + // const remoteInfo = dbret.info; + // const localInfo = await this.localDatabase.info(); + // const remoteDocsCount = remoteInfo.doc_count; + // const localDocsCount = localInfo.doc_count; + // const remoteUpdSeq = typeof remoteInfo.update_seq == "string" ? Number(remoteInfo.update_seq.split("-")[0]) : remoteInfo.update_seq; + // const localUpdSeq = typeof localInfo.update_seq == "string" ? Number(localInfo.update_seq.split("-")[0]) : localInfo.update_seq; + + // Logger(`Database diffences: remote:${remoteDocsCount} docs / last update ${remoteUpdSeq}`); + // Logger(`Database diffences: local :${localDocsCount} docs / last update ${localUpdSeq}`); + + const remoteMilestone: EntryMilestoneInfo = await resolveWithIgnoreKnownError(dbret.db.get(MILSTONE_DOCID), defMilestonePoint); + this.remoteLocked = remoteMilestone.locked; + this.remoteLockedAndDeviceNotAccepted = remoteMilestone.locked && remoteMilestone.accepted_nodes.indexOf(this.nodeid) == -1; + + if (remoteMilestone.locked && remoteMilestone.accepted_nodes.indexOf(this.nodeid) == -1) { + Logger("Remote database marked as 'Auto Sync Locked'. And this devide does not marked as resolved device. see settings dialog.", LOG_LEVEL.NOTICE); + return false; + } + if (typeof remoteMilestone._rev == "undefined") { + await dbret.db.put(remoteMilestone); + } } - - const defMilestonePoint: EntryMilestoneInfo = { - _id: MILSTONE_DOCID, - type: "milestoneinfo", - created: (new Date() as any) / 1, - locked: false, - accepted_nodes: [this.nodeid], - }; - // const remoteInfo = dbret.info; - // const localInfo = await this.localDatabase.info(); - // const remoteDocsCount = remoteInfo.doc_count; - // const localDocsCount = localInfo.doc_count; - // const remoteUpdSeq = typeof remoteInfo.update_seq == "string" ? Number(remoteInfo.update_seq.split("-")[0]) : remoteInfo.update_seq; - // const localUpdSeq = typeof localInfo.update_seq == "string" ? Number(localInfo.update_seq.split("-")[0]) : localInfo.update_seq; - - // Logger(`Database diffences: remote:${remoteDocsCount} docs / last update ${remoteUpdSeq}`); - // Logger(`Database diffences: local :${localDocsCount} docs / last update ${localUpdSeq}`); - - const remoteMilestone: EntryMilestoneInfo = await resolveWithIgnoreKnownError(dbret.db.get(MILSTONE_DOCID), defMilestonePoint); - this.remoteLocked = remoteMilestone.locked; - this.remoteLockedAndDeviceNotAccepted = remoteMilestone.locked && remoteMilestone.accepted_nodes.indexOf(this.nodeid) == -1; - - if (remoteMilestone.locked && remoteMilestone.accepted_nodes.indexOf(this.nodeid) == -1) { - Logger("Remote database marked as 'Auto Sync Locked'. And this devide does not marked as resolved device. see settings dialog.", LOG_LEVEL.NOTICE); - return false; - } - if (typeof remoteMilestone._rev == "undefined") { - await dbret.db.put(remoteMilestone); - } - const syncOptionBase: PouchDB.Replication.SyncOptions = { batches_limit: setting.batches_limit, batch_size: setting.batch_size, @@ -877,33 +884,49 @@ export class LocalPouchDB { async openReplication(setting: ObsidianLiveSyncSettings, keepAlive: boolean, showResult: boolean, callback: (e: PouchDB.Core.ExistingDocument[]) => Promise): Promise { return await runWithLock("replicate", false, () => { - return this._openReplication(setting, keepAlive, showResult, callback); + return this._openReplication(setting, keepAlive, showResult, callback, false); }); } - async _openReplication(setting: ObsidianLiveSyncSettings, keepAlive: boolean, showResult: boolean, callback: (e: PouchDB.Core.ExistingDocument[]) => Promise): Promise { - const ret = await this.checkReplicationConnectivity(setting, keepAlive); + originalSetting: ObsidianLiveSyncSettings = null; + // last_seq: number = 200; + async _openReplication(setting: ObsidianLiveSyncSettings, keepAlive: boolean, showResult: boolean, callback: (e: PouchDB.Core.ExistingDocument[]) => Promise, retrying: boolean): Promise { + const ret = await this.checkReplicationConnectivity(setting, keepAlive, retrying); if (ret === false) return false; let notice: Notice = null; if (showResult) { - notice = new Notice("Replicating", 0); + notice = new Notice("Looking for the point last synchronized point.", 0); } const { db, syncOptionBase, syncOption } = ret; //replicate once this.syncStatus = "STARTED"; + this.updateInfo(); let resolved = false; const docArrivedOnStart = this.docArrived; const docSentOnStart = this.docSent; + const _openReplicationSync = () => { Logger("Sync Main Started"); + if (!retrying) { + this.originalSetting = setting; + } this.syncHandler = this.cancelHandler(this.syncHandler); - this.syncHandler = this.localDatabase.sync(db, syncOption); + this.syncHandler = this.localDatabase.sync(db, { + ...syncOption, + pull: { + checkpoint: "target", + }, + push: { + checkpoint: "source", + }, + }); this.syncHandler .on("active", () => { this.syncStatus = "CONNECTED"; this.updateInfo(); Logger("Replication activated"); + if (notice != null) notice.setMessage(`Activated..`); }) .on("change", async (e) => { try { @@ -924,6 +947,16 @@ export class LocalPouchDB { Logger("Replication callback error"); Logger(ex); } + // re-connect to retry with original setting + if (retrying) { + if (this.docSent - docSentOnStart + (this.docArrived - docArrivedOnStart) > this.originalSetting.batch_size * 2) { + // restore sync values + Logger("Back into original settings once."); + if (notice != null) notice.hide(); + this.syncHandler = this.cancelHandler(this.syncHandler); + this._openReplication(this.originalSetting, keepAlive, showResult, callback, false); + } + } }) .on("complete", (e) => { this.syncStatus = "COMPLETED"; @@ -948,8 +981,25 @@ export class LocalPouchDB { this.syncHandler = this.cancelHandler(this.syncHandler); this.updateInfo(); if (notice != null) notice.hide(); - Logger("Replication error", LOG_LEVEL.NOTICE); - Logger(e); + if (getLastPostFailedBySize()) { + if (keepAlive) { + Logger("Replication stopped.", LOG_LEVEL.NOTICE); + } else { + // Duplicate settings for smaller batch. + const xsetting: ObsidianLiveSyncSettings = JSON.parse(JSON.stringify(setting)); + xsetting.batch_size = Math.ceil(xsetting.batch_size / 2); + xsetting.batches_limit = Math.ceil(xsetting.batches_limit / 2); + if (xsetting.batch_size <= 3 || xsetting.batches_limit <= 3) { + Logger("We can't replicate more lower value.", showResult ? LOG_LEVEL.NOTICE : LOG_LEVEL.INFO); + } else { + Logger(`Retry with lower batch size:${xsetting.batch_size}/${xsetting.batches_limit}`, showResult ? LOG_LEVEL.NOTICE : LOG_LEVEL.INFO); + this._openReplication(xsetting, keepAlive, showResult, callback, true); + } + } + } else { + Logger("Replication error", LOG_LEVEL.NOTICE); + Logger(e); + } }) .on("paused", (e) => { this.syncStatus = "PAUSED"; @@ -974,7 +1024,7 @@ export class LocalPouchDB { Logger(await db.info(), LOG_LEVEL.VERBOSE); let replicate: PouchDB.Replication.Replication; try { - replicate = this.localDatabase.replicate.from(db, syncOptionBase); + replicate = this.localDatabase.replicate.from(db, { checkpoint: "target", ...syncOptionBase }); replicate .on("active", () => { this.syncStatus = "CONNECTED"; diff --git a/src/utils_couchdb.ts b/src/utils_couchdb.ts index 58fe75a..5465963 100644 --- a/src/utils_couchdb.ts +++ b/src/utils_couchdb.ts @@ -8,11 +8,54 @@ export const isValidRemoteCouchDBURI = (uri: string): boolean => { if (uri.startsWith("http://")) return true; return false; }; +let last_post_successed = false; +export const getLastPostFailedBySize = () => { + return !last_post_successed; +}; export const connectRemoteCouchDB = async (uri: string, auth: { username: string; password: string }): Promise; info: PouchDB.Core.DatabaseInfo }> => { if (!isValidRemoteCouchDBURI(uri)) return "Remote URI is not valid"; - const db: PouchDB.Database = new PouchDB(uri, { + const conf: PouchDB.HttpAdapter.HttpAdapterConfiguration = { + adapter: "http", auth, - }); + fetch: async function (url: string | Request, opts: RequestInit) { + let size_ok = true; + let size = ""; + const localURL = url.toString().substring(uri.length); + const method = opts.method ?? "GET"; + if (opts.body) { + const opts_length = opts.body.toString().length; + if (opts_length > 1024 * 1024 * 10) { + // over 10MB + size_ok = false; + if (uri.contains(".cloudantnosqldb.")) { + last_post_successed = false; + Logger("This request should fail on IBM Cloudant.", LOG_LEVEL.VERBOSE); + throw new Error("This request should fail on IBM Cloudant."); + } + } + size = ` (${opts_length})`; + } + try { + const responce: Response = await fetch(url, opts); + if (method == "POST" || method == "PUT") { + last_post_successed = responce.ok; + } else { + last_post_successed = true; + } + Logger(`HTTP:${method}${size} to:${localURL} -> ${responce.status}`, LOG_LEVEL.VERBOSE); + return responce; + } catch (ex) { + Logger(`HTTP:${method}${size} to:${localURL} -> failed`, LOG_LEVEL.VERBOSE); + if (!size_ok && (method == "POST" || method == "PUT")) { + last_post_successed = false; + } + Logger(ex); + throw ex; + } + // return await fetch(url, opts); + }, + }; + const db: PouchDB.Database = new PouchDB(uri, conf); try { const info = await db.info(); return { db: db, info: info };