mirror of
https://github.com/vrtmrz/obsidian-livesync.git
synced 2026-06-20 05:10:15 +00:00
Fixed:
- Now fetch and unlock the locked remote database works well again. - No longer crash on symbolic links inside hidden folders. Improved: - Chunks are now created more efficiently. - Better performance in saving notes. - Network activities are indicated as an icon. - Less memory used for binary processing. Tidied: - Cleaned unused functions up. - Sorting out the codes that have become nonsense. Changed: - Now no longer `fetch chunks on demand` needs `Pacing replication`
This commit is contained in:
+68
-67
@@ -117,6 +117,8 @@ export default class ObsidianLiveSyncPlugin extends Plugin
|
||||
return this.isMobile;
|
||||
}
|
||||
|
||||
requestCount = reactiveSource(0);
|
||||
responseCount = reactiveSource(0);
|
||||
processReplication = (e: PouchDB.Core.ExistingDocument<EntryDoc>[]) => this.parseReplicationResult(e);
|
||||
async connectRemoteCouchDB(uri: string, auth: { username: string; password: string }, disableRequestURI: boolean, passphrase: string | false, useDynamicIterationCount: boolean, performSetup: boolean, skipInfo: boolean): Promise<string | { db: PouchDB.Database<EntryDoc>; info: PouchDB.Core.DatabaseInfo }> {
|
||||
if (!isValidRemoteCouchDBURI(uri)) return "Remote URI is not valid";
|
||||
@@ -172,6 +174,7 @@ export default class ObsidianLiveSyncPlugin extends Plugin
|
||||
};
|
||||
|
||||
try {
|
||||
this.requestCount.value = this.requestCount.value + 1;
|
||||
const r = await fetchByAPI(requestParam);
|
||||
if (method == "POST" || method == "PUT") {
|
||||
this.last_successful_post = r.status - (r.status % 100) == 200;
|
||||
@@ -193,12 +196,15 @@ export default class ObsidianLiveSyncPlugin extends Plugin
|
||||
}
|
||||
Logger(ex);
|
||||
throw ex;
|
||||
} finally {
|
||||
this.responseCount.value = this.responseCount.value + 1;
|
||||
}
|
||||
}
|
||||
|
||||
// -old implementation
|
||||
|
||||
try {
|
||||
this.requestCount.value = this.requestCount.value + 1;
|
||||
const response: Response = await fetch(url, opts);
|
||||
if (method == "POST" || method == "PUT") {
|
||||
this.last_successful_post = response.ok;
|
||||
@@ -215,6 +221,8 @@ export default class ObsidianLiveSyncPlugin extends Plugin
|
||||
}
|
||||
Logger(ex);
|
||||
throw ex;
|
||||
} finally {
|
||||
this.responseCount.value = this.responseCount.value + 1;
|
||||
}
|
||||
// return await fetch(url, opts);
|
||||
},
|
||||
@@ -1329,7 +1337,7 @@ We can perform a command in this file.
|
||||
fireAndForget(() => this.checkAndApplySettingFromMarkdown(queue.args.file.path, true));
|
||||
const keyD1 = `file-last-proc-DELETED-${file.path}`;
|
||||
await this.kvDB.set(keyD1, mtime);
|
||||
if (!await this.updateIntoDB(targetFile, false, cache)) {
|
||||
if (!await this.updateIntoDB(targetFile, cache)) {
|
||||
Logger(`STORAGE -> DB: failed, cancel the relative operations: ${targetFile.path}`, LOG_LEVEL_INFO);
|
||||
// cancel running queues and remove one of atomic operation
|
||||
this.cancelRelativeEvent(queue);
|
||||
@@ -1402,7 +1410,7 @@ We can perform a command in this file.
|
||||
if (file instanceof TFile) {
|
||||
try {
|
||||
// Logger(`RENAMING.. ${file.path} into db`);
|
||||
if (await this.updateIntoDB(file, false, cache)) {
|
||||
if (await this.updateIntoDB(file, cache)) {
|
||||
// Logger(`deleted ${oldFile} from db`);
|
||||
await this.deleteFromDBbyPath(oldFile);
|
||||
} else {
|
||||
@@ -1785,6 +1793,10 @@ We can perform a command in this file.
|
||||
const labelConflictProcessCount = conflictProcessCount ? `🔩${conflictProcessCount} ` : "";
|
||||
return `${labelReplication}${labelDBCount}${labelStorageCount}${labelChunkCount}${labelPluginScanCount}${labelHiddenFilesCount}${labelConflictProcessCount}`;
|
||||
})
|
||||
const requestingStatLabel = reactive(() => {
|
||||
const diff = this.requestCount.value - this.responseCount.value;
|
||||
return diff != 0 ? "📲 " : "";
|
||||
})
|
||||
|
||||
const replicationStatLabel = reactive(() => {
|
||||
const e = this.replicationStat.value;
|
||||
@@ -1834,8 +1846,9 @@ We can perform a command in this file.
|
||||
const { w, sent, pushLast, arrived, pullLast } = replicationStatLabel.value;
|
||||
const queued = queueCountLabel.value;
|
||||
const waiting = waitingLabel.value;
|
||||
const networkActivity = requestingStatLabel.value;
|
||||
return {
|
||||
message: `Sync: ${w} ↑${sent}${pushLast} ↓${arrived}${pullLast}${waiting} ${queued}`,
|
||||
message: `${networkActivity}Sync: ${w} ↑${sent}${pushLast} ↓${arrived}${pullLast}${waiting} ${queued}`,
|
||||
};
|
||||
})
|
||||
const statusBarLabels = reactive(() => {
|
||||
@@ -2014,7 +2027,6 @@ Or if you are sure know what had been happened, we can unlock the database from
|
||||
}
|
||||
return;
|
||||
}
|
||||
let initialScan = false;
|
||||
if (showingNotice) {
|
||||
Logger("Initializing", LOG_LEVEL_NOTICE, "syncAll");
|
||||
}
|
||||
@@ -2047,11 +2059,7 @@ Or if you are sure know what had been happened, we can unlock the database from
|
||||
}
|
||||
Logger("Opening the key-value database", LOG_LEVEL_VERBOSE);
|
||||
const isInitialized = await (this.kvDB.get<boolean>("initialized")) || false;
|
||||
// Make chunk bigger if it is the initial scan. There must be non-active docs.
|
||||
if (filesDatabase.length == 0 && !isInitialized) {
|
||||
initialScan = true;
|
||||
Logger("Database looks empty, save files as initial sync data");
|
||||
}
|
||||
|
||||
const onlyInStorage = filesStorage.filter((e) => filesDatabase.indexOf(getPathFromTFile(e)) == -1);
|
||||
const onlyInDatabase = filesDatabase.filter((e) => filesStorageName.indexOf(e) == -1);
|
||||
|
||||
@@ -2092,69 +2100,62 @@ Or if you are sure know what had been happened, we can unlock the database from
|
||||
}
|
||||
initProcess.push(runAll("UPDATE DATABASE", onlyInStorage, async (e) => {
|
||||
if (!this.isFileSizeExceeded(e.stat.size)) {
|
||||
await this.updateIntoDB(e, initialScan);
|
||||
await this.updateIntoDB(e);
|
||||
fireAndForget(() => this.checkAndApplySettingFromMarkdown(e.path, true));
|
||||
} else {
|
||||
Logger(`UPDATE DATABASE: ${e.path} has been skipped due to file size exceeding the limit`, logLevel);
|
||||
}
|
||||
}));
|
||||
if (!initialScan) {
|
||||
initProcess.push(runAll("UPDATE STORAGE", onlyInDatabase, async (e) => {
|
||||
const w = await this.localDatabase.getDBEntryMeta(e, {}, true);
|
||||
if (w && !(w.deleted || w._deleted)) {
|
||||
if (!this.isFileSizeExceeded(w.size)) {
|
||||
await this.pullFile(e, filesStorage, false, null, false);
|
||||
fireAndForget(() => this.checkAndApplySettingFromMarkdown(e, true));
|
||||
Logger(`Check or pull from db:${e} OK`);
|
||||
} else {
|
||||
Logger(`UPDATE STORAGE: ${e} has been skipped due to file size exceeding the limit`, logLevel);
|
||||
}
|
||||
} else if (w) {
|
||||
Logger(`Deletion history skipped: ${e}`, LOG_LEVEL_VERBOSE);
|
||||
initProcess.push(runAll("UPDATE STORAGE", onlyInDatabase, async (e) => {
|
||||
const w = await this.localDatabase.getDBEntryMeta(e, {}, true);
|
||||
if (w && !(w.deleted || w._deleted)) {
|
||||
if (!this.isFileSizeExceeded(w.size)) {
|
||||
await this.pullFile(e, filesStorage, false, null, false);
|
||||
fireAndForget(() => this.checkAndApplySettingFromMarkdown(e, true));
|
||||
Logger(`Check or pull from db:${e} OK`);
|
||||
} else {
|
||||
Logger(`entry not found: ${e}`);
|
||||
Logger(`UPDATE STORAGE: ${e} has been skipped due to file size exceeding the limit`, logLevel);
|
||||
}
|
||||
}));
|
||||
}
|
||||
if (!initialScan) {
|
||||
// let caches: { [key: string]: { storageMtime: number; docMtime: number } } = {};
|
||||
// caches = await this.kvDB.get<{ [key: string]: { storageMtime: number; docMtime: number } }>("diff-caches") || {};
|
||||
type FileDocPair = { file: TFile, id: DocumentID };
|
||||
} else if (w) {
|
||||
Logger(`Deletion history skipped: ${e}`, LOG_LEVEL_VERBOSE);
|
||||
} else {
|
||||
Logger(`entry not found: ${e}`);
|
||||
}
|
||||
}));
|
||||
type FileDocPair = { file: TFile, id: DocumentID };
|
||||
|
||||
const processPrepareSyncFile = new QueueProcessor(
|
||||
async (files) => {
|
||||
const file = files[0];
|
||||
const id = await this.path2id(getPathFromTFile(file));
|
||||
const pair: FileDocPair = { file, id };
|
||||
return [pair];
|
||||
// processSyncFile.enqueue(pair);
|
||||
}
|
||||
, { batchSize: 1, concurrentLimit: 10, delay: 0, suspended: true }, syncFiles);
|
||||
processPrepareSyncFile
|
||||
.pipeTo(
|
||||
new QueueProcessor(
|
||||
async (pairs) => {
|
||||
const docs = await this.localDatabase.allDocsRaw<EntryDoc>({ keys: pairs.map(e => e.id), include_docs: true });
|
||||
const docsMap = docs.rows.reduce((p, c) => ({ ...p, [c.id]: c.doc }), {} as Record<DocumentID, EntryDoc>);
|
||||
const syncFilesToSync = pairs.map((e) => ({ file: e.file, doc: docsMap[e.id] as LoadedEntry }));
|
||||
return syncFilesToSync;
|
||||
}
|
||||
, { batchSize: 10, concurrentLimit: 5, delay: 10, suspended: false }))
|
||||
.pipeTo(
|
||||
new QueueProcessor(
|
||||
async (loadedPairs) => {
|
||||
const e = loadedPairs[0];
|
||||
await this.syncFileBetweenDBandStorage(e.file, e.doc, initialScan);
|
||||
return;
|
||||
}, { batchSize: 1, concurrentLimit: 5, delay: 10, suspended: false }
|
||||
))
|
||||
const processPrepareSyncFile = new QueueProcessor(
|
||||
async (files) => {
|
||||
const file = files[0];
|
||||
const id = await this.path2id(getPathFromTFile(file));
|
||||
const pair: FileDocPair = { file, id };
|
||||
return [pair];
|
||||
// processSyncFile.enqueue(pair);
|
||||
}
|
||||
, { batchSize: 1, concurrentLimit: 10, delay: 0, suspended: true }, syncFiles);
|
||||
processPrepareSyncFile
|
||||
.pipeTo(
|
||||
new QueueProcessor(
|
||||
async (pairs) => {
|
||||
const docs = await this.localDatabase.allDocsRaw<EntryDoc>({ keys: pairs.map(e => e.id), include_docs: true });
|
||||
const docsMap = docs.rows.reduce((p, c) => ({ ...p, [c.id]: c.doc }), {} as Record<DocumentID, EntryDoc>);
|
||||
const syncFilesToSync = pairs.map((e) => ({ file: e.file, doc: docsMap[e.id] as LoadedEntry }));
|
||||
return syncFilesToSync;
|
||||
}
|
||||
, { batchSize: 10, concurrentLimit: 5, delay: 10, suspended: false }))
|
||||
.pipeTo(
|
||||
new QueueProcessor(
|
||||
async (loadedPairs) => {
|
||||
const e = loadedPairs[0];
|
||||
await this.syncFileBetweenDBandStorage(e.file, e.doc);
|
||||
return;
|
||||
}, { batchSize: 1, concurrentLimit: 5, delay: 10, suspended: false }
|
||||
))
|
||||
|
||||
processPrepareSyncFile.startPipeline();
|
||||
initProcess.push(async () => {
|
||||
await processPrepareSyncFile.waitForPipeline();
|
||||
// await this.kvDB.set("diff-caches", caches);
|
||||
})
|
||||
}
|
||||
processPrepareSyncFile.startPipeline();
|
||||
initProcess.push(async () => {
|
||||
await processPrepareSyncFile.waitForPipeline();
|
||||
})
|
||||
await Promise.all(initProcess);
|
||||
|
||||
// this.setStatusBarText(`NOW TRACKING!`);
|
||||
@@ -2647,7 +2648,7 @@ Or if you are sure know what had been happened, we can unlock the database from
|
||||
//when to opened file;
|
||||
}
|
||||
|
||||
async syncFileBetweenDBandStorage(file: TFile, doc: LoadedEntry, initialScan: boolean) {
|
||||
async syncFileBetweenDBandStorage(file: TFile, doc: LoadedEntry) {
|
||||
if (!doc) {
|
||||
throw new Error(`Missing doc:${(file as any).path}`)
|
||||
}
|
||||
@@ -2665,7 +2666,7 @@ Or if you are sure know what had been happened, we can unlock the database from
|
||||
case BASE_IS_NEW:
|
||||
if (!this.isFileSizeExceeded(file.stat.size)) {
|
||||
Logger("STORAGE -> DB :" + file.path);
|
||||
await this.updateIntoDB(file, initialScan);
|
||||
await this.updateIntoDB(file);
|
||||
fireAndForget(() => this.checkAndApplySettingFromMarkdown(file.path, true));
|
||||
} else {
|
||||
Logger(`STORAGE -> DB : ${file.path} has been skipped due to file size exceeding the limit`, LOG_LEVEL_NOTICE);
|
||||
@@ -2694,7 +2695,7 @@ Or if you are sure know what had been happened, we can unlock the database from
|
||||
|
||||
}
|
||||
|
||||
async updateIntoDB(file: TFile, initialScan?: boolean, cache?: CacheData, force?: boolean) {
|
||||
async updateIntoDB(file: TFile, cache?: CacheData, force?: boolean) {
|
||||
if (!await this.isTargetFile(file)) return true;
|
||||
if (shouldBeIgnored(file.path)) {
|
||||
return true;
|
||||
@@ -2779,7 +2780,7 @@ Or if you are sure know what had been happened, we can unlock the database from
|
||||
Logger(msg + " Skip " + fullPath, LOG_LEVEL_VERBOSE);
|
||||
return true;
|
||||
}
|
||||
const ret = await this.localDatabase.putDBEntry(d, initialScan);
|
||||
const ret = await this.localDatabase.putDBEntry(d);
|
||||
if (ret !== false) {
|
||||
Logger(msg + fullPath);
|
||||
if (this.settings.syncOnSave && !this.suspended) {
|
||||
|
||||
Reference in New Issue
Block a user