diff --git a/src/CmdConfigSync.ts b/src/CmdConfigSync.ts
index 657901c..c53c0ea 100644
--- a/src/CmdConfigSync.ts
+++ b/src/CmdConfigSync.ts
@@ -1,13 +1,13 @@
 import { writable } from 'svelte/store';
-import { Notice, PluginManifest, stringifyYaml, parseYaml } from "./deps";
+import { Notice, PluginManifest, parseYaml } from "./deps";
 import { EntryDoc, LoadedEntry, LOG_LEVEL, InternalFileEntry, FilePathWithPrefix, FilePath, DocumentID } from "./lib/src/types";
 import { ICXHeader, PERIODIC_PLUGIN_SWEEP, } from "./types";
-import { delay, getDocData } from "./lib/src/utils";
+import { Parallels, delay, getDocData } from "./lib/src/utils";
 import { Logger } from "./lib/src/logger";
 import { PouchDB } from "./lib/src/pouchdb-browser.js";
 import { WrappedNotice } from "./lib/src/wrapper";
-import { base64ToArrayBuffer, arrayBufferToBase64, readString, writeString, uint8ArrayToHexString } from "./lib/src/strbin";
+import { base64ToArrayBuffer, arrayBufferToBase64, readString, uint8ArrayToHexString } from "./lib/src/strbin";
 import { runWithLock } from "./lib/src/lock";
 import { LiveSyncCommands } from "./LiveSyncCommands";
 import { stripAllPrefixes } from "./lib/src/path";
@@ -17,13 +17,29 @@ import { PluginDialogModal } from "./dialogs";
 import { JsonResolveModal } from "./JsonResolveModal";
+function serialize<T>(obj: T): string {
+    return JSON.stringify(obj, null, 1);
+}
+function deserialize<T>(str: string, def: T) {
+    try {
+        return JSON.parse(str) as T;
+    } catch (ex) {
+        try {
+            return parseYaml(str);
+        } catch (ex) {
+            return def;
+        }
+    }
+}
 export const pluginList = writable([] as PluginDataExDisplay[]);
 export const pluginIsEnumerating = writable(false);
+const encoder = new TextEncoder();
 const hashString = (async (key: string) => {
-    const buff = writeString(key);
+    // const buff = writeString(key);
+    const buff = encoder.encode(key);
     const digest = await crypto.subtle.digest('SHA-256', buff);
     return uint8ArrayToHexString(new Uint8Array(digest));
 })
@@ -171,7 +187,7 @@ export class ConfigSync extends LiveSyncCommands {
         const entries = [] as PluginDataExDisplay[]
         const plugins = this.localDatabase.findEntries(ICXHeader + "", `${ICXHeader}\u{10ffff}`, { include_docs: true });
         const semaphore = Semaphore(4);
-        const processes = [] as Promise<void>[];
+        const para = Parallels();
         let count = 0;
         pluginIsEnumerating.set(true);
         let processed = false;
@@ -184,7 +200,8 @@ export class ConfigSync extends LiveSyncCommands {
             processed = true;
             const oldEntry = (this.pluginList.find(e => e.documentPath == path));
             if (oldEntry && oldEntry.mtime == plugin.mtime) continue;
-            processes.push((async (v) => {
+            await para.wait(5);
+            para.add((async (v) => {
                 const release = await semaphore.acquire(1);
                 try {
@@ -193,7 +210,7 @@ export class ConfigSync extends LiveSyncCommands {
                     Logger(`plugin-${path}`, LOG_LEVEL.VERBOSE);
                     const wx = await this.localDatabase.getDBEntry(path, null, false, false);
                     if (wx) {
-                        const data = parseYaml(getDocData(wx.data)) as PluginDataEx;
+                        const data = deserialize(getDocData(wx.data), {}) as PluginDataEx;
                         const xFiles = [] as PluginDataExFile[];
                         for (const file of data.files) {
                             const work = { ...file };
@@ -217,7 +234,7 @@ export class ConfigSync extends LiveSyncCommands {
                 }
             )(plugin));
         }
-        await Promise.all(processes);
+        await para.all();
         let newList = [...this.pluginList];
         for (const item of entries) {
             newList = newList.filter(x => x.documentPath != item.documentPath);
@@ -241,9 +258,9 @@ export class ConfigSync extends LiveSyncCommands {
         const docB = await this.localDatabase.getDBEntry(dataB.documentPath);
         if (docA && docB) {
-            const pluginDataA = parseYaml(getDocData(docA.data)) as PluginDataEx;
+            const pluginDataA = deserialize(getDocData(docA.data), {}) as PluginDataEx;
             pluginDataA.documentPath = dataA.documentPath;
-            const pluginDataB = parseYaml(getDocData(docB.data)) as PluginDataEx;
+            const pluginDataB = deserialize(getDocData(docB.data), {}) as PluginDataEx;
             pluginDataB.documentPath = dataB.documentPath;
             // Use outer structure to wrap each data.
@@ -282,7 +299,7 @@ export class ConfigSync extends LiveSyncCommands {
             if (dx == false) {
                 throw "Not found on database"
             }
-            const loadedData = parseYaml(getDocData(dx.data)) as PluginDataEx;
+            const loadedData = deserialize(getDocData(dx.data), {}) as PluginDataEx;
             for (const f of loadedData.files) {
                 Logger(`Applying ${f.filename} of ${data.displayName || data.name}..`);
                 try {
@@ -520,7 +537,7 @@ export class ConfigSync extends LiveSyncCommands {
                 return
             }
-            const content = stringifyYaml(dt);
+            const content = serialize(dt);
             try {
                 const old = await this.localDatabase.getDBEntryMeta(prefixedFileName, null, false);
                 let saveData: LoadedEntry;
diff --git a/src/CmdHiddenFileSync.ts b/src/CmdHiddenFileSync.ts
index 30a8860..7990eca 100644
--- a/src/CmdHiddenFileSync.ts
+++ b/src/CmdHiddenFileSync.ts
@@ -1,14 +1,13 @@
 import { Notice, normalizePath, PluginManifest } from "./deps";
 import { EntryDoc, LoadedEntry, LOG_LEVEL, InternalFileEntry, FilePathWithPrefix, FilePath } from "./lib/src/types";
 import { InternalFileInfo, ICHeader, ICHeaderEnd } from "./types";
-import { delay, isDocContentSame } from "./lib/src/utils";
+import { Parallels, delay, isDocContentSame } from "./lib/src/utils";
 import { Logger } from "./lib/src/logger";
 import { PouchDB } from "./lib/src/pouchdb-browser.js";
 import { disposeMemoObject, memoIfNotExist, memoObject, retrieveMemoObject, scheduleTask, isInternalMetadata, PeriodicProcessor } from "./utils";
 import { WrappedNotice } from "./lib/src/wrapper";
 import { base64ToArrayBuffer, arrayBufferToBase64 } from "./lib/src/strbin";
 import { runWithLock } from "./lib/src/lock";
-import { Semaphore } from "./lib/src/semaphore";
 import { JsonResolveModal } from "./JsonResolveModal";
 import { LiveSyncCommands } from "./LiveSyncCommands";
 import { addPrefix, stripAllPrefixes } from "./lib/src/path";
@@ -254,37 +253,35 @@ export class HiddenFileSync extends LiveSyncCommands {
                 c = pieces.shift();
             }
         };
-        const p = [] as Promise<void>[];
-        const semaphore = Semaphore(10);
         // Cache update time information for files which have already been processed (mainly for files that were skipped due to the same content)
         let caches: { [key: string]: { storageMtime: number; docMtime: number; }; } = {};
         caches = await this.kvDB.get<{ [key: string]: { storageMtime: number; docMtime: number; }; }>("diff-caches-internal") || {};
+        const filesMap = files.reduce((acc, cur) => {
+            acc[cur.path] = cur;
+            return acc;
+        }, {} as { [key: string]: InternalFileInfo; });
+        const filesOnDBMap = filesOnDB.reduce((acc, cur) => {
+            acc[stripAllPrefixes(this.getPath(cur))] = cur;
+            return acc;
+        }, {} as { [key: string]: InternalFileEntry; });
+        const para = Parallels();
        for (const filename of allFileNames) {
-            if (!filename) continue;
             processed++;
-            if (processed % 100 == 0)
+            if (processed % 100 == 0) {
                 Logger(`Hidden file: ${processed}/${fileCount}`, logLevel, "sync_internal");
+            }
+            if (!filename) continue;
             if (ignorePatterns.some(e => filename.match(e))) continue;
-            const fileOnStorage = files.find(e => e.path == filename);
-            const fileOnDatabase = filesOnDB.find(e => stripAllPrefixes(this.getPath(e)) == filename);
-            const addProc = async (p: () => Promise<void>): Promise<void> => {
-                const releaser = await semaphore.acquire(1);
-                try {
-                    return p();
-                } catch (ex) {
-                    Logger("Some process failed", logLevel);
-                    Logger(ex);
-                } finally {
-                    releaser();
-                }
-            };
+            const fileOnStorage = filename in filesMap ? filesMap[filename] : undefined;
+            const fileOnDatabase = filename in filesOnDBMap ? filesOnDBMap[filename] : undefined;
             const cache = filename in caches ? caches[filename] : { storageMtime: 0, docMtime: 0 };
-            p.push(addProc(async () => {
-                const xFileOnStorage = fileOnStorage;
-                const xFileOnDatabase = fileOnDatabase;
+            await para.wait(5);
+            const proc = (async (xFileOnStorage: InternalFileInfo, xFileOnDatabase: InternalFileEntry) => {
+
                 if (xFileOnStorage && xFileOnDatabase) {
                     // Both => Synchronize
                     if ((direction != "pullForce" && direction != "pushForce") && xFileOnDatabase.mtime == cache.docMtime && xFileOnStorage.mtime == cache.storageMtime) {
@@ -326,9 +323,11 @@ export class HiddenFileSync extends LiveSyncCommands {
                     throw new Error("Invalid state on hidden file sync"); // Something corrupted?
                 }
-            }));
+
+            });
+            para.add(proc(fileOnStorage, fileOnDatabase))
         }
-        await Promise.all(p);
+        await para.all();
         await this.kvDB.set("diff-caches-internal", caches);
         // When files has been retrieved from the database. they must be reloaded.
diff --git a/src/main.ts b/src/main.ts
index bceb5a3..bca0275 100644
--- a/src/main.ts
+++ b/src/main.ts
@@ -4,7 +4,7 @@ import { Diff, DIFF_DELETE, DIFF_EQUAL, DIFF_INSERT, diff_match_patch } from "di
 import { debounce, Notice, Plugin, TFile, addIcon, TFolder, normalizePath, TAbstractFile, Editor, MarkdownView, RequestUrlParam, RequestUrlResponse, requestUrl } from "./deps";
 import { EntryDoc, LoadedEntry, ObsidianLiveSyncSettings, diff_check_result, diff_result_leaf, EntryBody, LOG_LEVEL, VER, DEFAULT_SETTINGS, diff_result, FLAGMD_REDFLAG, SYNCINFO_ID, SALT_OF_PASSPHRASE, ConfigPassphraseStore, CouchDBConnection, FLAGMD_REDFLAG2, FLAGMD_REDFLAG3, PREFIXMD_LOGFILE, DatabaseConnectingStatus, EntryHasPath, DocumentID, FilePathWithPrefix, FilePath, AnyEntry } from "./lib/src/types";
 import { InternalFileInfo, queueItem, CacheData, FileEventItem, FileWatchEventQueueMax } from "./types";
-import { getDocData, isDocContentSame } from "./lib/src/utils";
+import { getDocData, isDocContentSame, Parallels } from "./lib/src/utils";
 import { Logger } from "./lib/src/logger";
 import { PouchDB } from "./lib/src/pouchdb-browser.js";
 import { LogDisplayModal } from "./LogDisplayModal";
@@ -1492,19 +1492,20 @@ export default class ObsidianLiveSyncPlugin extends Plugin
                 }
                 return proc.substring(0, p);
             }
+
             const pendingTask = e.pending.length
-                ? "\nPending: " +
-                Object.entries(e.pending.reduce((p, c) => ({ ...p, [getProcKind(c)]: (p[getProcKind(c)] ?? 0) + 1 }), {} as { [key: string]: number }))
-                    .map((e) => `${e[0]}${e[1] == 1 ? "" : `(${e[1]})`}`)
-                    .join(", ")
-                : "";
+                ? e.pending.length < 10 ? ("\nPending: " +
+                    Object.entries(e.pending.reduce((p, c) => ({ ...p, [getProcKind(c)]: (p[getProcKind(c)] ?? 0) + 1 }), {} as { [key: string]: number }))
+                        .map((e) => `${e[0]}${e[1] == 1 ? "" : `(${e[1]})`}`)
+                        .join(", ")
+                ) : `\n Pending: ${e.pending.length}` : "";
             const runningTask = e.running.length
-                ? "\nRunning: " +
-                Object.entries(e.running.reduce((p, c) => ({ ...p, [getProcKind(c)]: (p[getProcKind(c)] ?? 0) + 1 }), {} as { [key: string]: number }))
-                    .map((e) => `${e[0]}${e[1] == 1 ? "" : `(${e[1]})`}`)
-                    .join(", ")
-                : "";
+                ? e.running.length < 10 ? ("\nRunning: " +
+                    Object.entries(e.running.reduce((p, c) => ({ ...p, [getProcKind(c)]: (p[getProcKind(c)] ?? 0) + 1 }), {} as { [key: string]: number }))
+                        .map((e) => `${e[0]}${e[1] == 1 ? "" : `(${e[1]})`}`)
+                        .join(", ")
+                ) : `\n Running: ${e.running.length}` : "";
             this.setStatusBarText(message + pendingTask + runningTask);
         })
     }
@@ -1667,25 +1668,23 @@ Or if you are sure know what had been happened, we can unlock the database from
         const runAll = async <T>(procedureName: string, objects: T[], callback: (arg: T) => Promise<void>) => {
             Logger(procedureName);
-            const semaphore = Semaphore(25);
             if (!this.localDatabase.isReady) throw Error("Database is not ready!");
-            const processes = objects.map(e => (async (v) => {
-                const releaser = await semaphore.acquire(1, procedureName);
+            const para = Parallels();
+            for (const v of objects) {
+                await para.wait(10);
+                para.add((async (v) => {
+                    try {
+                        await callback(v);
+                    } catch (ex) {
+                        Logger(`Error while ${procedureName}`, LOG_LEVEL.NOTICE);
+                        Logger(ex);
+                    }
+                })(v));
-                try {
-                    await callback(v);
-                } catch (ex) {
-                    Logger(`Error while ${procedureName}`, LOG_LEVEL.NOTICE);
-                    Logger(ex);
-                } finally {
-                    releaser();
-                }
             }
-            )(e));
-            await Promise.all(processes);
-
+            await para.all();
             Logger(`${procedureName} done.`);
-        };
+        }
         await runAll("UPDATE DATABASE", onlyInStorage, async (e) => {
             Logger(`UPDATE DATABASE ${e.path}`);