diff --git a/src/CmdConfigSync.ts b/src/CmdConfigSync.ts index 404d8f8..9733e55 100644 --- a/src/CmdConfigSync.ts +++ b/src/CmdConfigSync.ts @@ -14,6 +14,7 @@ import { stripAllPrefixes } from "./lib/src/path"; import { PeriodicProcessor, askYesNo, disposeMemoObject, memoIfNotExist, memoObject, retrieveMemoObject, scheduleTask } from "./utils"; import { PluginDialogModal } from "./dialogs"; import { JsonResolveModal } from "./JsonResolveModal"; +import { pipeGeneratorToGenerator, processAllGeneratorTasksWithConcurrencyLimit } from './lib/src/task'; function serialize(obj: T): string { @@ -173,63 +174,60 @@ export class ConfigSync extends LiveSyncCommands { } scheduleTask("update-plugin-list-task", 200, async () => { await runWithLock("update-plugin-list", false, async () => { - const entries = [] as PluginDataExDisplay[] - const plugins = this.localDatabase.findEntries(ICXHeader + "", `${ICXHeader}\u{10ffff}`, { include_docs: true }); - const para = Parallels(); - let count = 0; - pluginIsEnumerating.set(true); - let processed = false; try { - for await (const plugin of plugins) { + const updatedDocumentId = updatedDocumentPath ? await this.path2id(updatedDocumentPath) : ""; + const plugins = updatedDocumentPath ? + this.localDatabase.findEntries(updatedDocumentId, updatedDocumentId + "\u{10ffff}", { include_docs: true, key: updatedDocumentId, limit: 1 }) : + this.localDatabase.findEntries(ICXHeader + "", `${ICXHeader}\u{10ffff}`, { include_docs: true }); + let count = 0; + pluginIsEnumerating.set(true); + for await (const v of processAllGeneratorTasksWithConcurrencyLimit(20, pipeGeneratorToGenerator(plugins, async plugin => { const path = plugin.path || this.getPath(plugin); if (updatedDocumentPath && updatedDocumentPath != path) { - continue; + return false; } - processed = true; const oldEntry = (this.pluginList.find(e => e.documentPath == path)); - if (oldEntry && oldEntry.mtime == plugin.mtime) continue; - await para.wait(15); - para.add((async (v) => { - try { - count++; - if (count % 10 == 0) Logger(`Enumerating files... ${count}`, logLevel, "get-plugins"); - Logger(`plugin-${path}`, LOG_LEVEL.VERBOSE); - const wx = await this.localDatabase.getDBEntry(path, null, false, false); - if (wx) { - const data = deserialize(getDocData(wx.data), {}) as PluginDataEx; - const xFiles = [] as PluginDataExFile[]; - for (const file of data.files) { - const work = { ...file }; - const tempStr = getDocData(work.data); - work.data = [crc32CKHash(tempStr)]; - xFiles.push(work); - } - entries.push({ - ...data, - documentPath: this.getPath(wx), - files: xFiles - }); + if (oldEntry && oldEntry.mtime == plugin.mtime) return false; + try { + count++; + if (count % 10 == 0) Logger(`Enumerating files... ${count}`, logLevel, "get-plugins"); + Logger(`plugin-${path}`, LOG_LEVEL.VERBOSE); + const wx = await this.localDatabase.getDBEntry(path, null, false, false); + if (wx) { + const data = deserialize(getDocData(wx.data), {}) as PluginDataEx; + const xFiles = [] as PluginDataExFile[]; + for (const file of data.files) { + const work = { ...file }; + const tempStr = getDocData(work.data); + work.data = [crc32CKHash(tempStr)]; + xFiles.push(work); } - } catch (ex) { - //TODO - Logger(`Something happened at enumerating customization :${v.path}`, LOG_LEVEL.NOTICE); - console.warn(ex); + return ({ + ...data, + documentPath: this.getPath(wx), + files: xFiles + }); + } + // return entries; + } catch (ex) { + //TODO + Logger(`Something happened at enumerating customization :${path}`, LOG_LEVEL.NOTICE); + console.warn(ex); + } + return false; + }))) { + if ("ok" in v) { + if (v.ok != false) { + let newList = [...this.pluginList]; + const item = v.ok; + newList = newList.filter(x => x.documentPath != item.documentPath); + newList.push(item) + if (updatedDocumentPath != "") newList = newList.filter(e => e.documentPath != updatedDocumentPath); + this.pluginList = newList; + pluginList.set(newList); } } - )(plugin)); } - await para.all(); - let newList = [...this.pluginList]; - for (const item of entries) { - newList = newList.filter(x => x.documentPath != item.documentPath); - newList.push(item) - } - if (updatedDocumentPath != "" && !processed) newList = newList.filter(e => e.documentPath != updatedDocumentPath); - - this.pluginList = newList; - pluginList.set(newList); - - Logger(`All files enumerated`, logLevel, "get-plugins"); } finally { pluginIsEnumerating.set(false); diff --git a/src/lib b/src/lib index 250f63e..aacf942 160000 --- a/src/lib +++ b/src/lib @@ -1 +1 @@ -Subproject commit 250f63eb48926d7b8168a594b6cbb7961fbee95e +Subproject commit aacf942453b6282b712a761a2f19c43eeb013884 diff --git a/src/main.ts b/src/main.ts index 17ac22b..44dd3e3 100644 --- a/src/main.ts +++ b/src/main.ts @@ -4,8 +4,8 @@ import { type Diff, DIFF_DELETE, DIFF_EQUAL, DIFF_INSERT, diff_match_patch } fro import { debounce, Notice, Plugin, TFile, addIcon, TFolder, normalizePath, TAbstractFile, Editor, MarkdownView, type RequestUrlParam, type RequestUrlResponse, requestUrl } from "./deps"; import { type EntryDoc, type LoadedEntry, type ObsidianLiveSyncSettings, type diff_check_result, type diff_result_leaf, type EntryBody, LOG_LEVEL, VER, DEFAULT_SETTINGS, type diff_result, FLAGMD_REDFLAG, SYNCINFO_ID, SALT_OF_PASSPHRASE, type ConfigPassphraseStore, type CouchDBConnection, FLAGMD_REDFLAG2, FLAGMD_REDFLAG3, PREFIXMD_LOGFILE, type DatabaseConnectingStatus, type EntryHasPath, type DocumentID, type FilePathWithPrefix, type FilePath, type AnyEntry } from "./lib/src/types"; import { type InternalFileInfo, type queueItem, type CacheData, type FileEventItem, FileWatchEventQueueMax } from "./types"; -import { getDocData, isDocContentSame, Parallels } from "./lib/src/utils"; -import { Logger } from "./lib/src/logger"; +import { arrayToChunkedArray, getDocData, isDocContentSame } from "./lib/src/utils"; +import { Logger, setGlobalLogFunction } from "./lib/src/logger"; import { PouchDB } from "./lib/src/pouchdb-browser.js"; import { ConflictResolveModal } from "./ConflictResolveModal"; import { ObsidianLiveSyncSettingTab } from "./ObsidianLiveSyncSettingTab"; @@ -14,7 +14,7 @@ import { applyPatch, cancelAllPeriodicTask, cancelAllTasks, cancelTask, generate import { encrypt, tryDecrypt } from "./lib/src/e2ee_v2"; import { enableEncryption, isCloudantURI, isErrorOfMissingDoc, isValidRemoteCouchDBURI } from "./lib/src/utils_couchdb"; import { getGlobalStore, ObservableStore, observeStores } from "./lib/src/store"; -import { lockStore, logMessageStore, logStore } from "./lib/src/stores"; +import { lockStore, logMessageStore, logStore, type LogEntry } from "./lib/src/stores"; import { setNoticeClass } from "./lib/src/wrapper"; import { base64ToString, versionNumberString2Number, base64ToArrayBuffer, arrayBufferToBase64 } from "./lib/src/strbin"; import { addPrefix, isPlainText, shouldBeIgnored, stripAllPrefixes } from "./lib/src/path"; @@ -31,9 +31,17 @@ import { ConfigSync } from "./CmdConfigSync"; import { confirmWithMessage } from "./dialogs"; import { GlobalHistoryView, VIEW_TYPE_GLOBAL_HISTORY } from "./GlobalHistoryView"; import { LogPaneView, VIEW_TYPE_LOG } from "./LogPaneView"; +import { mapAllTasksWithConcurrencyLimit, processAllTasksWithConcurrencyLimit } from "./lib/src/task"; setNoticeClass(Notice); +// DI the log again. +setGlobalLogFunction((message: any, level?: LOG_LEVEL, key?: string) => { + const entry = { message, level, key } as LogEntry; + logStore.push(entry); +}); +logStore.intercept(e => e.slice(Math.min(e.length - 200, 0))); + export default class ObsidianLiveSyncPlugin extends Plugin implements LiveSyncLocalDBEnv, LiveSyncReplicatorEnv { @@ -1710,21 +1718,27 @@ Or if you are sure know what had been happened, we can unlock the database from const runAll = async(procedureName: string, objects: T[], callback: (arg: T) => Promise) => { Logger(procedureName); if (!this.localDatabase.isReady) throw Error("Database is not ready!"); - const para = Parallels(); - for (const v of objects) { - await para.wait(10); - para.add((async (v) => { - try { - await callback(v); - } catch (ex) { - Logger(`Error while ${procedureName}`, LOG_LEVEL.NOTICE); - Logger(ex); - } - })(v)); + const procs = objects.map(e => async () => { + try { + await callback(e); + return true; + } catch (ex) { + Logger(`Error while ${procedureName}`, LOG_LEVEL.NOTICE); + Logger(ex, LOG_LEVEL.VERBOSE); + return false; + } + }); + let success = 0; + let failed = 0; + for await (const v of processAllTasksWithConcurrencyLimit(10, procs)) { + if ("ok" in v && v.ok) { + success++; + } else { + failed++; + } } - await para.all(); - Logger(`${procedureName} done.`); + Logger(`${procedureName}: PASS:${success}, FAILED:${failed}`); } await runAll("UPDATE DATABASE", onlyInStorage, async (e) => { @@ -1748,22 +1762,19 @@ Or if you are sure know what had been happened, we can unlock the database from if (!initialScan) { let caches: { [key: string]: { storageMtime: number; docMtime: number } } = {}; caches = await this.kvDB.get<{ [key: string]: { storageMtime: number; docMtime: number } }>("diff-caches") || {}; - const docsCount = syncFiles.length; - do { - const syncFilesXSrc = syncFiles.splice(0, 100); - const syncFilesX = [] as { file: TFile, id: DocumentID }[]; - for (const file of syncFilesXSrc) { - const id = await this.path2id(getPathFromTFile(file)); - syncFilesX.push({ file: file, id: id }); - } - const docs = await this.localDatabase.allDocsRaw({ keys: syncFilesX.map(e => e.id), include_docs: true }) - const docsMap = docs.rows.reduce((p, c) => ({ ...p, [c.id]: c.doc }), {} as Record) - const syncFilesToSync = syncFilesX.map((e) => ({ file: e.file, doc: docsMap[e.id] as LoadedEntry })); - await runAll(`CHECK FILE STATUS:${syncFiles.length}/${docsCount}`, syncFilesToSync, async (e) => { + const syncFilesBatch = [...arrayToChunkedArray(syncFiles, 100)]; + const processes = syncFilesBatch.map((files, idx, total) => async () => { + const dbEntries = await mapAllTasksWithConcurrencyLimit(10, files.map(file => async () => ({ file: file, id: await this.path2id(getPathFromTFile(file)) }))); + const dbEntriesOk = dbEntries.map(e => "ok" in e ? e.ok : undefined).filter(e => e); + const docs = await this.localDatabase.allDocsRaw({ keys: dbEntriesOk.map(e => e.id), include_docs: true }); + const docsMap = docs.rows.reduce((p, c) => ({ ...p, [c.id]: c.doc }), {} as Record); + const syncFilesToSync = dbEntriesOk.map((e) => ({ file: e.file, doc: docsMap[e.id] as LoadedEntry })); + await runAll(`CHECK FILE STATUS:${idx + 1}/${total.length}`, syncFilesToSync, async (e) => { caches = await this.syncFileBetweenDBandStorage(e.file, e.doc, initialScan, caches); }); - } while (syncFiles.length > 0); + }) + await mapAllTasksWithConcurrencyLimit(2, processes); await this.kvDB.set("diff-caches", caches); }