mirror of
https://github.com/vrtmrz/obsidian-livesync.git
synced 2026-05-28 18:12:56 +00:00
New Feature:
- Skip conflicted check while replication Fixed: - Rewrited replication reflection algorithm.
This commit is contained in:
+123
-5
@@ -923,12 +923,118 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
|
||||
const doc = change;
|
||||
const file = targetFile;
|
||||
await this.doc2storage_modify(doc, file);
|
||||
this.queueConflictedCheck(file);
|
||||
if (!this.settings.checkConflictOnlyOnOpen) {
|
||||
this.queueConflictedCheck(file);
|
||||
} else {
|
||||
const af = app.workspace.getActiveFile();
|
||||
if (af && af.path == file.path) {
|
||||
this.queueConflictedCheck(file);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Logger(`${id2path(change._id)} is already exist as the folder`);
|
||||
}
|
||||
}
|
||||
|
||||
queuedFiles: {
|
||||
entry: EntryBody;
|
||||
missingChildren: string[];
|
||||
timeout?: number;
|
||||
done?: boolean;
|
||||
warned?: boolean;
|
||||
}[] = [];
|
||||
chunkWaitTimeout = 60000;
|
||||
|
||||
async saveQueuedFiles() {
|
||||
const saveData = JSON.stringify(this.queuedFiles.filter((e) => !e.done).map((e) => e.entry._id));
|
||||
const lsname = "obsidian-livesync-queuefiles-" + this.app.vault.getName();
|
||||
localStorage.setItem(lsname, saveData);
|
||||
}
|
||||
async loadQueuedFiles() {
|
||||
const lsname = "obsidian-livesync-queuefiles-" + this.app.vault.getName();
|
||||
const ids = JSON.parse(localStorage.getItem(lsname) || "[]") as string[];
|
||||
const ret = await this.localDatabase.localDatabase.allDocs({ keys: ids, include_docs: true });
|
||||
for (const doc of ret.rows) {
|
||||
if (doc.doc && !this.queuedFiles.some((e) => e.entry._id == doc.doc._id)) {
|
||||
await this.parseIncomingDoc(doc.doc as PouchDB.Core.ExistingDocument<EntryBody & PouchDB.Core.AllDocsMeta>);
|
||||
}
|
||||
}
|
||||
}
|
||||
async procQueuedFiles() {
|
||||
await runWithLock("procQueue", true, async () => {
|
||||
this.saveQueuedFiles();
|
||||
for (const queue of this.queuedFiles) {
|
||||
if (queue.done) continue;
|
||||
const now = new Date().getTime();
|
||||
if (queue.missingChildren.length == 0) {
|
||||
queue.done = true;
|
||||
Logger(`Applying ${queue.entry._id} (${queue.entry._rev}) change...`);
|
||||
await this.handleDBChanged(queue.entry);
|
||||
}
|
||||
if (now > queue.timeout) {
|
||||
if (!queue.warned) Logger(`Timed out: ${queue.entry._id} could not collect ${queue.missingChildren.length} chunks. plugin keeps watching, but you have to check the file after the replication.`, LOG_LEVEL.NOTICE);
|
||||
queue.warned = true;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
this.queuedFiles = this.queuedFiles.filter((e) => !e.done);
|
||||
this.saveQueuedFiles();
|
||||
});
|
||||
}
|
||||
parseIncomingChunk(chunk: PouchDB.Core.ExistingDocument<EntryDoc>) {
|
||||
const now = new Date().getTime();
|
||||
let isNewFileCompleted = false;
|
||||
|
||||
for (const queue of this.queuedFiles) {
|
||||
if (queue.done) continue;
|
||||
if (queue.missingChildren.indexOf(chunk._id) !== -1) {
|
||||
queue.missingChildren = queue.missingChildren.filter((e) => e != chunk._id);
|
||||
queue.timeout = now + this.chunkWaitTimeout;
|
||||
}
|
||||
if (queue.missingChildren.length == 0) {
|
||||
for (const e of this.queuedFiles) {
|
||||
if (e.entry._id == queue.entry._id && e.entry.mtime < queue.entry.mtime) {
|
||||
e.done = true;
|
||||
}
|
||||
}
|
||||
isNewFileCompleted = true;
|
||||
}
|
||||
}
|
||||
if (isNewFileCompleted) this.procQueuedFiles();
|
||||
}
|
||||
async parseIncomingDoc(doc: PouchDB.Core.ExistingDocument<EntryBody>) {
|
||||
const skipOldFile = this.settings.skipOlderFilesOnSync;
|
||||
if (skipOldFile) {
|
||||
const info = this.app.vault.getAbstractFileByPath(id2path(doc._id));
|
||||
|
||||
if (info && info instanceof TFile) {
|
||||
const localMtime = ~~((info as TFile).stat.mtime / 1000);
|
||||
const docMtime = ~~(doc.mtime / 1000);
|
||||
if (localMtime >= docMtime) {
|
||||
Logger(`${doc._id} Skipped, older than storage.`, LOG_LEVEL.VERBOSE);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
const now = new Date().getTime();
|
||||
const newQueue = {
|
||||
entry: doc,
|
||||
missingChildren: [] as string[],
|
||||
timeout: now + this.chunkWaitTimeout,
|
||||
};
|
||||
if ("children" in doc) {
|
||||
const c = await this.localDatabase.localDatabase.allDocs({ keys: doc.children, include_docs: false });
|
||||
const missing = c.rows.filter((e) => "error" in e).map((e) => e.key);
|
||||
if (missing.length) Logger(`${doc._id}(${doc._rev}) Queued (waiting ${missing.length} items)`, LOG_LEVEL.VERBOSE);
|
||||
newQueue.missingChildren = missing;
|
||||
this.queuedFiles.push(newQueue);
|
||||
this.saveQueuedFiles();
|
||||
} else {
|
||||
this.queuedFiles.push(newQueue);
|
||||
this.saveQueuedFiles();
|
||||
this.procQueuedFiles();
|
||||
}
|
||||
}
|
||||
periodicSyncHandler: number = null;
|
||||
|
||||
//---> Sync
|
||||
@@ -942,14 +1048,15 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
|
||||
continue;
|
||||
}
|
||||
if (change._id.startsWith("h:")) {
|
||||
await this.parseIncomingChunk(change);
|
||||
continue;
|
||||
}
|
||||
if (change._id == SYNCINFO_ID) {
|
||||
continue;
|
||||
}
|
||||
if (change.type != "leaf" && change.type != "versioninfo" && change.type != "milestoneinfo" && change.type != "nodeinfo") {
|
||||
Logger("replication change arrived", LOG_LEVEL.VERBOSE);
|
||||
await this.handleDBChanged(change);
|
||||
await this.parseIncomingDoc(change);
|
||||
continue;
|
||||
}
|
||||
if (change.type == "versioninfo") {
|
||||
if (change.version > VER) {
|
||||
@@ -1086,9 +1193,17 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
|
||||
waiting = " " + this.batchFileChange.map((e) => "🛫").join("");
|
||||
waiting = waiting.replace(/(🛫){10}/g, "🚀");
|
||||
}
|
||||
let queued = "";
|
||||
const queue = Object.entries(this.queuedFiles).filter((e) => !e[1].warned);
|
||||
const queuedCount = queue.length;
|
||||
|
||||
if (queuedCount) {
|
||||
const pieces = queue.map((e) => e[1].missingChildren).reduce((prev, cur) => prev + cur.length, 0);
|
||||
queued = ` 🧩 ${queuedCount} (${pieces})`;
|
||||
}
|
||||
const procs = getProcessingCounts();
|
||||
const procsDisp = procs == 0 ? "" : ` ⏳${procs}`;
|
||||
const message = `Sync:${w} ↑${sent} ↓${arrived}${waiting}${procsDisp}`;
|
||||
const message = `Sync:${w} ↑${sent} ↓${arrived}${waiting}${procsDisp}${queued}`;
|
||||
const locks = getLocks();
|
||||
const pendingTask = locks.pending.length ? `\nPending:${locks.pending.join(", ")}` : "";
|
||||
const runningTask = locks.running.length ? `\nRunning:${locks.running.join(", ")}` : "";
|
||||
@@ -1129,6 +1244,7 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
|
||||
if (this.settings.autoSweepPlugins) {
|
||||
await this.sweepPlugin(false);
|
||||
}
|
||||
await this.loadQueuedFiles();
|
||||
this.localDatabase.openReplication(this.settings, false, showMessage, this.parseReplicationResult);
|
||||
}
|
||||
|
||||
@@ -1168,6 +1284,7 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
|
||||
if (showingNotice) {
|
||||
notice = NewNotice("Initializing", 0);
|
||||
}
|
||||
|
||||
const filesStorage = this.app.vault.getFiles();
|
||||
const filesStorageName = filesStorage.map((e) => e.path);
|
||||
const wf = await this.localDatabase.localDatabase.allDocs();
|
||||
@@ -1555,6 +1672,7 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
|
||||
});
|
||||
if (isNotChanged) return;
|
||||
await this.localDatabase.putDBEntry(d);
|
||||
this.queuedFiles = this.queuedFiles.map((e) => ({ ...e, ...(e.entry._id == d._id ? { done: true } : {}) }));
|
||||
|
||||
Logger("put database:" + fullpath + "(" + datatype + ") ");
|
||||
if (this.settings.syncOnSave && !this.suspended) {
|
||||
@@ -1632,7 +1750,7 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
|
||||
endkey: `ps:${this.deviceAndVaultName}.`,
|
||||
include_docs: true,
|
||||
});
|
||||
Logger("OLD DOCS.", LOG_LEVEL.VERBOSE);
|
||||
// Logger("OLD DOCS.", LOG_LEVEL.VERBOSE);
|
||||
// sweep current plugin.
|
||||
// @ts-ignore
|
||||
const pl = this.app.plugins;
|
||||
|
||||
Reference in New Issue
Block a user