- Error handling on booting now works fine.
  - Replication is now started automatically in LiveSync mode.
  - Batch database update is now disabled in LiveSync mode.
  - No longer automatically reconnection while off-focused.
  - Status saves are thinned out.
  - Now Self-hosted LiveSync waits for all files between the local database and storage to be surely checked.
- Improved:
  - The job scheduler is now more robust and stable.
  - The status indicator no longer flickers and keeps zero for a while.
  - No longer meaningless frequent updates of status indicators.
  - Now we can configure regular expression filters in handy UI. Thank you so much, @eth-p!
  - `Fetch` or `Rebuild everything` is now more safely performed.
- Minor things
  - Some utility function has been added.
  - Customisation sync now less wrong messages.
  - Digging the weeds for eradication of type errors.
This commit is contained in:
vorotamoroz
2024-04-12 01:30:35 +09:00
parent 6952ef37f5
commit d54b7e2d93
10 changed files with 361 additions and 197 deletions
+179 -104
View File
@@ -4,7 +4,7 @@ import { type Diff, DIFF_DELETE, DIFF_EQUAL, DIFF_INSERT, diff_match_patch, stri
import { Notice, Plugin, TFile, addIcon, TFolder, normalizePath, TAbstractFile, Editor, MarkdownView, type RequestUrlParam, type RequestUrlResponse, requestUrl, type MarkdownFileInfo } from "./deps";
import { type EntryDoc, type LoadedEntry, type ObsidianLiveSyncSettings, type diff_check_result, type diff_result_leaf, type EntryBody, LOG_LEVEL, VER, DEFAULT_SETTINGS, type diff_result, FLAGMD_REDFLAG, SYNCINFO_ID, SALT_OF_PASSPHRASE, type ConfigPassphraseStore, type CouchDBConnection, FLAGMD_REDFLAG2, FLAGMD_REDFLAG3, PREFIXMD_LOGFILE, type DatabaseConnectingStatus, type EntryHasPath, type DocumentID, type FilePathWithPrefix, type FilePath, type AnyEntry, LOG_LEVEL_DEBUG, LOG_LEVEL_INFO, LOG_LEVEL_NOTICE, LOG_LEVEL_URGENT, LOG_LEVEL_VERBOSE, type SavingEntry, MISSING_OR_ERROR, NOT_CONFLICTED, AUTO_MERGED, CANCELLED, LEAVE_TO_SUBSEQUENT, FLAGMD_REDFLAG2_HR, FLAGMD_REDFLAG3_HR, } from "./lib/src/types";
import { type InternalFileInfo, type CacheData, type FileEventItem, FileWatchEventQueueMax } from "./types";
import { arrayToChunkedArray, createBlob, determineTypeFromBlob, fireAndForget, getDocData, isAnyNote, isDocContentSame, isObjectDifferent, readContent, sendValue } from "./lib/src/utils";
import { arrayToChunkedArray, createBlob, delay, determineTypeFromBlob, fireAndForget, getDocData, isAnyNote, isDocContentSame, isObjectDifferent, readContent, sendValue, throttle } from "./lib/src/utils";
import { Logger, setGlobalLogFunction } from "./lib/src/logger";
import { PouchDB } from "./lib/src/pouchdb-browser.js";
import { ConflictResolveModal } from "./ConflictResolveModal";
@@ -31,7 +31,7 @@ import { GlobalHistoryView, VIEW_TYPE_GLOBAL_HISTORY } from "./GlobalHistoryView
import { LogPaneView, VIEW_TYPE_LOG } from "./LogPaneView";
import { LRUCache } from "./lib/src/LRUCache";
import { SerializedFileAccess } from "./SerializedFileAccess.js";
import { KeyedQueueProcessor, QueueProcessor, type QueueItemWithKey } from "./lib/src/processor.js";
import { QueueProcessor } from "./lib/src/processor.js";
import { reactive, reactiveSource } from "./lib/src/reactive.js";
import { initializeStores } from "./stores.js";
@@ -312,8 +312,9 @@ export default class ObsidianLiveSyncPlugin extends Plugin
this.replicator = new LiveSyncDBReplicator(this);
}
async onResetDatabase(db: LiveSyncLocalDB): Promise<void> {
const lsKey = "obsidian-livesync-queuefiles-" + this.getVaultName();
localStorage.removeItem(lsKey);
const kvDBKey = "queued-files"
this.kvDB.del(kvDBKey);
// localStorage.removeItem(lsKey);
await this.kvDB.destroy();
this.kvDB = await OpenKeyValueDatabase(db.dbname + "-livesync-kv");
this.replicator = new LiveSyncDBReplicator(this);
@@ -535,7 +536,7 @@ Click anywhere to stop counting down.
this.registerWatchEvents();
await this.realizeSettingSyncMode();
this.swapSaveCommand();
if (this.settings.syncOnStart) {
if (!this.settings.liveSync && this.settings.syncOnStart) {
this.replicator.openReplication(this.settings, false, false);
}
this.scanStat();
@@ -1007,7 +1008,7 @@ Note: We can always able to read V1 format. It will be progressively converted.
}
this.deviceAndVaultName = localStorage.getItem(lsKey) || "";
this.ignoreFiles = this.settings.ignoreFiles.split(",").map(e => e.trim());
this.fileEventQueue.delay = this.settings.batchSave ? 5000 : 100;
this.fileEventQueue.delay = (!this.settings.liveSync && this.settings.batchSave) ? 5000 : 100;
}
async saveSettingData() {
@@ -1039,7 +1040,7 @@ Note: We can always able to read V1 format. It will be progressively converted.
}
await this.saveData(settings);
this.localDatabase.settings = this.settings;
this.fileEventQueue.delay = this.settings.batchSave ? 5000 : 100;
this.fileEventQueue.delay = (!this.settings.liveSync && this.settings.batchSave) ? 5000 : 100;
this.ignoreFiles = this.settings.ignoreFiles.split(",").map(e => e.trim());
if (this.settings.settingSyncFile != "") {
fireAndForget(() => this.saveSettingToMarkdown(this.settings.settingSyncFile));
@@ -1237,9 +1238,13 @@ We can perform a command in this file.
_this.performCommand('editor:save-file');
};
}
hasFocus = true;
isLastHidden = false;
registerWatchEvents() {
this.registerEvent(this.app.workspace.on("file-open", this.watchWorkspaceOpen));
this.registerDomEvent(document, "visibilitychange", this.watchWindowVisibility);
this.registerDomEvent(window, "focus", () => this.setHasFocus(true));
this.registerDomEvent(window, "blur", () => this.setHasFocus(false));
this.registerDomEvent(window, "online", this.watchOnline);
this.registerDomEvent(window, "offline", this.watchOnline);
}
@@ -1255,15 +1260,30 @@ We can perform a command in this file.
await this.syncAllFiles();
}
}
setHasFocus(hasFocus: boolean) {
this.hasFocus = hasFocus;
this.watchWindowVisibility();
}
watchWindowVisibility() {
scheduleTask("watch-window-visibility", 500, () => fireAndForget(() => this.watchWindowVisibilityAsync()));
scheduleTask("watch-window-visibility", 100, () => fireAndForget(() => this.watchWindowVisibilityAsync()));
}
async watchWindowVisibilityAsync() {
if (this.settings.suspendFileWatching) return;
if (!this.settings.isConfigured) return;
if (!this.isReady) return;
if (this.isLastHidden && !this.hasFocus) {
// NO OP while non-focused after made hidden;
return;
}
const isHidden = document.hidden;
if (this.isLastHidden === isHidden) {
return;
}
this.isLastHidden = isHidden;
await this.applyBatchChange();
if (isHidden) {
this.replicator.closeReplication();
@@ -1283,12 +1303,12 @@ We can perform a command in this file.
}
cancelRelativeEvent(item: FileEventItem) {
this.fileEventQueue.modifyQueue((items) => [...items.filter(e => e.entity.key != item.key)])
this.fileEventQueue.modifyQueue((items) => [...items.filter(e => e.key != item.key)])
}
queueNextFileEvent(items: QueueItemWithKey<FileEventItem>[], newItem: QueueItemWithKey<FileEventItem>): QueueItemWithKey<FileEventItem>[] {
queueNextFileEvent(items: FileEventItem[], newItem: FileEventItem): FileEventItem[] {
if (this.settings.batchSave && !this.settings.liveSync) {
const file = newItem.entity.args.file;
const file = newItem.args.file;
// if the latest event is the same type, omit that
// a.md MODIFY <- this should be cancelled when a.md MODIFIED
// b.md MODIFY <- this should be cancelled when b.md MODIFIED
@@ -1300,16 +1320,16 @@ We can perform a command in this file.
while (i >= 0) {
i--;
if (i < 0) break L1;
if (items[i].entity.args.file.path != file.path) {
if (items[i].args.file.path != file.path) {
continue L1;
}
if (items[i].entity.type != newItem.entity.type) break L1;
if (items[i].type != newItem.type) break L1;
items.remove(items[i]);
}
}
items.push(newItem);
// When deleting or renaming, the queue must be flushed once before processing subsequent processes to prevent unexpected race condition.
if (newItem.entity.type == "DELETE" || newItem.entity.type == "RENAME") {
if (newItem.type == "DELETE" || newItem.type == "RENAME") {
this.fileEventQueue.requestNextFlush();
}
return items;
@@ -1363,7 +1383,7 @@ We can perform a command in this file.
pendingFileEventCount = reactiveSource(0);
processingFileEventCount = reactiveSource(0);
fileEventQueue =
new KeyedQueueProcessor(
new QueueProcessor(
(items: FileEventItem[]) => this.handleFileEvent(items[0]),
{ suspended: true, batchSize: 1, concurrentLimit: 5, delay: 100, yieldThreshold: FileWatchEventQueueMax, totalRemainingReactiveSource: this.pendingFileEventCount, processingEntitiesReactiveSource: this.processingFileEventCount }
).replaceEnqueueProcessor((items, newItem) => this.queueNextFileEvent(items, newItem));
@@ -1622,21 +1642,32 @@ We can perform a command in this file.
this.conflictCheckQueue.enqueue(path);
}
_saveQueuedFiles = throttle(() => {
const saveData = this.replicationResultProcessor._queue.filter(e => e !== undefined && e !== null).map((e) => e?._id ?? "" as string) as string[];
const kvDBKey = "queued-files"
// localStorage.setItem(lsKey, saveData);
fireAndForget(() => this.kvDB.set(kvDBKey, saveData));
}, 100);
saveQueuedFiles() {
const saveData = JSON.stringify(this.replicationResultProcessor._queue.map((e) => e._id));
const lsKey = "obsidian-livesync-queuefiles-" + this.getVaultName();
localStorage.setItem(lsKey, saveData);
this._saveQueuedFiles();
}
async loadQueuedFiles() {
if (this.settings.suspendParseReplicationResult) return;
if (!this.settings.isConfigured) return;
const lsKey = "obsidian-livesync-queuefiles-" + this.getVaultName();
const ids = [...new Set(JSON.parse(localStorage.getItem(lsKey) || "[]"))] as string[];
const kvDBKey = "queued-files"
// const ids = [...new Set(JSON.parse(localStorage.getItem(lsKey) || "[]"))] as string[];
const ids = [...new Set(await this.kvDB.get<string[]>(kvDBKey) ?? [])];
const batchSize = 100;
const chunkedIds = arrayToChunkedArray(ids, batchSize);
for await (const idsBatch of chunkedIds) {
const ret = await this.localDatabase.allDocsRaw<EntryDoc>({ keys: idsBatch, include_docs: true, limit: 100 });
this.replicationResultProcessor.enqueueAll(ret.rows.map(doc => doc.doc!));
const docs = ret.rows.filter(e => e.doc).map(e => e.doc) as PouchDB.Core.ExistingDocument<EntryDoc>[];
const errors = ret.rows.filter(e => !e.doc && !e.value.deleted);
if (errors.length > 0) {
Logger("Some queued processes were not resurrected");
Logger(JSON.stringify(errors), LOG_LEVEL_VERBOSE);
}
this.replicationResultProcessor.enqueueAll(docs);
await this.replicationResultProcessor.waitForPipeline();
}
@@ -1658,34 +1689,43 @@ We can perform a command in this file.
const filename = this.getPathWithoutPrefix(doc);
this.isTargetFile(filename).then((ret) => ret ? this.addOnHiddenFileSync.procInternalFile(filename) : Logger(`Skipped (Not target:${filename})`, LOG_LEVEL_VERBOSE));
} else if (isValidPath(this.getPath(doc))) {
this.storageApplyingProcessor.enqueueWithKey(doc.path, doc);
this.storageApplyingProcessor.enqueue(doc);
} else {
Logger(`Skipped: ${doc._id.substring(0, 8)}`, LOG_LEVEL_VERBOSE);
}
return;
}, { suspended: true, batchSize: 1, concurrentLimit: 10, yieldThreshold: 1, delay: 0, totalRemainingReactiveSource: this.databaseQueueCount }).startPipeline();
}, { suspended: true, batchSize: 1, concurrentLimit: 10, yieldThreshold: 1, delay: 0, totalRemainingReactiveSource: this.databaseQueueCount }).replaceEnqueueProcessor((queue, newItem) => {
const q = queue.filter(e => e._id != newItem._id);
return [...q, newItem];
}).startPipeline();
storageApplyingCount = reactiveSource(0);
storageApplyingProcessor = new KeyedQueueProcessor(async (docs: LoadedEntry[]) => {
storageApplyingProcessor = new QueueProcessor(async (docs: LoadedEntry[]) => {
const entry = docs[0];
const path = this.getPath(entry);
Logger(`Processing ${path} (${entry._id.substring(0, 8)}: ${entry._rev?.substring(0, 5)}) :Started...`, LOG_LEVEL_VERBOSE);
const targetFile = this.vaultAccess.getAbstractFileByPath(this.getPathWithoutPrefix(entry));
if (targetFile instanceof TFolder) {
Logger(`${this.getPath(entry)} is already exist as the folder`);
} else {
await this.processEntryDoc(entry, targetFile instanceof TFile ? targetFile : undefined);
Logger(`Processing ${path} (${entry._id.substring(0, 8)} :${entry._rev?.substring(0, 5)}) : Done`);
}
await serialized(entry.path, async () => {
const path = this.getPath(entry);
Logger(`Processing ${path} (${entry._id.substring(0, 8)}: ${entry._rev?.substring(0, 5)}) :Started...`, LOG_LEVEL_VERBOSE);
const targetFile = this.vaultAccess.getAbstractFileByPath(this.getPathWithoutPrefix(entry));
if (targetFile instanceof TFolder) {
Logger(`${this.getPath(entry)} is already exist as the folder`);
} else {
await this.processEntryDoc(entry, targetFile instanceof TFile ? targetFile : undefined);
Logger(`Processing ${path} (${entry._id.substring(0, 8)} :${entry._rev?.substring(0, 5)}) : Done`);
}
});
return;
}, { suspended: true, batchSize: 1, concurrentLimit: 2, yieldThreshold: 1, delay: 0, totalRemainingReactiveSource: this.storageApplyingCount }).startPipeline()
}, { suspended: true, batchSize: 1, concurrentLimit: 6, yieldThreshold: 1, delay: 0, totalRemainingReactiveSource: this.storageApplyingCount }).replaceEnqueueProcessor((queue, newItem) => {
const q = queue.filter(e => e._id != newItem._id);
return [...q, newItem];
}).startPipeline()
replicationResultCount = reactiveSource(0);
replicationResultProcessor = new QueueProcessor(async (docs: PouchDB.Core.ExistingDocument<EntryDoc>[]) => {
if (this.settings.suspendParseReplicationResult) return;
const change = docs[0];
if (!change) return;
if (isChunk(change._id)) {
// SendSignal?
// this.parseIncomingChunk(change);
@@ -1722,16 +1762,19 @@ We can perform a command in this file.
this.databaseQueuedProcessor.enqueue(change);
}
return;
}, { batchSize: 1, suspended: true, concurrentLimit: 100, delay: 0, totalRemainingReactiveSource: this.replicationResultCount }).startPipeline().onUpdateProgress(() => {
}, { batchSize: 1, suspended: true, concurrentLimit: 100, delay: 0, totalRemainingReactiveSource: this.replicationResultCount }).replaceEnqueueProcessor((queue, newItem) => {
const q = queue.filter(e => e._id != newItem._id);
return [...q, newItem];
}).startPipeline().onUpdateProgress(() => {
this.saveQueuedFiles();
});
//---> Sync
parseReplicationResult(docs: Array<PouchDB.Core.ExistingDocument<EntryDoc>>) {
if (this.settings.suspendParseReplicationResult) {
if (this.settings.suspendParseReplicationResult && !this.replicationResultProcessor.isSuspended) {
this.replicationResultProcessor.suspend()
}
this.replicationResultProcessor.enqueueAll(docs);
if (!this.settings.suspendParseReplicationResult) {
if (!this.settings.suspendParseReplicationResult && this.replicationResultProcessor.isSuspended) {
this.replicationResultProcessor.resume()
}
}
@@ -1761,8 +1804,33 @@ We can perform a command in this file.
lastMessage = "";
observeForLogs() {
const padSpaces = `\u{2007}`.repeat(10);
// const emptyMark = `\u{2003}`;
const rerenderTimer = new Map<string, [ReturnType<typeof setTimeout>, number]>;
const tick = reactiveSource(0);
function padLeftSp(num: number, mark: string) {
const numLen = `${num}`.length + 1;
const [timer, len] = rerenderTimer.get(mark) ?? [undefined, numLen];
if (num || timer) {
if (num) {
if (timer) clearTimeout(timer);
rerenderTimer.set(mark, [setTimeout(async () => {
rerenderTimer.delete(mark);
await delay(100);
tick.value = tick.value + 1;
}, 3000), Math.max(len, numLen)]);
}
return ` ${mark}${`${padSpaces}${num}`.slice(-(len))}`;
} else {
return "";
}
}
// const logStore
const queueCountLabel = reactive(() => {
// For invalidating
// @ts-ignore
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const _ = tick.value;
const dbCount = this.databaseQueueCount.value;
const replicationCount = this.replicationResultCount.value;
const storageApplyingCount = this.storageApplyingCount.value;
@@ -1770,13 +1838,13 @@ We can perform a command in this file.
const pluginScanCount = pluginScanningCount.value;
const hiddenFilesCount = hiddenFilesEventCount.value + hiddenFilesProcessingCount.value;
const conflictProcessCount = this.conflictProcessQueueCount.value;
const labelReplication = replicationCount ? `📥 ${replicationCount} ` : "";
const labelDBCount = dbCount ? `📄 ${dbCount} ` : "";
const labelStorageCount = storageApplyingCount ? `💾 ${storageApplyingCount}` : "";
const labelChunkCount = chunkCount ? `🧩${chunkCount} ` : "";
const labelPluginScanCount = pluginScanCount ? `🔌${pluginScanCount} ` : "";
const labelHiddenFilesCount = hiddenFilesCount ? `⚙️${hiddenFilesCount} ` : "";
const labelConflictProcessCount = conflictProcessCount ? `🔩${conflictProcessCount} ` : "";
const labelReplication = padLeftSp(replicationCount, `📥`);
const labelDBCount = padLeftSp(dbCount, `📄`);
const labelStorageCount = padLeftSp(storageApplyingCount, `💾`);
const labelChunkCount = padLeftSp(chunkCount, `🧩`);
const labelPluginScanCount = padLeftSp(pluginScanCount, `🔌`);
const labelHiddenFilesCount = padLeftSp(hiddenFilesCount, `⚙️`)
const labelConflictProcessCount = padLeftSp(conflictProcessCount, `🔩`);
return `${labelReplication}${labelDBCount}${labelStorageCount}${labelChunkCount}${labelPluginScanCount}${labelHiddenFilesCount}${labelConflictProcessCount}`;
})
const requestingStatLabel = reactive(() => {
@@ -1821,11 +1889,15 @@ We can perform a command in this file.
return { w, sent, pushLast, arrived, pullLast };
})
const waitingLabel = reactive(() => {
// For invalidating
// @ts-ignore
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const _ = tick.value;
const e = this.pendingFileEventCount.value;
const proc = this.processingFileEventCount.value;
const pend = e - proc;
const labelProc = proc != 0 ? `${proc} ` : "";
const labelPend = pend != 0 ? ` 🛫${pend}` : "";
const labelProc = padLeftSp(proc, ``);
const labelPend = padLeftSp(pend, `🛫`);
return `${labelProc}${labelPend}`;
})
const statusLineLabel = reactive(() => {
@@ -1834,7 +1906,7 @@ We can perform a command in this file.
const waiting = waitingLabel.value;
const networkActivity = requestingStatLabel.value;
return {
message: `${networkActivity}Sync: ${w}${sent}${pushLast}${arrived}${pullLast}${waiting} ${queued}`,
message: `${networkActivity}Sync: ${w} ${sent}${pushLast} ${arrived}${pullLast}${waiting}${queued}`,
};
})
const statusBarLabels = reactive(() => {
@@ -1845,31 +1917,20 @@ We can perform a command in this file.
message, status
}
})
let last = 0;
const applyToDisplay = () => {
const applyToDisplay = throttle(() => {
const v = statusBarLabels.value;
const now = Date.now();
if (now - last < 10) {
scheduleTask("applyToDisplay", 20, () => applyToDisplay());
return;
}
this.applyStatusBarText(v.message, v.status);
last = now;
}
}, 20);
statusBarLabels.onChanged(applyToDisplay);
}
applyStatusBarText(message: string, log: string) {
const newMsg = message;
const newLog = log;
// scheduleTask("update-display", 50, () => {
const newMsg = message.replace(/\n/g, "\\A ");
const newLog = log.replace(/\n/g, "\\A ");
this.statusBar?.setText(newMsg.split("\n")[0]);
// const selector = `.CodeMirror-wrap,` +
// `.markdown-preview-view.cm-s-obsidian,` +
// `.markdown-source-view.cm-s-obsidian,` +
// `.canvas-wrapper,` +
// `.empty-state`
// ;
if (this.settings.showStatusOnEditor) {
const root = activeDocument.documentElement;
root.style.setProperty("--sls-log-text", "'" + (newMsg + "\\A " + newLog) + "'");
@@ -1877,7 +1938,6 @@ We can perform a command in this file.
// const root = activeDocument.documentElement;
// root.style.setProperty("--log-text", "'" + (newMsg + "\\A " + newLog) + "'");
}
// }, true);
scheduleTask("log-hide", 3000, () => { this.statusLog.value = "" });
@@ -2053,9 +2113,15 @@ Or if you are sure know what had been happened, we can unlock the database from
const syncFiles = filesStorage.filter((e) => onlyInStorageNames.indexOf(e.path) == -1);
Logger("Updating database by new files");
const processStatus = {} as Record<string, string>;
const logLevel = showingNotice ? LOG_LEVEL_NOTICE : LOG_LEVEL_INFO;
const updateLog = throttle((key: string, msg: string) => {
processStatus[key] = msg;
const log = Object.values(processStatus).join("\n");
Logger(log, logLevel, "syncAll");
}, 25);
const initProcess = [];
const logLevel = showingNotice ? LOG_LEVEL_NOTICE : LOG_LEVEL_INFO;
const runAll = async<T>(procedureName: string, objects: T[], callback: (arg: T) => Promise<void>) => {
if (objects.length == 0) {
Logger(`${procedureName}: Nothing to do`);
@@ -2077,12 +2143,14 @@ Or if you are sure know what had been happened, we can unlock the database from
failed++;
}
if ((success + failed) % step == 0) {
Logger(`${procedureName}: DONE:${success}, FAILED:${failed}, LAST:${processor._queue.length}`, logLevel, `log-${procedureName}`);
const msg = `${procedureName}: DONE:${success}, FAILED:${failed}, LAST:${processor._queue.length}`;
updateLog(procedureName, msg);
}
return;
}, { batchSize: 1, concurrentLimit: 10, delay: 0, suspended: true }, objects)
await processor.waitForPipeline();
Logger(`${procedureName} All done: DONE:${success}, FAILED:${failed}`, logLevel, `log-${procedureName}`);
const msg = `${procedureName} All done: DONE:${success}, FAILED:${failed}`;
updateLog(procedureName, msg)
}
initProcess.push(runAll("UPDATE DATABASE", onlyInStorage, async (e) => {
if (!this.isFileSizeExceeded(e.stat.size)) {
@@ -2116,7 +2184,6 @@ Or if you are sure know what had been happened, we can unlock the database from
const id = await this.path2id(getPathFromTFile(file));
const pair: FileDocPair = { file, id };
return [pair];
// processSyncFile.enqueue(pair);
}
, { batchSize: 1, concurrentLimit: 10, delay: 0, suspended: true }, syncFiles);
processPrepareSyncFile
@@ -2138,10 +2205,18 @@ Or if you are sure know what had been happened, we can unlock the database from
}, { batchSize: 1, concurrentLimit: 5, delay: 10, suspended: false }
))
processPrepareSyncFile.startPipeline();
initProcess.push(async () => {
await processPrepareSyncFile.waitForPipeline();
})
const allSyncFiles = syncFiles.length;
let lastRemain = allSyncFiles;
const step = 25;
const remainLog = (remain: number) => {
if (lastRemain - remain > step) {
const msg = ` CHECK AND SYNC: ${remain} / ${allSyncFiles}`;
updateLog("sync", msg);
lastRemain = remain;
}
}
processPrepareSyncFile.startPipeline().onUpdateProgress(() => remainLog(processPrepareSyncFile.totalRemaining + processPrepareSyncFile.nowProcessing))
initProcess.push(processPrepareSyncFile.waitForPipeline());
await Promise.all(initProcess);
// this.setStatusBarText(`NOW TRACKING!`);
@@ -2501,38 +2576,39 @@ Or if you are sure know what had been happened, we can unlock the database from
conflictProcessQueueCount = reactiveSource(0);
conflictResolveQueue =
new KeyedQueueProcessor(async (entries: { filename: FilePathWithPrefix }[]) => {
const entry = entries[0];
const filename = entry.filename;
const conflictCheckResult = await this.checkConflictAndPerformAutoMerge(filename);
if (conflictCheckResult === MISSING_OR_ERROR || conflictCheckResult === NOT_CONFLICTED || conflictCheckResult === CANCELLED) {
// nothing to do.
return;
}
if (conflictCheckResult === AUTO_MERGED) {
//auto resolved, but need check again;
if (this.settings.syncAfterMerge && !this.suspended) {
//Wait for the running replication, if not running replication, run it once.
await shareRunningResult(`replication`, () => this.replicate());
}
Logger("conflict:Automatically merged, but we have to check it again");
this.conflictCheckQueue.enqueue(filename);
return;
}
if (this.settings.showMergeDialogOnlyOnActive) {
const af = this.getActiveFile();
if (af && af.path != filename) {
Logger(`${filename} is conflicted. Merging process has been postponed to the file have got opened.`, LOG_LEVEL_NOTICE);
new QueueProcessor(async (filenames: FilePathWithPrefix[]) => {
const filename = filenames[0];
await serialized(`conflict-resolve:${filename}`, async () => {
const conflictCheckResult = await this.checkConflictAndPerformAutoMerge(filename);
if (conflictCheckResult === MISSING_OR_ERROR || conflictCheckResult === NOT_CONFLICTED || conflictCheckResult === CANCELLED) {
// nothing to do.
return;
}
}
Logger("conflict:Manual merge required!");
await this.resolveConflictByUI(filename, conflictCheckResult);
if (conflictCheckResult === AUTO_MERGED) {
//auto resolved, but need check again;
if (this.settings.syncAfterMerge && !this.suspended) {
//Wait for the running replication, if not running replication, run it once.
await shareRunningResult(`replication`, () => this.replicate());
}
Logger("conflict:Automatically merged, but we have to check it again");
this.conflictCheckQueue.enqueue(filename);
return;
}
if (this.settings.showMergeDialogOnlyOnActive) {
const af = this.getActiveFile();
if (af && af.path != filename) {
Logger(`${filename} is conflicted. Merging process has been postponed to the file have got opened.`, LOG_LEVEL_NOTICE);
return;
}
}
Logger("conflict:Manual merge required!");
await this.resolveConflictByUI(filename, conflictCheckResult);
});
}, { suspended: false, batchSize: 1, concurrentLimit: 1, delay: 10, keepResultUntilDownstreamConnected: false }).replaceEnqueueProcessor(
(queue, newEntity) => {
const filename = newEntity.entity.filename;
const filename = newEntity;
sendValue("cancel-resolve-conflict:" + filename, true);
const newQueue = [...queue].filter(e => e.key != newEntity.key);
const newQueue = [...queue].filter(e => e != newEntity);
return [...newQueue, newEntity];
});
@@ -2544,10 +2620,9 @@ Or if you are sure know what had been happened, we can unlock the database from
const file = this.vaultAccess.getAbstractFileByPath(filename);
// if (!file) return;
// if (!(file instanceof TFile)) return;
if ((file instanceof TFolder)) return;
if ((file instanceof TFolder)) return [];
// Check again?
return [{ key: filename, entity: { filename } }];
return [filename];
// this.conflictResolveQueue.enqueueWithKey(filename, { filename, file });
}, {
suspended: false, batchSize: 1, concurrentLimit: 5, delay: 10, keepResultUntilDownstreamConnected: true, pipeTo: this.conflictResolveQueue, totalRemainingReactiveSource: this.conflictProcessQueueCount