From 29ed4d2b95178e1696aaac0ef866ac4361176daa Mon Sep 17 00:00:00 2001 From: vorotamoroz Date: Fri, 14 Jun 2024 12:35:56 +0100 Subject: [PATCH] Fixed: - No longer batch-saving ignores editor inputs. - The file-watching and serialisation processes have been changed to the one which is similar to previous implementations. - We can configure the settings (Especially about text-boxes) even if we have configured the device name. Improved: - We can configure the delay of batch-saving. - Default: 5 seconds, the same as the previous hard-coded value. (Note: also, the previous behaviour was not correct). - Also, we can configure the limit of delaying batch-saving. - The performance of showing status indicators has been improved. --- README.md | 1 + src/common/types.ts | 3 + src/lib | 2 +- src/main.ts | 202 ++++++++++++-------------- src/storages/StorageEventManager.ts | 205 ++++++++++++++++++++++++--- src/ui/ObsidianLiveSyncSettingTab.ts | 19 +++ src/ui/settingConstants.ts | 12 +- 7 files changed, 308 insertions(+), 136 deletions(-) diff --git a/README.md b/README.md index 6c30f06..7b0c9b0 100644 --- a/README.md +++ b/README.md @@ -68,6 +68,7 @@ Synchronization status is shown in the status bar with the following icons. - 💾 Working write storage processes - ⏳ Working read storage processes - 🛫 Pending read storage processes + - 📬 Batched read storage processes - ⚙️ Working or pending storage processes of hidden files - 🧩 Waiting chunks - 🔌 Working Customisation items (Configuration, snippets, and plug-ins) diff --git a/src/common/types.ts b/src/common/types.ts index 221e8d5..2d0f118 100644 --- a/src/common/types.ts +++ b/src/common/types.ts @@ -60,6 +60,9 @@ export type FileEventItem = { type: FileEventType, args: FileEventArgs, key: string, + skipBatchWait?: boolean, + cancelled?: boolean, + batched?: boolean } // Hidden items (Now means `chunk`) diff --git a/src/lib b/src/lib index eb39d53..acad314 160000 --- a/src/lib +++ b/src/lib @@ -1 +1 @@ -Subproject commit eb39d533e4b4f172ab46e2451c29af36446b9333 +Subproject commit acad314d1aec1465fc4970b2485d733b461ca223 diff --git a/src/main.ts b/src/main.ts index 042afb5..7baf20e 100644 --- a/src/main.ts +++ b/src/main.ts @@ -3,8 +3,8 @@ const isDebug = false; import { type Diff, DIFF_DELETE, DIFF_EQUAL, DIFF_INSERT, diff_match_patch, stringifyYaml, parseYaml } from "./deps"; import { Notice, Plugin, TFile, addIcon, TFolder, normalizePath, TAbstractFile, Editor, MarkdownView, type RequestUrlParam, type RequestUrlResponse, requestUrl, type MarkdownFileInfo } from "./deps"; import { type EntryDoc, type LoadedEntry, type ObsidianLiveSyncSettings, type diff_check_result, type diff_result_leaf, type EntryBody, LOG_LEVEL, VER, DEFAULT_SETTINGS, type diff_result, FLAGMD_REDFLAG, SYNCINFO_ID, SALT_OF_PASSPHRASE, type ConfigPassphraseStore, type CouchDBConnection, FLAGMD_REDFLAG2, FLAGMD_REDFLAG3, PREFIXMD_LOGFILE, type DatabaseConnectingStatus, type EntryHasPath, type DocumentID, type FilePathWithPrefix, type FilePath, type AnyEntry, LOG_LEVEL_DEBUG, LOG_LEVEL_INFO, LOG_LEVEL_NOTICE, LOG_LEVEL_URGENT, LOG_LEVEL_VERBOSE, type SavingEntry, MISSING_OR_ERROR, NOT_CONFLICTED, AUTO_MERGED, CANCELLED, LEAVE_TO_SUBSEQUENT, FLAGMD_REDFLAG2_HR, FLAGMD_REDFLAG3_HR, REMOTE_MINIO, REMOTE_COUCHDB, type BucketSyncSetting, TweakValuesShouldMatchedTemplate, confName, type TweakValues, } from "./lib/src/common/types.ts"; -import { type InternalFileInfo, type CacheData, type FileEventItem, FileWatchEventQueueMax } from "./common/types.ts"; -import { arrayToChunkedArray, createBlob, delay, determineTypeFromBlob, escapeMarkdownValue, extractObject, fireAndForget, getDocData, isAnyNote, isDocContentSame, isObjectDifferent, readContent, sendValue, throttle, type SimpleStore } from "./lib/src/common/utils.ts"; +import { type InternalFileInfo, type CacheData, type FileEventItem } from "./common/types.ts"; +import { arrayToChunkedArray, createBlob, determineTypeFromBlob, escapeMarkdownValue, extractObject, fireAndForget, getDocData, isAnyNote, isDocContentSame, isObjectDifferent, readContent, sendValue, throttle, type SimpleStore } from "./lib/src/common/utils.ts"; import { Logger, setGlobalLogFunction } from "./lib/src/common/logger.ts"; import { PouchDB } from "./lib/src/pouchdb/pouchdb-browser.js"; import { ConflictResolveModal } from "./ui/ConflictResolveModal.ts"; @@ -18,7 +18,7 @@ import { setNoticeClass } from "./lib/src/mock_and_interop/wrapper.ts"; import { versionNumberString2Number, writeString, decodeBinary, readString } from "./lib/src/string_and_binary/convert.ts"; import { addPrefix, isAcceptedAll, isPlainText, shouldBeIgnored, stripAllPrefixes } from "./lib/src/string_and_binary/path.ts"; import { isLockAcquired, serialized, shareRunningResult, skipIfDuplicated } from "./lib/src/concurrency/lock.ts"; -import { StorageEventManager, StorageEventManagerObsidian } from "./storages/StorageEventManager.ts"; +import { StorageEventManager, StorageEventManagerObsidian, type FileEvent } from "./storages/StorageEventManager.ts"; import { LiveSyncLocalDB, type LiveSyncLocalDBEnv } from "./lib/src/pouchdb/LiveSyncLocalDB.ts"; import { LiveSyncAbstractReplicator, type LiveSyncReplicatorEnv } from "./lib/src/replication/LiveSyncAbstractReplicator.js"; import { type KeyValueDatabase, OpenKeyValueDatabase } from "./common/KeyValueDB.ts"; @@ -32,7 +32,7 @@ import { LogPaneView, VIEW_TYPE_LOG } from "./ui/LogPaneView.ts"; import { LRUCache } from "./lib/src/memory/LRUCache.ts"; import { SerializedFileAccess } from "./storages/SerializedFileAccess.js"; import { QueueProcessor, stopAllRunningProcessors } from "./lib/src/concurrency/processor.js"; -import { reactive, reactiveSource, type ReactiveValue } from "./lib/src/dataobject/reactive.js"; +import { computed, reactive, reactiveSource, type ReactiveValue } from "./lib/src/dataobject/reactive.js"; import { initializeStores } from "./common/stores.js"; import { JournalSyncMinio } from "./lib/src/replication/journal/objectstore/JournalSyncMinio.js"; import { LiveSyncJournalReplicator, type LiveSyncJournalReplicatorEnv } from "./lib/src/replication/journal/LiveSyncJournalReplicator.js"; @@ -99,6 +99,15 @@ export default class ObsidianLiveSyncPlugin extends Plugin set suspended(value: boolean) { this._suspended = value; } + get shouldBatchSave() { + return this.settings?.batchSave && this.settings?.liveSync != true; + } + get batchSaveMinimumDelay(): number { + return this.settings?.batchSaveMinimumDelay ?? DEFAULT_SETTINGS.batchSaveMinimumDelay + } + get batchSaveMaximumDelay(): number { + return this.settings?.batchSaveMaximumDelay ?? DEFAULT_SETTINGS.batchSaveMaximumDelay + } deviceAndVaultName = ""; isReady = false; packageVersion = ""; @@ -895,6 +904,7 @@ Note: We can always able to read V1 format. It will be progressively converted. const writePiece = piece.substring(1, piece.length - 1) + ","; fireAndForget(async () => { try { + await this.vaultAccess.ensureDirectory(this.app.vault.configDir + "/ls-debug/"); await this.vaultAccess.adapterAppend(this.app.vault.configDir + "/ls-debug/" + outFile, writePiece + "\n") } catch (ex) { Logger(`Could not write ${outFile}`, LOG_LEVEL_VERBOSE); @@ -946,6 +956,7 @@ Note: We can always able to read V1 format. It will be progressively converted. localStorage.setItem(lsKey, `${VER}`); await this.openDatabase(); this.watchWorkspaceOpen = this.watchWorkspaceOpen.bind(this); + this.watchEditorChange = this.watchEditorChange.bind(this); this.watchWindowVisibility = this.watchWindowVisibility.bind(this) this.watchOnline = this.watchOnline.bind(this); this.realizeSettingSyncMode = this.realizeSettingSyncMode.bind(this); @@ -1136,7 +1147,6 @@ Note: We can always able to read V1 format. It will be progressively converted. } this.deviceAndVaultName = localStorage.getItem(lsKey) || ""; this.ignoreFiles = this.settings.ignoreFiles.split(",").map(e => e.trim()); - this.fileEventQueue.delay = (!this.settings.liveSync && this.settings.batchSave) ? 5000 : 100; this.settingTab.requestReload() } @@ -1185,7 +1195,6 @@ Note: We can always able to read V1 format. It will be progressively converted. this.localDatabase.settings = this.settings; setLang(this.settings.displayLanguage); this.settingTab.requestReload(); - this.fileEventQueue.delay = (!this.settings.liveSync && this.settings.batchSave) ? 5000 : 100; this.ignoreFiles = this.settings.ignoreFiles.split(",").map(e => e.trim()); if (this.settings.settingSyncFile != "") { fireAndForget(() => this.saveSettingToMarkdown(this.settings.settingSyncFile)); @@ -1351,6 +1360,7 @@ We can perform a command in this file. vaultManager: StorageEventManager = new StorageEventManagerObsidian(this); registerFileWatchEvents() { this.vaultManager.beginWatch(); + this.registerEvent(this.app.workspace.on("editor-change", this.watchEditorChange)); } _initialCallback: any; swapSaveCommand() { @@ -1368,8 +1378,8 @@ We can perform a command in this file. saveCommandDefinition.callback = this._initialCallback; this._initialCallback = undefined; } else { - Logger("Sync on Editor Save.", LOG_LEVEL_VERBOSE); if (this.settings.syncOnEditorSave) { + Logger("Sync on Editor Save.", LOG_LEVEL_VERBOSE); this.replicate(); } } @@ -1452,37 +1462,10 @@ We can perform a command in this file. } cancelRelativeEvent(item: FileEventItem) { - this.fileEventQueue.modifyQueue((items) => [...items.filter(e => e.key != item.key)]) + this.vaultManager.cancelQueue(item.key); } - queueNextFileEvent(items: FileEventItem[], newItem: FileEventItem): FileEventItem[] { - if (this.settings.batchSave && !this.settings.liveSync) { - const file = newItem.args.file; - // if the latest event is the same type, omit that - // a.md MODIFY <- this should be cancelled when a.md MODIFIED - // b.md MODIFY <- this should be cancelled when b.md MODIFIED - // a.md MODIFY - // a.md CREATE - // : - let i = items.length; - L1: - while (i >= 0) { - i--; - if (i < 0) break L1; - if (items[i].args.file.path != file.path) { - continue L1; - } - if (items[i].type != newItem.type) break L1; - items.remove(items[i]); - } - } - items.push(newItem); - // When deleting or renaming, the queue must be flushed once before processing subsequent processes to prevent unexpected race condition. - if (newItem.type == "DELETE" || newItem.type == "RENAME") { - this.fileEventQueue.requestNextFlush(); - } - return items; - } + async handleFileEvent(queue: FileEventItem): Promise { const file = queue.args.file; const lockKey = `handleFile:${file.path}`; @@ -1536,21 +1519,8 @@ We can perform a command in this file. pendingFileEventCount = reactiveSource(0); processingFileEventCount = reactiveSource(0); - fileEventQueue = - new QueueProcessor( - async (items: FileEventItem[]) => { - await this.handleFileEvent(items[0]); - return [] - } - , - { suspended: true, batchSize: 1, concurrentLimit: 5, delay: 100, yieldThreshold: FileWatchEventQueueMax, totalRemainingReactiveSource: this.pendingFileEventCount, processingEntitiesReactiveSource: this.processingFileEventCount } - ).replaceEnqueueProcessor((items, newItem) => this.queueNextFileEvent(items, newItem)); - flushFileEventQueue() { - return this.fileEventQueue.flush(); - } - watchWorkspaceOpen(file: TFile | null) { if (this.settings.suspendFileWatching) return; if (!this.settings.isConfigured) return; @@ -1559,6 +1529,34 @@ We can perform a command in this file. scheduleTask("watch-workspace-open", 500, () => fireAndForget(() => this.watchWorkspaceOpenAsync(file))); } + + flushFileEventQueue() { + return this.vaultManager.flushQueue(); + } + + watchEditorChange(editor: Editor, info: any) { + if (!("path" in info)) { + return; + } + if (!this.shouldBatchSave) { + return; + } + const file = info?.file as TFile; + if (!file) return; + if (!this.vaultManager.isWaiting(file.path as FilePath)) { + return; + } + const data = info?.data as string; + const fi: FileEvent = { + type: "CHANGED", + file: file, + cachedData: data, + } + this.vaultManager.appendQueue([ + fi + ]) + } + async watchWorkspaceOpenAsync(file: TFile) { if (this.settings.suspendFileWatching) return; if (!this.settings.isConfigured) return; @@ -1966,53 +1964,44 @@ We can perform a command in this file. observeForLogs() { const padSpaces = `\u{2007}`.repeat(10); // const emptyMark = `\u{2003}`; - const rerenderTimer = new Map, number]>(); - const tick = reactiveSource(0); - function padLeftSp(num: number, mark: string) { - const numLen = `${num}`.length + 1; - const [timer, len] = rerenderTimer.get(mark) ?? [undefined, numLen]; - if (num || timer) { - if (num) { - if (timer) clearTimeout(timer); - rerenderTimer.set(mark, [setTimeout(async () => { - rerenderTimer.delete(mark); - await delay(100); - tick.value = tick.value + 1; - }, 3000), Math.max(len, numLen)]); + function padLeftSpComputed(numI: ReactiveValue, mark: string) { + const formatted = reactiveSource(""); + let timer: ReturnType | undefined = undefined; + let maxLen = 1; + numI.onChanged(numX => { + const num = numX.value; + const numLen = `${Math.abs(num)}`.length + 1; + maxLen = maxLen < numLen ? numLen : maxLen; + if (timer) clearTimeout(timer); + if (num == 0) { + timer = setTimeout(() => { + formatted.value = ""; + maxLen = 1; + }, 3000); } - return ` ${mark}${`${padSpaces}${num}`.slice(-(len))}`; - } else { - return ""; - } + formatted.value = ` ${mark}${`${padSpaces}${num}`.slice(-(maxLen))}`; + }) + return computed(() => formatted.value); } - // const logStore - const queueCountLabel = reactive(() => { - // For invalidating - // @ts-ignore - // eslint-disable-next-line @typescript-eslint/no-unused-vars - const _ = tick.value; - const dbCount = this.databaseQueueCount.value; - const replicationCount = this.replicationResultCount.value; - const storageApplyingCount = this.storageApplyingCount.value; - const chunkCount = collectingChunks.value; - const pluginScanCount = pluginScanningCount.value; - const hiddenFilesCount = hiddenFilesEventCount.value + hiddenFilesProcessingCount.value; - const conflictProcessCount = this.conflictProcessQueueCount.value; - const labelReplication = padLeftSp(replicationCount, `📥`); - const labelDBCount = padLeftSp(dbCount, `📄`); - const labelStorageCount = padLeftSp(storageApplyingCount, `💾`); - const labelChunkCount = padLeftSp(chunkCount, `🧩`); - const labelPluginScanCount = padLeftSp(pluginScanCount, `🔌`); - const labelHiddenFilesCount = padLeftSp(hiddenFilesCount, `⚙️`) - const labelConflictProcessCount = padLeftSp(conflictProcessCount, `🔩`); - return `${labelReplication}${labelDBCount}${labelStorageCount}${labelChunkCount}${labelPluginScanCount}${labelHiddenFilesCount}${labelConflictProcessCount}`; + const labelReplication = padLeftSpComputed(this.replicationResultCount, `📥`); + const labelDBCount = padLeftSpComputed(this.databaseQueueCount, `📄`); + const labelStorageCount = padLeftSpComputed(this.storageApplyingCount, `💾`); + const labelChunkCount = padLeftSpComputed(collectingChunks, `🧩`); + const labelPluginScanCount = padLeftSpComputed(pluginScanningCount, `🔌`); + const labelConflictProcessCount = padLeftSpComputed(this.conflictProcessQueueCount, `🔩`); + const hiddenFilesCount = reactive(() => hiddenFilesEventCount.value + hiddenFilesProcessingCount.value); + const labelHiddenFilesCount = padLeftSpComputed(hiddenFilesCount, `⚙️`) + const queueCountLabelX = reactive(() => { + return `${labelReplication()}${labelDBCount()}${labelStorageCount()}${labelChunkCount()}${labelPluginScanCount()}${labelHiddenFilesCount()}${labelConflictProcessCount()}`; }) - const requestingStatLabel = reactive(() => { + const queueCountLabel = () => queueCountLabelX.value; + + const requestingStatLabel = computed(() => { const diff = this.requestCount.value - this.responseCount.value; return diff != 0 ? "📲 " : ""; }) - const replicationStatLabel = reactive(() => { + const replicationStatLabel = computed(() => { const e = this.replicationStat.value; const sent = e.sent; const arrived = e.arrived; @@ -2055,42 +2044,36 @@ We can perform a command in this file. } return { w, sent, pushLast, arrived, pullLast }; }) - const waitingLabel = reactive(() => { - // For invalidating - // @ts-ignore - // eslint-disable-next-line @typescript-eslint/no-unused-vars - const _ = tick.value; - const e = this.pendingFileEventCount.value; - const proc = this.processingFileEventCount.value; - const pend = e - proc; - const labelProc = padLeftSp(proc, `⏳`); - const labelPend = padLeftSp(pend, `🛫`); - return `${labelProc}${labelPend}`; + const labelProc = padLeftSpComputed(this.vaultManager.processing, `⏳`); + const labelPend = padLeftSpComputed(this.vaultManager.totalQueued, `🛫`); + const labelInBatchDelay = padLeftSpComputed(this.vaultManager.batched, `📬`); + const waitingLabel = computed(() => { + return `${labelProc()}${labelPend()}${labelInBatchDelay()}`; }) - const statusLineLabel = reactive(() => { - const { w, sent, pushLast, arrived, pullLast } = replicationStatLabel.value; - const queued = queueCountLabel.value; - const waiting = waitingLabel.value; - const networkActivity = requestingStatLabel.value; + const statusLineLabel = computed(() => { + const { w, sent, pushLast, arrived, pullLast } = replicationStatLabel(); + const queued = queueCountLabel(); + const waiting = waitingLabel(); + const networkActivity = requestingStatLabel(); return { message: `${networkActivity}Sync: ${w} ↑ ${sent}${pushLast} ↓ ${arrived}${pullLast}${waiting}${queued}`, }; }) const statusBarLabels = reactive(() => { const scheduleMessage = this.isReloadingScheduled ? `WARNING! RESTARTING OBSIDIAN IS SCHEDULED!\n` : ""; - const { message } = statusLineLabel.value; + const { message } = statusLineLabel(); const status = scheduleMessage + this.statusLog.value; return { message, status } }) - const applyToDisplay = throttle(() => { - const v = statusBarLabels.value; + const applyToDisplay = throttle((label: typeof statusBarLabels.value) => { + const v = label; this.applyStatusBarText(v.message, v.status); }, 20); - statusBarLabels.onChanged(applyToDisplay); + statusBarLabels.onChanged(label => applyToDisplay(label.value)) } applyStatusBarText(message: string, log: string) { @@ -2106,7 +2089,6 @@ We can perform a command in this file. // root.style.setProperty("--log-text", "'" + (newMsg + "\\A " + newLog) + "'"); } - scheduleTask("log-hide", 3000, () => { this.statusLog.value = "" }); } async askResolvingMismatchedTweaks(): Promise<"OK" | "CHECKAGAIN" | "IGNORE"> { diff --git a/src/storages/StorageEventManager.ts b/src/storages/StorageEventManager.ts index a2e44cb..7d2688e 100644 --- a/src/storages/StorageEventManager.ts +++ b/src/storages/StorageEventManager.ts @@ -2,14 +2,34 @@ import type { SerializedFileAccess } from "./SerializedFileAccess.ts"; import { Plugin, TAbstractFile, TFile, TFolder } from "../deps.ts"; import { Logger } from "../lib/src/common/logger.ts"; import { shouldBeIgnored } from "../lib/src/string_and_binary/path.ts"; -import type { QueueProcessor } from "../lib/src/concurrency/processor.ts"; -import { LOG_LEVEL_NOTICE, type FilePath, type ObsidianLiveSyncSettings } from "../lib/src/common/types.ts"; -import { delay } from "../lib/src/common/utils.ts"; +import { LOG_LEVEL_DEBUG, LOG_LEVEL_INFO, LOG_LEVEL_NOTICE, type FilePath, type ObsidianLiveSyncSettings } from "../lib/src/common/types.ts"; +import { delay, fireAndForget } from "../lib/src/common/utils.ts"; import { type FileEventItem, type FileEventType, type FileInfo, type InternalFileInfo } from "../common/types.ts"; +import { skipIfDuplicated } from "../lib/src/concurrency/lock.ts"; +import { finishAllWaitingForTimeout, finishWaitingForTimeout, isWaitingForTimeout, waitForTimeout } from "../lib/src/concurrency/task.ts"; +import { reactiveSource, type ReactiveSource } from "../lib/src/dataobject/reactive.ts"; +import { Semaphore } from "../lib/src/concurrency/semaphore.ts"; + + +export type FileEvent = { + type: FileEventType; + file: TAbstractFile | InternalFileInfo; + oldPath?: string; + cachedData?: string; + skipBatchWait?: boolean; +}; export abstract class StorageEventManager { abstract beginWatch(): void; + abstract flushQueue(): void; + abstract appendQueue(items: FileEvent[], ctx?: any): void; + abstract cancelQueue(key: string): void; + abstract isWaiting(filename: FilePath): boolean; + abstract totalQueued: ReactiveSource; + abstract batched: ReactiveSource; + abstract processing: ReactiveSource; + } type LiveSyncForStorageEventManager = Plugin & @@ -17,15 +37,33 @@ type LiveSyncForStorageEventManager = Plugin & settings: ObsidianLiveSyncSettings ignoreFiles: string[], vaultAccess: SerializedFileAccess + shouldBatchSave: boolean + batchSaveMinimumDelay: number; + batchSaveMaximumDelay: number; + } & { isTargetFile: (file: string | TAbstractFile) => Promise, - fileEventQueue: QueueProcessor, + // fileEventQueue: QueueProcessor, + handleFileEvent: (queue: FileEventItem) => Promise, isFileSizeExceeded: (size: number) => boolean; }; export class StorageEventManagerObsidian extends StorageEventManager { + totalQueued = reactiveSource(0); + batched = reactiveSource(0); + processing = reactiveSource(0); plugin: LiveSyncForStorageEventManager; + + get shouldBatchSave() { + return this.plugin.shouldBatchSave; + } + get batchSaveMinimumDelay(): number { + return this.plugin.batchSaveMinimumDelay; + } + get batchSaveMaximumDelay(): number { + return this.plugin.batchSaveMaximumDelay + } constructor(plugin: LiveSyncForStorageEventManager) { super(); this.plugin = plugin; @@ -43,25 +81,25 @@ export class StorageEventManagerObsidian extends StorageEventManager { plugin.registerEvent(plugin.app.vault.on("create", this.watchVaultCreate)); //@ts-ignore : Internal API plugin.registerEvent(plugin.app.vault.on("raw", this.watchVaultRawEvents)); - plugin.fileEventQueue.startPipeline(); + // plugin.fileEventQueue.startPipeline(); } watchVaultCreate(file: TAbstractFile, ctx?: any) { - this.appendWatchEvent([{ type: "CREATE", file }], ctx); + this.appendQueue([{ type: "CREATE", file }], ctx); } watchVaultChange(file: TAbstractFile, ctx?: any) { - this.appendWatchEvent([{ type: "CHANGED", file }], ctx); + this.appendQueue([{ type: "CHANGED", file }], ctx); } watchVaultDelete(file: TAbstractFile, ctx?: any) { - this.appendWatchEvent([{ type: "DELETE", file }], ctx); + this.appendQueue([{ type: "DELETE", file }], ctx); } watchVaultRename(file: TAbstractFile, oldFile: string, ctx?: any) { if (file instanceof TFile) { - this.appendWatchEvent([ - { type: "DELETE", file: { path: oldFile as FilePath, mtime: file.stat.mtime, ctime: file.stat.ctime, size: file.stat.size, deleted: true } }, - { type: "CREATE", file }, + this.appendQueue([ + { type: "DELETE", file: { path: oldFile as FilePath, mtime: file.stat.mtime, ctime: file.stat.ctime, size: file.stat.size, deleted: true }, skipBatchWait: true }, + { type: "CREATE", file, skipBatchWait: true }, ], ctx); } } @@ -83,16 +121,17 @@ export class StorageEventManagerObsidian extends StorageEventManager { .replace(/\n| /g, "") .split(",").filter(e => e).map(e => new RegExp(e, "i")); if (ignorePatterns.some(e => path.match(e))) return; - this.appendWatchEvent( + this.appendQueue( [{ type: "INTERNAL", file: { path, mtime: 0, ctime: 0, size: 0 } }], null); } // Cache file and waiting to can be proceed. - async appendWatchEvent(params: { type: FileEventType, file: TAbstractFile | InternalFileInfo, oldPath?: string }[], ctx?: any) { + async appendQueue(params: FileEvent[], ctx?: any) { if (!this.plugin.settings.isConfigured) return; if (this.plugin.settings.suspendFileWatching) return; + const processFiles = new Set(); for (const param of params) { if (shouldBeIgnored(param.file.path)) { continue; @@ -118,13 +157,6 @@ export class StorageEventManagerObsidian extends StorageEventManager { if (this.plugin.vaultAccess.recentlyTouched(file)) { continue; } - // cache = await this.plugin.vaultAccess.vaultReadAuto(file); - // if (!isPlainText(file.name)) { - // cache = await this.plugin.vaultAccess.vaultReadBinary(file); - // } else { - // cache = await this.plugin.vaultAccess.vaultCacheRead(file); - // if (!cache) cache = await this.plugin.vaultAccess.vaultRead(file); - // } } const fileInfo = file instanceof TFile ? { ctime: file.stat.ctime, @@ -133,16 +165,143 @@ export class StorageEventManagerObsidian extends StorageEventManager { path: file.path, size: file.stat.size } as FileInfo : file as InternalFileInfo; - this.plugin.fileEventQueue.enqueue({ + let cache: string | undefined = undefined; + if (param.cachedData) { + cache = param.cachedData + } + this.enqueue({ type, args: { file: fileInfo, oldPath, - // cache, - ctx + cache, + ctx, }, + skipBatchWait: param.skipBatchWait, key: atomicKey }) + processFiles.add(file.path as FilePath); + if (oldPath) { + processFiles.add(oldPath as FilePath); + } + } + for (const path of processFiles) { + fireAndForget(() => this.startStandingBy(path)); } } + bufferedQueuedItems = [] as FileEventItem[]; + + enqueue(newItem: FileEventItem) { + const filename = newItem.args.file.path; + if (this.shouldBatchSave) { + Logger(`Request cancel for waiting of previous ${filename}`, LOG_LEVEL_DEBUG); + finishWaitingForTimeout(`storage-event-manager-batchsave-${filename}`); + } + this.bufferedQueuedItems.push(newItem); + // When deleting or renaming, the queue must be flushed once before processing subsequent processes to prevent unexpected race condition. + if (newItem.type == "DELETE" || newItem.type == "RENAME") { + return this.flushQueue(); + } + } + concurrentProcessing = Semaphore(5); + waitedSince = new Map(); + async startStandingBy(filename: FilePath) { + // If waited, cancel previous waiting. + await skipIfDuplicated(`storage-event-manager-${filename}`, async () => { + Logger(`Processing ${filename}: Starting`, LOG_LEVEL_DEBUG); + const release = await this.concurrentProcessing.acquire(); + try { + Logger(`Processing ${filename}: Started`, LOG_LEVEL_DEBUG); + let noMoreFiles = false; + do { + const target = this.bufferedQueuedItems.find(e => e.args.file.path == filename); + if (target === undefined) { + noMoreFiles = true; + break; + } + const operationType = target.type; + + // if (target.waitedFrom + this.batchSaveMaximumDelay > now) { + // this.requestProcessQueue(target); + // continue; + // } + const type = target.type; + if (target.cancelled) { + Logger(`Processing ${filename}: Cancelled (scheduled): ${operationType}`, LOG_LEVEL_DEBUG) + this.cancelStandingBy(target); + continue; + } + if (!target.skipBatchWait) { + if (this.shouldBatchSave && (type == "CREATE" || type == "CHANGED")) { + const waitedSince = this.waitedSince.get(filename); + let canWait = true; + const now = Date.now(); + if (waitedSince !== undefined) { + if (waitedSince + (this.batchSaveMaximumDelay * 1000) < now) { + Logger(`Processing ${filename}: Could not wait no more: ${operationType}`, LOG_LEVEL_INFO) + canWait = false; + } + } + if (canWait) { + if (waitedSince === undefined) this.waitedSince.set(filename, now) + target.batched = true + Logger(`Processing ${filename}: Waiting for batch save delay: ${operationType}`, LOG_LEVEL_DEBUG) + this.updateStatus(); + const result = await waitForTimeout(`storage-event-manager-batchsave-${filename}`, this.batchSaveMinimumDelay * 1000); + if (!result) { + Logger(`Processing ${filename}: Cancelled by new queue: ${operationType}`, LOG_LEVEL_DEBUG) + // If could not wait for the timeout, possibly we got a new queue. therefore, currently processing one should be cancelled + this.cancelStandingBy(target); + continue; + } + } + } + } else { + Logger(`Processing ${filename}:Requested to perform immediately ${filename}: ${operationType}`, LOG_LEVEL_DEBUG) + } + Logger(`Processing ${filename}: Request main to process: ${operationType}`, LOG_LEVEL_DEBUG) + this.requestProcessQueue(target); + } while (!noMoreFiles) + } finally { + release() + } + Logger(`Processing ${filename}: Finished`, LOG_LEVEL_DEBUG); + }) + } + + cancelStandingBy(fei: FileEventItem) { + this.bufferedQueuedItems.remove(fei); + this.updateStatus(); + } + processingCount = 0; + async requestProcessQueue(fei: FileEventItem) { + try { + this.processingCount++; + this.bufferedQueuedItems.remove(fei); + this.updateStatus() + this.waitedSince.delete(fei.args.file.path); + await this.plugin.handleFileEvent(fei); + } finally { + this.processingCount--; + this.updateStatus() + } + } + isWaiting(filename: FilePath) { + return isWaitingForTimeout(`storage-event-manager-batchsave-${filename}`); + } + flushQueue() { + this.bufferedQueuedItems.forEach(e => e.skipBatchWait = true) + finishAllWaitingForTimeout("storage-event-manager-batchsave-", true); + } + cancelQueue(key: string) { + this.bufferedQueuedItems.forEach(e => { + if (e.key === key) e.skipBatchWait = true + }) + } + updateStatus() { + const allItems = this.bufferedQueuedItems.filter(e => !e.cancelled) + this.batched.value = allItems.filter(e => e.batched && !e.skipBatchWait).length; + this.processing.value = this.processingCount; + this.totalQueued.value = allItems.length - this.batched.value; + } } \ No newline at end of file diff --git a/src/ui/ObsidianLiveSyncSettingTab.ts b/src/ui/ObsidianLiveSyncSettingTab.ts index 32fe405..8b2cc93 100644 --- a/src/ui/ObsidianLiveSyncSettingTab.ts +++ b/src/ui/ObsidianLiveSyncSettingTab.ts @@ -644,6 +644,7 @@ export class ObsidianLiveSyncSettingTab extends PluginSettingTab { const keys = Object.keys(newConf) as (keyof ObsidianLiveSyncSettings)[]; let hasLoaded = false; for (const k of keys) { + if (k === "deviceAndVaultName") continue; if (isObjectDifferent(newConf[k], this.initialSettings?.[k])) { // Something has changed if (this.isDirty(k as AllSettingItemKey)) { @@ -1739,6 +1740,24 @@ However, your report is needed to stabilise this. I appreciate you for your grea new Setting(containerSyncSettingEl) .setClass("wizardHidden") .autoWireToggle("batchSave") + new Setting(containerSyncSettingEl) + .setClass("wizardHidden") + .autoWireNumeric("batchSaveMinimumDelay", + { + acceptZero: true, + onUpdate: visibleOnly(() => this.isConfiguredAs("batchSave", true)) + } + ) + new Setting(containerSyncSettingEl) + .setClass("wizardHidden") + .autoWireNumeric("batchSaveMaximumDelay", + { + acceptZero: true, + onUpdate: visibleOnly(() => this.isConfiguredAs("batchSave", true)) + } + ) + + new Setting(containerSyncSettingEl) .setClass("wizardHidden") diff --git a/src/ui/settingConstants.ts b/src/ui/settingConstants.ts index 43deff2..31a882a 100644 --- a/src/ui/settingConstants.ts +++ b/src/ui/settingConstants.ts @@ -67,11 +67,11 @@ export const SettingInformation: Partial