From 7f422d58f2b3124497ab77c5908fa4cf7f630124 Mon Sep 17 00:00:00 2001 From: vorotamoroz Date: Fri, 12 Jan 2024 09:36:49 +0000 Subject: [PATCH] - Refined: - Task scheduling logics has been rewritten. - Possibly many bugs and fragile behaviour has been fixed - Fixed: - Remote-chunk-fetching now works with keeping request intervals - New feature: - We can show only the icons in the editor. --- src/CmdConfigSync.ts | 120 +++-- src/CmdHiddenFileSync.ts | 66 +-- src/CmdSetupLiveSync.ts | 1 - src/LogDisplayModal.ts | 11 +- src/LogPane.svelte | 29 +- src/ObsidianLiveSyncSettingTab.ts | 14 +- src/StorageEventManager.ts | 65 +-- src/lib | 2 +- src/main.ts | 839 ++++++++++++++---------------- src/utils.ts | 47 +- tsconfig.json | 1 + 11 files changed, 545 insertions(+), 650 deletions(-) diff --git a/src/CmdConfigSync.ts b/src/CmdConfigSync.ts index ff10398..3b89da6 100644 --- a/src/CmdConfigSync.ts +++ b/src/CmdConfigSync.ts @@ -7,14 +7,16 @@ import { ICXHeader, PERIODIC_PLUGIN_SWEEP, } from "./types"; import { createTextBlob, delay, getDocData } from "./lib/src/utils"; import { Logger } from "./lib/src/logger"; import { WrappedNotice } from "./lib/src/wrapper"; -import { readString, crc32CKHash, decodeBinary, arrayBufferToBase64 } from "./lib/src/strbin"; +import { readString, decodeBinary, arrayBufferToBase64, sha1 } from "./lib/src/strbin"; import { serialized } from "./lib/src/lock"; import { LiveSyncCommands } from "./LiveSyncCommands"; import { stripAllPrefixes } from "./lib/src/path"; import { PeriodicProcessor, askYesNo, disposeMemoObject, memoIfNotExist, memoObject, retrieveMemoObject, scheduleTask } from "./utils"; import { PluginDialogModal } from "./dialogs"; import { JsonResolveModal } from "./JsonResolveModal"; -import { pipeGeneratorToGenerator, processAllGeneratorTasksWithConcurrencyLimit } from './lib/src/task'; +import { QueueProcessor } from './lib/src/processor'; +import { pluginScanningCount } from './lib/src/stores'; +import type ObsidianLiveSyncPlugin from './main'; const d = "\u200b"; const d2 = "\n"; @@ -162,6 +164,16 @@ export type PluginDataEx = { mtime: number, }; export class ConfigSync extends LiveSyncCommands { + constructor(plugin: ObsidianLiveSyncPlugin) { + super(plugin); + pluginScanningCount.onChanged((e) => { + const total = e.value; + pluginIsEnumerating.set(total != 0); + if (total == 0) { + Logger(`Processing configurations done`, LOG_LEVEL_INFO, "get-plugins"); + } + }) + } confirmPopup: WrappedNotice = null; get kvDB() { return this.plugin.kvDB; @@ -270,7 +282,7 @@ export class ConfigSync extends LiveSyncCommands { for (const file of data.files) { const work = { ...file }; const tempStr = getDocData(work.data); - work.data = [crc32CKHash(tempStr)]; + work.data = [await sha1(tempStr)]; xFiles.push(work); } return ({ @@ -302,65 +314,65 @@ export class ConfigSync extends LiveSyncCommands { this.plugin.saveSettingData(); } } + + pluginScanProcessor = new QueueProcessor(async (v: AnyEntry[]) => { + const plugin = v[0]; + const path = plugin.path || this.getPath(plugin); + const oldEntry = (this.pluginList.find(e => e.documentPath == path)); + if (oldEntry && oldEntry.mtime == plugin.mtime) return; + try { + const pluginData = await this.loadPluginData(path); + if (pluginData) { + return [pluginData]; + } + // Failed to load + return; + + } catch (ex) { + Logger(`Something happened at enumerating customization :${path}`, LOG_LEVEL_NOTICE); + Logger(ex, LOG_LEVEL_VERBOSE); + } + return; + }, { suspended: true, batchSize: 1, concurrentLimit: 5, delay: 300, yieldThreshold: 10 }).pipeTo( + new QueueProcessor( + (pluginDataList) => { + let newList = [...this.pluginList]; + for (const item of pluginDataList) { + newList = newList.filter(x => x.documentPath != item.documentPath); + newList.push(item) + } + this.pluginList = newList; + pluginList.set(newList); + return; + } + , { suspended: true, batchSize: 1000, concurrentLimit: 10, delay: 200, yieldThreshold: 25, totalRemainingReactiveSource: pluginScanningCount })).startPipeline().root.onIdle(() => { + Logger(`All files enumerated`, LOG_LEVEL_INFO, "get-plugins"); + this.createMissingConfigurationEntry(); + }); + + async updatePluginList(showMessage: boolean, updatedDocumentPath?: FilePathWithPrefix): Promise { - const logLevel = showMessage ? LOG_LEVEL_NOTICE : LOG_LEVEL_INFO; // pluginList.set([]); if (!this.settings.usePluginSync) { + this.pluginScanProcessor.clearQueue(); this.pluginList = []; pluginList.set(this.pluginList) return; } - await Promise.resolve(); // Just to prevent warning. - scheduleTask("update-plugin-list-task", 200, async () => { - await serialized("update-plugin-list", async () => { - try { - const updatedDocumentId = updatedDocumentPath ? await this.path2id(updatedDocumentPath) : ""; - const plugins = updatedDocumentPath ? - this.localDatabase.findEntries(updatedDocumentId, updatedDocumentId + "\u{10ffff}", { include_docs: true, key: updatedDocumentId, limit: 1 }) : - this.localDatabase.findEntries(ICXHeader + "", `${ICXHeader}\u{10ffff}`, { include_docs: true }); - let count = 0; - pluginIsEnumerating.set(true); - for await (const v of processAllGeneratorTasksWithConcurrencyLimit(20, pipeGeneratorToGenerator(plugins, async plugin => { - const path = plugin.path || this.getPath(plugin); - if (updatedDocumentPath && updatedDocumentPath != path) { - return false; - } - const oldEntry = (this.pluginList.find(e => e.documentPath == path)); - if (oldEntry && oldEntry.mtime == plugin.mtime) return false; - try { - count++; - if (count % 10 == 0) Logger(`Enumerating files... ${count}`, logLevel, "get-plugins"); - Logger(`plugin-${path}`, LOG_LEVEL_VERBOSE); - return this.loadPluginData(path); - // return entries; - } catch (ex) { - //TODO - Logger(`Something happened at enumerating customization :${path}`, LOG_LEVEL_NOTICE); - console.warn(ex); - } - return false; - }))) { - if ("ok" in v) { - if (v.ok !== false) { - let newList = [...this.pluginList]; - const item = v.ok; - newList = newList.filter(x => x.documentPath != item.documentPath); - newList.push(item) - if (updatedDocumentPath != "") newList = newList.filter(e => e.documentPath != updatedDocumentPath); - this.pluginList = newList; - pluginList.set(newList); - } - } - } - Logger(`All files enumerated`, logLevel, "get-plugins"); - pluginIsEnumerating.set(false); - this.createMissingConfigurationEntry(); - } finally { - pluginIsEnumerating.set(false); - } - }); + try { + const updatedDocumentId = updatedDocumentPath ? await this.path2id(updatedDocumentPath) : ""; + const plugins = updatedDocumentPath ? + this.localDatabase.findEntries(updatedDocumentId, updatedDocumentId + "\u{10ffff}", { include_docs: true, key: updatedDocumentId, limit: 1 }) : + this.localDatabase.findEntries(ICXHeader + "", `${ICXHeader}\u{10ffff}`, { include_docs: true }); + for await (const v of plugins) { + const path = v.path || this.getPath(v); + if (updatedDocumentPath && updatedDocumentPath != path) continue; + this.pluginScanProcessor.enqueue(v); + } + } finally { pluginIsEnumerating.set(false); - }); + } + pluginIsEnumerating.set(false); // return entries; } async compareUsingDisplayData(dataA: PluginDataExDisplay, dataB: PluginDataExDisplay) { diff --git a/src/CmdHiddenFileSync.ts b/src/CmdHiddenFileSync.ts index 94ac5a2..2f7b354 100644 --- a/src/CmdHiddenFileSync.ts +++ b/src/CmdHiddenFileSync.ts @@ -1,16 +1,18 @@ import { normalizePath, type PluginManifest } from "./deps"; import { type EntryDoc, type LoadedEntry, type InternalFileEntry, type FilePathWithPrefix, type FilePath, LOG_LEVEL_INFO, LOG_LEVEL_NOTICE, LOG_LEVEL_VERBOSE, MODE_SELECTIVE, MODE_PAUSED, type SavingEntry } from "./lib/src/types"; import { type InternalFileInfo, ICHeader, ICHeaderEnd } from "./types"; -import { Parallels, createBinaryBlob, delay, isDocContentSame } from "./lib/src/utils"; +import { createBinaryBlob, delay, isDocContentSame } from "./lib/src/utils"; import { Logger } from "./lib/src/logger"; import { PouchDB } from "./lib/src/pouchdb-browser.js"; -import { scheduleTask, isInternalMetadata, PeriodicProcessor } from "./utils"; +import { isInternalMetadata, PeriodicProcessor } from "./utils"; import { WrappedNotice } from "./lib/src/wrapper"; import { decodeBinary, encodeBinary } from "./lib/src/strbin"; import { serialized } from "./lib/src/lock"; import { JsonResolveModal } from "./JsonResolveModal"; import { LiveSyncCommands } from "./LiveSyncCommands"; import { addPrefix, stripAllPrefixes } from "./lib/src/path"; +import { KeyedQueueProcessor, QueueProcessor } from "./lib/src/processor"; +import { hiddenFilesEventCount, hiddenFilesProcessingCount } from "./lib/src/stores"; export class HiddenFileSync extends LiveSyncCommands { periodicInternalFileScanProcessor: PeriodicProcessor = new PeriodicProcessor(this.plugin, async () => this.settings.syncInternalFiles && this.localDatabase.isReady && await this.syncInternalFilesAndDatabase("push", false)); @@ -75,22 +77,17 @@ export class HiddenFileSync extends LiveSyncCommands { return; } - procInternalFiles: string[] = []; - async execInternalFile() { - await serialized("execInternal", async () => { - const w = [...this.procInternalFiles]; - this.procInternalFiles = []; - Logger(`Applying hidden ${w.length} files change...`); - await this.syncInternalFilesAndDatabase("pull", false, false, w); - Logger(`Applying hidden ${w.length} files changed`); - }); - } procInternalFile(filename: string) { - this.procInternalFiles.push(filename); - scheduleTask("procInternal", 500, async () => { - await this.execInternalFile(); - }); + this.internalFileProcessor.enqueueWithKey(filename, filename); } + internalFileProcessor = new KeyedQueueProcessor( + async (filenames) => { + Logger(`START :Applying hidden ${filenames.length} files change`, LOG_LEVEL_VERBOSE); + await this.syncInternalFilesAndDatabase("pull", false, false, filenames); + Logger(`DONE :Applying hidden ${filenames.length} files change`, LOG_LEVEL_VERBOSE); + return; + }, { batchSize: 100, concurrentLimit: 1, delay: 100, yieldThreshold: 10, suspended: false, totalRemainingReactiveSource: hiddenFilesEventCount } + ); recentProcessedInternalFiles = [] as string[]; async watchVaultRawEventsAsync(path: FilePath) { @@ -278,28 +275,38 @@ export class HiddenFileSync extends LiveSyncCommands { acc[stripAllPrefixes(this.getPath(cur))] = cur; return acc; }, {} as { [key: string]: InternalFileEntry; }); - const para = Parallels(); - for (const filename of allFileNames) { + await new QueueProcessor(async (filenames: FilePath[]) => { + const filename = filenames[0]; processed++; if (processed % 100 == 0) { Logger(`Hidden file: ${processed}/${fileCount}`, logLevel, "sync_internal"); } - if (!filename) continue; + if (!filename) return; if (ignorePatterns.some(e => filename.match(e))) - continue; + return; if (await this.plugin.isIgnoredByIgnoreFiles(filename)) { - continue; + return; } const fileOnStorage = filename in filesMap ? filesMap[filename] : undefined; const fileOnDatabase = filename in filesOnDBMap ? filesOnDBMap[filename] : undefined; - const cache = filename in caches ? caches[filename] : { storageMtime: 0, docMtime: 0 }; - - await para.wait(5); - const proc = (async (xFileOnStorage: InternalFileInfo, xFileOnDatabase: InternalFileEntry) => { + return [{ + filename, + fileOnStorage, + fileOnDatabase, + }] + }, { suspended: true, batchSize: 1, concurrentLimit: 10, delay: 0, totalRemainingReactiveSource: hiddenFilesProcessingCount }) + .pipeTo(new QueueProcessor(async (params) => { + const + { + filename, + fileOnStorage: xFileOnStorage, + fileOnDatabase: xFileOnDatabase + } = params[0]; if (xFileOnStorage && xFileOnDatabase) { + const cache = filename in caches ? caches[filename] : { storageMtime: 0, docMtime: 0 }; // Both => Synchronize if ((direction != "pullForce" && direction != "pushForce") && xFileOnDatabase.mtime == cache.docMtime && xFileOnStorage.mtime == cache.storageMtime) { return; @@ -340,11 +347,12 @@ export class HiddenFileSync extends LiveSyncCommands { throw new Error("Invalid state on hidden file sync"); // Something corrupted? } + return; + }, { suspended: true, batchSize: 1, concurrentLimit: 5, delay: 0 })) + .root + .enqueueAll(allFileNames) + .startPipeline().waitForPipeline(); - }); - para.add(proc(fileOnStorage, fileOnDatabase)) - } - await para.all(); await this.kvDB.set("diff-caches-internal", caches); // When files has been retrieved from the database. they must be reloaded. diff --git a/src/CmdSetupLiveSync.ts b/src/CmdSetupLiveSync.ts index 857b83d..f986328 100644 --- a/src/CmdSetupLiveSync.ts +++ b/src/CmdSetupLiveSync.ts @@ -321,7 +321,6 @@ Of course, we are able to disable these features.` this.plugin.settings.suspendFileWatching = false; await this.plugin.syncAllFiles(true); await this.plugin.loadQueuedFiles(); - this.plugin.procQueuedFiles(); await this.plugin.saveSettings(); } diff --git a/src/LogDisplayModal.ts b/src/LogDisplayModal.ts index 4ec52ac..932452a 100644 --- a/src/LogDisplayModal.ts +++ b/src/LogDisplayModal.ts @@ -1,5 +1,6 @@ import { App, Modal } from "./deps"; -import { logMessageStore } from "./lib/src/stores"; +import type { ReactiveInstance, } from "./lib/src/reactive"; +import { logMessages } from "./lib/src/stores"; import { escapeStringToHTML } from "./lib/src/strbin"; import ObsidianLiveSyncPlugin from "./main"; @@ -21,14 +22,16 @@ export class LogDisplayModal extends Modal { div.addClass("op-scrollable"); div.addClass("op-pre"); this.logEl = div; - this.unsubscribe = logMessageStore.observe((e) => { + function updateLog(logs: ReactiveInstance) { + const e = logs.value; let msg = ""; for (const v of e) { msg += escapeStringToHTML(v) + "
"; } this.logEl.innerHTML = msg; - }) - logMessageStore.invalidate(); + } + logMessages.onChanged(updateLog); + this.unsubscribe = () => logMessages.offChanged(updateLog); } onClose() { const { contentEl } = this; diff --git a/src/LogPane.svelte b/src/LogPane.svelte index 828841f..c222689 100644 --- a/src/LogPane.svelte +++ b/src/LogPane.svelte @@ -1,26 +1,27 @@