From 8f583e36806738012b516156e2b490962eeeb05b Mon Sep 17 00:00:00 2001 From: vorotamoroz Date: Tue, 9 Aug 2022 17:10:08 +0900 Subject: [PATCH] Fixed: - Now, we can synchronise hidden files that conflicted on each devices. Enhanced: - We can search for conflicting docs. - Pending processes can now be run at any time. - Performance improved on synchronising large numbers of files at once. --- manifest.json | 2 +- package.json | 2 +- src/main.ts | 312 ++++++++++++++++++++++++++++++++++---------------- updates.md | 7 +- 4 files changed, 220 insertions(+), 103 deletions(-) diff --git a/manifest.json b/manifest.json index bc71c49..0530dc7 100644 --- a/manifest.json +++ b/manifest.json @@ -1,7 +1,7 @@ { "id": "obsidian-livesync", "name": "Self-hosted LiveSync", - "version": "0.13.3", + "version": "0.13.4", "minAppVersion": "0.9.12", "description": "Community implementation of self-hosted livesync. Reflect your vault changes to some other devices immediately. Please make sure to disable other synchronize solutions to avoid content corruption or duplication.", "author": "vorotamoroz", diff --git a/package.json b/package.json index 65d88f8..577a979 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "obsidian-livesync", - "version": "0.13.3", + "version": "0.13.4", "description": "Reflect your vault changes to some other devices immediately. Please make sure to disable other synchronize solutions to avoid content corruption or duplication.", "main": "main.js", "type": "module", diff --git a/src/main.ts b/src/main.ts index afa1f4f..d4f33e4 100644 --- a/src/main.ts +++ b/src/main.ts @@ -29,7 +29,7 @@ import { DocumentHistoryModal } from "./DocumentHistoryModal"; -import { clearAllPeriodic, clearAllTriggers, disposeMemoObject, id2path, memoIfNotExist, memoObject, path2id, retriveMemoObject, setTrigger } from "./utils"; +import { clearAllPeriodic, clearAllTriggers, clearTrigger, disposeMemoObject, id2path, memoIfNotExist, memoObject, path2id, retriveMemoObject, setTrigger } from "./utils"; import { decrypt, encrypt } from "./lib/src/e2ee_v2"; const isDebug = false; @@ -174,6 +174,45 @@ export default class ObsidianLiveSyncPlugin extends Plugin { this.showHistory(target); } } + async pickFileForResolve() { + const pageLimit = 1000; + let nextKey = ""; + const notes: { path: string, mtime: number }[] = []; + do { + const docs = await this.localDatabase.localDatabase.allDocs({ limit: pageLimit, startkey: nextKey, conflicts: true, include_docs: true }); + nextKey = ""; + for (const row of docs.rows) { + const doc = row.doc; + nextKey = `${row.id}\u{10ffff}`; + if (!("_conflicts" in doc)) continue; + if (isInteralChunk(row.id)) continue; + if (doc._deleted) continue; + if ("deleted" in doc && doc.deleted) continue; + if (doc.type == "newnote" || doc.type == "plain") { + // const docId = doc._id.startsWith("i:") ? doc._id.substring("i:".length) : doc._id; + notes.push({ path: id2path(doc._id), mtime: doc.mtime }); + } + if (isChunk(nextKey)) { + // skip the chunk zone. + nextKey = CHeaderEnd; + } + } + } while (nextKey != ""); + notes.sort((a, b) => b.mtime - a.mtime); + const notesList = notes.map(e => e.path); + if (notesList.length == 0) { + Logger("There are no conflicted documents", LOG_LEVEL.NOTICE); + return; + } + const target = await askSelectString(this.app, "File to view History", notesList); + if (target) { + if (isInteralChunk(target)) { + //NOP + } else { + await this.showIfConflicted(this.app.vault.getAbstractFileByPath(target) as TFile); + } + } + } async onload() { setLogger(this.addLog.bind(this)); // Logger moved to global. @@ -492,6 +531,20 @@ export default class ObsidianLiveSyncPlugin extends Plugin { callback: () => { this.fileHistory(); }, + }); + this.addCommand({ + id: "livesync-conflictcheck", + name: "Pick a file to resolive conflict", + callback: () => { + this.pickFileForResolve(); + }, + }) + this.addCommand({ + id: "livesync-runbatch", + name: "Run pending batch processes", + callback: async () => { + await this.applyBatchChange(); + }, }) } @@ -997,8 +1050,8 @@ export default class ObsidianLiveSyncPlugin extends Plugin { return; } if (docEntry._deleted || docEntry.deleted) { - //basically pass. - //but if there are no docs left, delete file. + // This occurs not only when files are deleted, but also when conflicts are resolved. + // We have to check no other revisions are left. const lastDocs = await this.localDatabase.getDBEntry(pathSrc); if (lastDocs === false) { await this.deleteVaultItem(file); @@ -1006,7 +1059,7 @@ export default class ObsidianLiveSyncPlugin extends Plugin { // it perhaps delete some revisions. // may be we have to reload this await this.pullFile(pathSrc, null, true); - Logger(`delete skipped:${lastDocs._id}`); + Logger(`delete skipped:${lastDocs._id}`, LOG_LEVEL.VERBOSE); } return; } @@ -1064,7 +1117,37 @@ export default class ObsidianLiveSyncPlugin extends Plugin { } } - async handleDBChanged(change: EntryBody) { + queuedEntries: EntryBody[] = []; + handleDBChanged(change: EntryBody) { + // If queued same file, cancel previous one. + this.queuedEntries.remove(this.queuedEntries.find(e => e._id == change._id)); + // If the file is opened, we have to apply immediately + const af = app.workspace.getActiveFile(); + if (af && af.path == id2path(change._id)) { + return this.handleDBChangedAsync(change); + } + this.queuedEntries.push(change); + if (this.queuedEntries.length > 50) { + clearTrigger("dbchanged"); + this.execDBchanged(); + } + setTrigger("dbchanged", 500, () => this.execDBchanged()); + } + async execDBchanged() { + await runWithLock("dbchanged", false, async () => { + const w = [...this.queuedEntries]; + this.queuedEntries = []; + Logger(`Applyng ${w.length} files`); + for (const entry of w) { + Logger(`Applying ${entry._id} (${entry._rev}) change...`, LOG_LEVEL.VERBOSE); + await this.handleDBChangedAsync(entry); + Logger(`Applied ${entry._id} (${entry._rev}) change...`); + } + } + ); + } + async handleDBChangedAsync(change: EntryBody) { + const targetFile = this.app.vault.getAbstractFileByPath(id2path(change._id)); if (targetFile == null) { if (change._deleted || change.deleted) { @@ -1113,36 +1196,48 @@ export default class ObsidianLiveSyncPlugin extends Plugin { } } } - async procQueuedFiles() { - await runWithLock("procQueue", false, async () => { - this.saveQueuedFiles(); - for (const queue of this.queuedFiles) { - if (queue.done) continue; - const now = new Date().getTime(); - if (queue.missingChildren.length == 0) { - queue.done = true; - if (isInteralChunk(queue.entry._id)) { - //system file - const filename = id2path(id2filenameInternalChunk(queue.entry._id)); - Logger(`Applying hidden file, ${queue.entry._id} (${queue.entry._rev}) change...`); - await this.syncInternalFilesAndDatabase("pull", false, false, [filename]) - Logger(`Applied hidden file, ${queue.entry._id} (${queue.entry._rev}) change...`); - } - if (isValidPath(id2path(queue.entry._id))) { - Logger(`Applying ${queue.entry._id} (${queue.entry._rev}) change...`); - await this.handleDBChanged(queue.entry); - Logger(`Applied ${queue.entry._id} (${queue.entry._rev})`); - } - } else if (now > queue.timeout) { - if (!queue.warned) Logger(`Timed out: ${queue.entry._id} could not collect ${queue.missingChildren.length} chunks. plugin keeps watching, but you have to check the file after the replication.`, LOG_LEVEL.NOTICE); - queue.warned = true; - continue; - } - } - this.queuedFiles = this.queuedFiles.filter((e) => !e.done); - this.saveQueuedFiles(); + procInternalFiles: string[] = []; + async execInternalFile() { + await runWithLock("execinternal", false, async () => { + const w = [...this.procInternalFiles]; + this.procInternalFiles = []; + Logger(`Applying hidden ${w.length} files change...`); + await this.syncInternalFilesAndDatabase("pull", false, false, w); + Logger(`Applying hidden ${w.length} files changed`); }); } + procInternalFile(filename: string) { + this.procInternalFiles.push(filename); + setTrigger("procInternal", 500, async () => { + await this.execInternalFile(); + }); + } + procQueuedFiles() { + + this.saveQueuedFiles(); + for (const queue of this.queuedFiles) { + if (queue.done) continue; + const now = new Date().getTime(); + if (queue.missingChildren.length == 0) { + queue.done = true; + if (isInteralChunk(queue.entry._id)) { + //system file + const filename = id2path(id2filenameInternalChunk(queue.entry._id)); + // await this.syncInternalFilesAndDatabase("pull", false, false, [filename]) + this.procInternalFile(filename); + } + if (isValidPath(id2path(queue.entry._id))) { + this.handleDBChanged(queue.entry); + } + } else if (now > queue.timeout) { + if (!queue.warned) Logger(`Timed out: ${queue.entry._id} could not collect ${queue.missingChildren.length} chunks. plugin keeps watching, but you have to check the file after the replication.`, LOG_LEVEL.NOTICE); + queue.warned = true; + continue; + } + } + this.queuedFiles = this.queuedFiles.filter((e) => !e.done); + this.saveQueuedFiles(); + } parseIncomingChunk(chunk: PouchDB.Core.ExistingDocument) { const now = new Date().getTime(); let isNewFileCompleted = false; @@ -2328,7 +2423,7 @@ export default class ObsidianLiveSyncPlugin extends Plugin { const content = await arrayBufferToBase64(contentBin); if (content == fileOnDB.data && !force) { // Logger(`STORAGE <-- DB:${filename}: skipped (hidden) Not changed`, LOG_LEVEL.VERBOSE); - return false; + return true; } await this.app.vault.adapter.writeBinary(filename, base64ToArrayBuffer(fileOnDB.data), { mtime: fileOnDB.mtime, ctime: fileOnDB.ctime }); Logger(`STORAGE <-- DB:${filename}: written (hidden, overwrite${force ? ", force" : ""})`); @@ -2353,8 +2448,46 @@ export default class ObsidianLiveSyncPlugin extends Plugin { } confirmPopup: WrappedNotice = null; + + async resolveConflictOnInternalFiles() { + // Scan all conflicted internal files + const docs = await this.localDatabase.localDatabase.allDocs({ startkey: ICHeader, endkey: ICHeaderEnd, conflicts: true, include_docs: true }); + for (const row of docs.rows) { + const doc = row.doc; + if (!("_conflicts" in doc)) continue; + if (isInteralChunk(row.id)) { + await this.resolveConflictOnInternalFile(row.id); + } + } + } + async resolveConflictOnInternalFile(id: string): Promise { + // Retrieve data + const doc = await this.localDatabase.localDatabase.get(id, { conflicts: true }); + // If there is no conflict, return with false. + if (!("_conflicts" in doc)) return false; + if (doc._conflicts.length == 0) return false; + Logger(`Hidden file conflicetd:${id2filenameInternalChunk(id)}`); + const revA = doc._rev; + const revB = doc._conflicts[0]; + + const revBdoc = await this.localDatabase.localDatabase.get(id, { rev: revB }); + // determine which revision sould been deleted. + // simply check modified time + const mtimeA = ("mtime" in doc && doc.mtime) || 0; + const mtimeB = ("mtime" in revBdoc && revBdoc.mtime) || 0; + // Logger(`Revisions:${new Date(mtimeA).toLocaleString} and ${new Date(mtimeB).toLocaleString}`); + // console.log(`mtime:${mtimeA} - ${mtimeB}`); + const delRev = mtimeA < mtimeB ? revA : revB; + // delete older one. + await this.localDatabase.localDatabase.remove(id, delRev); + Logger(`Older one has been deleted:${id2filenameInternalChunk(id)}`); + // check the file again + return this.resolveConflictOnInternalFile(id); + + } //TODO: Tidy up. Even though it is experimental feature, So dirty... async syncInternalFilesAndDatabase(direction: "push" | "pull" | "safe", showMessage: boolean, files: InternalFileInfo[] | false = false, targetFiles: string[] | false = false) { + await this.resolveConflictOnInternalFiles(); const logLevel = showMessage ? LOG_LEVEL.NOTICE : LOG_LEVEL.INFO; Logger("Scanning hidden files.", logLevel, "sync_internal"); const ignorePatterns = this.settings.syncInternalFilesIgnorePatterns.toLocaleLowerCase() @@ -2409,73 +2542,52 @@ export default class ObsidianLiveSyncPlugin extends Plugin { const fileOnStorage = files.find(e => e.path == filename); const fileOnDatabase = filesOnDB.find(e => e._id == filename2idInternalChunk(id2path(filename))); - // TODO: Fix this somehow smart. - let proc: Promise | null; - - if (fileOnStorage && fileOnDatabase) { - // Both => Synchronize - const cache = filename in caches ? caches[filename] : { storageMtime: 0, docMtime: 0 }; - if (fileOnDatabase.mtime == cache.docMtime && fileOnStorage.mtime == cache.storageMtime) { - continue; - } - const nw = compareMTime(fileOnStorage.mtime, fileOnDatabase.mtime); - if (nw == 0) continue; - - if (nw > 0) { - proc = (async (fileOnStorage) => { - await this.storeInternaFileToDatabase(fileOnStorage); - cache.docMtime = fileOnDatabase.mtime; - cache.storageMtime = fileOnStorage.mtime; - caches[filename] = cache; - })(fileOnStorage); - - } - if (nw < 0) { - proc = (async (filename) => { - if (await this.extractInternaFileFromDatabase(filename)) { - cache.docMtime = fileOnDatabase.mtime; - cache.storageMtime = fileOnStorage.mtime; - caches[filename] = cache; - countUpdatedFolder(filename); - } - })(filename); - - } - } else if (!fileOnStorage && fileOnDatabase) { - if (direction == "push") { - if (fileOnDatabase.deleted) { - // await this.storeInternaFileToDatabase(fileOnStorage); - } else { - proc = (async () => { - await this.deleteInternaFileOnDatabase(filename); - })(); - } - } else if (direction == "pull") { - proc = (async () => { - if (await this.extractInternaFileFromDatabase(filename)) { - countUpdatedFolder(filename); - } - })(); - } else if (direction == "safe") { - if (fileOnDatabase.deleted) { - // await this.storeInternaFileToDatabase(fileOnStorage); - } else { - proc = (async () => { - if (await this.extractInternaFileFromDatabase(filename)) { - countUpdatedFolder(filename); - } - })(); - } - } - } else if (fileOnStorage && !fileOnDatabase) { - proc = (async () => { - await this.storeInternaFileToDatabase(fileOnStorage); - })(); - } else { - throw new Error("Invalid state on hidden file sync"); - // Something corrupted? + const addProc = (p: () => Promise): Promise => { + return p(); } - if (proc) p.add(proc); + const cache = filename in caches ? caches[filename] : { storageMtime: 0, docMtime: 0 }; + + p.add(addProc(async () => { + if (fileOnStorage && fileOnDatabase) { + // Both => Synchronize + if (fileOnDatabase.mtime == cache.docMtime && fileOnStorage.mtime == cache.storageMtime) { + return; + } + const nw = compareMTime(fileOnStorage.mtime, fileOnDatabase.mtime); + if (nw > 0) { + await this.storeInternaFileToDatabase(fileOnStorage); + } + if (nw < 0) { + // skip if not extraction performed. + if (!await this.extractInternaFileFromDatabase(filename)) return; + } + // If process successfly updated or file contents are same, update cache. + cache.docMtime = fileOnDatabase.mtime; + cache.storageMtime = fileOnStorage.mtime; + caches[filename] = cache; + countUpdatedFolder(filename); + } else if (!fileOnStorage && fileOnDatabase) { + console.log("pushpull") + if (direction == "push") { + if (fileOnDatabase.deleted) return; + await this.deleteInternaFileOnDatabase(filename); + } else if (direction == "pull") { + if (await this.extractInternaFileFromDatabase(filename)) { + countUpdatedFolder(filename); + } + } else if (direction == "safe") { + if (fileOnDatabase.deleted) return + if (await this.extractInternaFileFromDatabase(filename)) { + countUpdatedFolder(filename); + } + } + } else if (fileOnStorage && !fileOnDatabase) { + await this.storeInternaFileToDatabase(fileOnStorage); + } else { + throw new Error("Invalid state on hidden file sync"); + // Something corrupted? + } + })); await p.wait(limit); } await p.all(); diff --git a/updates.md b/updates.md index 5f48f34..66a469a 100644 --- a/updates.md +++ b/updates.md @@ -9,4 +9,9 @@ #### Minors - 0.13.1 Fixed on conflict resolution. -- 0.13.2 Fixed file deletion failures. \ No newline at end of file +- 0.13.2 Fixed file deletion failures. +- 0.13.4 + - Now, we can synchronise hidden files that conflicted on each devices. + - We can search for conflicting docs. + - Pending processes can now be run at any time. + - Performance improved on synchronising large numbers of files at once. \ No newline at end of file