mirror of
https://github.com/vrtmrz/obsidian-livesync.git
synced 2025-12-13 01:35:57 +00:00
Fixed:
- Now hidden file synchronisation would not be hanged, even if so many files exist. Improved: - Customisation sync works more smoothly.
This commit is contained in:
@@ -1,13 +1,13 @@
|
||||
import { writable } from 'svelte/store';
|
||||
import { Notice, PluginManifest, stringifyYaml, parseYaml } from "./deps";
|
||||
import { Notice, PluginManifest, parseYaml } from "./deps";
|
||||
|
||||
import { EntryDoc, LoadedEntry, LOG_LEVEL, InternalFileEntry, FilePathWithPrefix, FilePath, DocumentID } from "./lib/src/types";
|
||||
import { ICXHeader, PERIODIC_PLUGIN_SWEEP, } from "./types";
|
||||
import { delay, getDocData } from "./lib/src/utils";
|
||||
import { Parallels, delay, getDocData } from "./lib/src/utils";
|
||||
import { Logger } from "./lib/src/logger";
|
||||
import { PouchDB } from "./lib/src/pouchdb-browser.js";
|
||||
import { WrappedNotice } from "./lib/src/wrapper";
|
||||
import { base64ToArrayBuffer, arrayBufferToBase64, readString, writeString, uint8ArrayToHexString } from "./lib/src/strbin";
|
||||
import { base64ToArrayBuffer, arrayBufferToBase64, readString, uint8ArrayToHexString } from "./lib/src/strbin";
|
||||
import { runWithLock } from "./lib/src/lock";
|
||||
import { LiveSyncCommands } from "./LiveSyncCommands";
|
||||
import { stripAllPrefixes } from "./lib/src/path";
|
||||
@@ -17,13 +17,29 @@ import { PluginDialogModal } from "./dialogs";
|
||||
import { JsonResolveModal } from "./JsonResolveModal";
|
||||
|
||||
|
||||
function serialize<T>(obj: T): string {
|
||||
return JSON.stringify(obj, null, 1);
|
||||
}
|
||||
function deserialize<T>(str: string, def: T) {
|
||||
try {
|
||||
return JSON.parse(str) as T;
|
||||
} catch (ex) {
|
||||
try {
|
||||
return parseYaml(str);
|
||||
} catch (ex) {
|
||||
return def;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
export const pluginList = writable([] as PluginDataExDisplay[]);
|
||||
export const pluginIsEnumerating = writable(false);
|
||||
|
||||
const encoder = new TextEncoder();
|
||||
const hashString = (async (key: string) => {
|
||||
const buff = writeString(key);
|
||||
// const buff = writeString(key);
|
||||
const buff = encoder.encode(key);
|
||||
const digest = await crypto.subtle.digest('SHA-256', buff);
|
||||
return uint8ArrayToHexString(new Uint8Array(digest));
|
||||
})
|
||||
@@ -171,7 +187,7 @@ export class ConfigSync extends LiveSyncCommands {
|
||||
const entries = [] as PluginDataExDisplay[]
|
||||
const plugins = this.localDatabase.findEntries(ICXHeader + "", `${ICXHeader}\u{10ffff}`, { include_docs: true });
|
||||
const semaphore = Semaphore(4);
|
||||
const processes = [] as Promise<void>[];
|
||||
const para = Parallels();
|
||||
let count = 0;
|
||||
pluginIsEnumerating.set(true);
|
||||
let processed = false;
|
||||
@@ -184,7 +200,8 @@ export class ConfigSync extends LiveSyncCommands {
|
||||
processed = true;
|
||||
const oldEntry = (this.pluginList.find(e => e.documentPath == path));
|
||||
if (oldEntry && oldEntry.mtime == plugin.mtime) continue;
|
||||
processes.push((async (v) => {
|
||||
await para.wait(5);
|
||||
para.add((async (v) => {
|
||||
|
||||
const release = await semaphore.acquire(1);
|
||||
try {
|
||||
@@ -193,7 +210,7 @@ export class ConfigSync extends LiveSyncCommands {
|
||||
Logger(`plugin-${path}`, LOG_LEVEL.VERBOSE);
|
||||
const wx = await this.localDatabase.getDBEntry(path, null, false, false);
|
||||
if (wx) {
|
||||
const data = parseYaml(getDocData(wx.data)) as PluginDataEx;
|
||||
const data = deserialize(getDocData(wx.data), {}) as PluginDataEx;
|
||||
const xFiles = [] as PluginDataExFile[];
|
||||
for (const file of data.files) {
|
||||
const work = { ...file };
|
||||
@@ -217,7 +234,7 @@ export class ConfigSync extends LiveSyncCommands {
|
||||
}
|
||||
)(plugin));
|
||||
}
|
||||
await Promise.all(processes);
|
||||
await para.all();
|
||||
let newList = [...this.pluginList];
|
||||
for (const item of entries) {
|
||||
newList = newList.filter(x => x.documentPath != item.documentPath);
|
||||
@@ -241,9 +258,9 @@ export class ConfigSync extends LiveSyncCommands {
|
||||
const docB = await this.localDatabase.getDBEntry(dataB.documentPath);
|
||||
|
||||
if (docA && docB) {
|
||||
const pluginDataA = parseYaml(getDocData(docA.data)) as PluginDataEx;
|
||||
const pluginDataA = deserialize(getDocData(docA.data), {}) as PluginDataEx;
|
||||
pluginDataA.documentPath = dataA.documentPath;
|
||||
const pluginDataB = parseYaml(getDocData(docB.data)) as PluginDataEx;
|
||||
const pluginDataB = deserialize(getDocData(docB.data), {}) as PluginDataEx;
|
||||
pluginDataB.documentPath = dataB.documentPath;
|
||||
|
||||
// Use outer structure to wrap each data.
|
||||
@@ -282,7 +299,7 @@ export class ConfigSync extends LiveSyncCommands {
|
||||
if (dx == false) {
|
||||
throw "Not found on database"
|
||||
}
|
||||
const loadedData = parseYaml(getDocData(dx.data)) as PluginDataEx;
|
||||
const loadedData = deserialize(getDocData(dx.data), {}) as PluginDataEx;
|
||||
for (const f of loadedData.files) {
|
||||
Logger(`Applying ${f.filename} of ${data.displayName || data.name}..`);
|
||||
try {
|
||||
@@ -520,7 +537,7 @@ export class ConfigSync extends LiveSyncCommands {
|
||||
return
|
||||
}
|
||||
|
||||
const content = stringifyYaml(dt);
|
||||
const content = serialize(dt);
|
||||
try {
|
||||
const old = await this.localDatabase.getDBEntryMeta(prefixedFileName, null, false);
|
||||
let saveData: LoadedEntry;
|
||||
|
||||
@@ -1,14 +1,13 @@
|
||||
import { Notice, normalizePath, PluginManifest } from "./deps";
|
||||
import { EntryDoc, LoadedEntry, LOG_LEVEL, InternalFileEntry, FilePathWithPrefix, FilePath } from "./lib/src/types";
|
||||
import { InternalFileInfo, ICHeader, ICHeaderEnd } from "./types";
|
||||
import { delay, isDocContentSame } from "./lib/src/utils";
|
||||
import { Parallels, delay, isDocContentSame } from "./lib/src/utils";
|
||||
import { Logger } from "./lib/src/logger";
|
||||
import { PouchDB } from "./lib/src/pouchdb-browser.js";
|
||||
import { disposeMemoObject, memoIfNotExist, memoObject, retrieveMemoObject, scheduleTask, isInternalMetadata, PeriodicProcessor } from "./utils";
|
||||
import { WrappedNotice } from "./lib/src/wrapper";
|
||||
import { base64ToArrayBuffer, arrayBufferToBase64 } from "./lib/src/strbin";
|
||||
import { runWithLock } from "./lib/src/lock";
|
||||
import { Semaphore } from "./lib/src/semaphore";
|
||||
import { JsonResolveModal } from "./JsonResolveModal";
|
||||
import { LiveSyncCommands } from "./LiveSyncCommands";
|
||||
import { addPrefix, stripAllPrefixes } from "./lib/src/path";
|
||||
@@ -254,37 +253,35 @@ export class HiddenFileSync extends LiveSyncCommands {
|
||||
c = pieces.shift();
|
||||
}
|
||||
};
|
||||
const p = [] as Promise<void>[];
|
||||
const semaphore = Semaphore(10);
|
||||
// Cache update time information for files which have already been processed (mainly for files that were skipped due to the same content)
|
||||
let caches: { [key: string]: { storageMtime: number; docMtime: number; }; } = {};
|
||||
caches = await this.kvDB.get<{ [key: string]: { storageMtime: number; docMtime: number; }; }>("diff-caches-internal") || {};
|
||||
const filesMap = files.reduce((acc, cur) => {
|
||||
acc[cur.path] = cur;
|
||||
return acc;
|
||||
}, {} as { [key: string]: InternalFileInfo; });
|
||||
const filesOnDBMap = filesOnDB.reduce((acc, cur) => {
|
||||
acc[stripAllPrefixes(this.getPath(cur))] = cur;
|
||||
return acc;
|
||||
}, {} as { [key: string]: InternalFileEntry; });
|
||||
const para = Parallels();
|
||||
for (const filename of allFileNames) {
|
||||
if (!filename) continue;
|
||||
processed++;
|
||||
if (processed % 100 == 0)
|
||||
if (processed % 100 == 0) {
|
||||
Logger(`Hidden file: ${processed}/${fileCount}`, logLevel, "sync_internal");
|
||||
}
|
||||
if (!filename) continue;
|
||||
if (ignorePatterns.some(e => filename.match(e)))
|
||||
continue;
|
||||
|
||||
const fileOnStorage = files.find(e => e.path == filename);
|
||||
const fileOnDatabase = filesOnDB.find(e => stripAllPrefixes(this.getPath(e)) == filename);
|
||||
const addProc = async (p: () => Promise<void>): Promise<void> => {
|
||||
const releaser = await semaphore.acquire(1);
|
||||
try {
|
||||
return p();
|
||||
} catch (ex) {
|
||||
Logger("Some process failed", logLevel);
|
||||
Logger(ex);
|
||||
} finally {
|
||||
releaser();
|
||||
}
|
||||
};
|
||||
const fileOnStorage = filename in filesMap ? filesMap[filename] : undefined;
|
||||
const fileOnDatabase = filename in filesOnDBMap ? filesOnDBMap[filename] : undefined;
|
||||
|
||||
const cache = filename in caches ? caches[filename] : { storageMtime: 0, docMtime: 0 };
|
||||
|
||||
p.push(addProc(async () => {
|
||||
const xFileOnStorage = fileOnStorage;
|
||||
const xFileOnDatabase = fileOnDatabase;
|
||||
await para.wait(5);
|
||||
const proc = (async (xFileOnStorage: InternalFileInfo, xFileOnDatabase: InternalFileEntry) => {
|
||||
|
||||
if (xFileOnStorage && xFileOnDatabase) {
|
||||
// Both => Synchronize
|
||||
if ((direction != "pullForce" && direction != "pushForce") && xFileOnDatabase.mtime == cache.docMtime && xFileOnStorage.mtime == cache.storageMtime) {
|
||||
@@ -326,9 +323,11 @@ export class HiddenFileSync extends LiveSyncCommands {
|
||||
throw new Error("Invalid state on hidden file sync");
|
||||
// Something corrupted?
|
||||
}
|
||||
}));
|
||||
|
||||
});
|
||||
para.add(proc(fileOnStorage, fileOnDatabase))
|
||||
}
|
||||
await Promise.all(p);
|
||||
await para.all();
|
||||
await this.kvDB.set("diff-caches-internal", caches);
|
||||
|
||||
// When files has been retrieved from the database. they must be reloaded.
|
||||
|
||||
51
src/main.ts
51
src/main.ts
@@ -4,7 +4,7 @@ import { Diff, DIFF_DELETE, DIFF_EQUAL, DIFF_INSERT, diff_match_patch } from "di
|
||||
import { debounce, Notice, Plugin, TFile, addIcon, TFolder, normalizePath, TAbstractFile, Editor, MarkdownView, RequestUrlParam, RequestUrlResponse, requestUrl } from "./deps";
|
||||
import { EntryDoc, LoadedEntry, ObsidianLiveSyncSettings, diff_check_result, diff_result_leaf, EntryBody, LOG_LEVEL, VER, DEFAULT_SETTINGS, diff_result, FLAGMD_REDFLAG, SYNCINFO_ID, SALT_OF_PASSPHRASE, ConfigPassphraseStore, CouchDBConnection, FLAGMD_REDFLAG2, FLAGMD_REDFLAG3, PREFIXMD_LOGFILE, DatabaseConnectingStatus, EntryHasPath, DocumentID, FilePathWithPrefix, FilePath, AnyEntry } from "./lib/src/types";
|
||||
import { InternalFileInfo, queueItem, CacheData, FileEventItem, FileWatchEventQueueMax } from "./types";
|
||||
import { getDocData, isDocContentSame } from "./lib/src/utils";
|
||||
import { getDocData, isDocContentSame, Parallels } from "./lib/src/utils";
|
||||
import { Logger } from "./lib/src/logger";
|
||||
import { PouchDB } from "./lib/src/pouchdb-browser.js";
|
||||
import { LogDisplayModal } from "./LogDisplayModal";
|
||||
@@ -1492,19 +1492,20 @@ export default class ObsidianLiveSyncPlugin extends Plugin
|
||||
}
|
||||
return proc.substring(0, p);
|
||||
}
|
||||
|
||||
const pendingTask = e.pending.length
|
||||
? "\nPending: " +
|
||||
Object.entries(e.pending.reduce((p, c) => ({ ...p, [getProcKind(c)]: (p[getProcKind(c)] ?? 0) + 1 }), {} as { [key: string]: number }))
|
||||
.map((e) => `${e[0]}${e[1] == 1 ? "" : `(${e[1]})`}`)
|
||||
.join(", ")
|
||||
: "";
|
||||
? e.pending.length < 10 ? ("\nPending: " +
|
||||
Object.entries(e.pending.reduce((p, c) => ({ ...p, [getProcKind(c)]: (p[getProcKind(c)] ?? 0) + 1 }), {} as { [key: string]: number }))
|
||||
.map((e) => `${e[0]}${e[1] == 1 ? "" : `(${e[1]})`}`)
|
||||
.join(", ")
|
||||
) : `\n Pending: ${e.pending.length}` : "";
|
||||
|
||||
const runningTask = e.running.length
|
||||
? "\nRunning: " +
|
||||
Object.entries(e.running.reduce((p, c) => ({ ...p, [getProcKind(c)]: (p[getProcKind(c)] ?? 0) + 1 }), {} as { [key: string]: number }))
|
||||
.map((e) => `${e[0]}${e[1] == 1 ? "" : `(${e[1]})`}`)
|
||||
.join(", ")
|
||||
: "";
|
||||
? e.running.length < 10 ? ("\nRunning: " +
|
||||
Object.entries(e.running.reduce((p, c) => ({ ...p, [getProcKind(c)]: (p[getProcKind(c)] ?? 0) + 1 }), {} as { [key: string]: number }))
|
||||
.map((e) => `${e[0]}${e[1] == 1 ? "" : `(${e[1]})`}`)
|
||||
.join(", ")
|
||||
) : `\n Running: ${e.running.length}` : "";
|
||||
this.setStatusBarText(message + pendingTask + runningTask);
|
||||
})
|
||||
}
|
||||
@@ -1667,25 +1668,23 @@ Or if you are sure know what had been happened, we can unlock the database from
|
||||
|
||||
const runAll = async<T>(procedureName: string, objects: T[], callback: (arg: T) => Promise<void>) => {
|
||||
Logger(procedureName);
|
||||
const semaphore = Semaphore(25);
|
||||
if (!this.localDatabase.isReady) throw Error("Database is not ready!");
|
||||
const processes = objects.map(e => (async (v) => {
|
||||
const releaser = await semaphore.acquire(1, procedureName);
|
||||
const para = Parallels();
|
||||
for (const v of objects) {
|
||||
await para.wait(10);
|
||||
para.add((async (v) => {
|
||||
try {
|
||||
await callback(v);
|
||||
} catch (ex) {
|
||||
Logger(`Error while ${procedureName}`, LOG_LEVEL.NOTICE);
|
||||
Logger(ex);
|
||||
}
|
||||
})(v));
|
||||
|
||||
try {
|
||||
await callback(v);
|
||||
} catch (ex) {
|
||||
Logger(`Error while ${procedureName}`, LOG_LEVEL.NOTICE);
|
||||
Logger(ex);
|
||||
} finally {
|
||||
releaser();
|
||||
}
|
||||
}
|
||||
)(e));
|
||||
await Promise.all(processes);
|
||||
|
||||
await para.all();
|
||||
Logger(`${procedureName} done.`);
|
||||
};
|
||||
}
|
||||
|
||||
await runAll("UPDATE DATABASE", onlyInStorage, async (e) => {
|
||||
Logger(`UPDATE DATABASE ${e.path}`);
|
||||
|
||||
Reference in New Issue
Block a user