mirror of
https://github.com/vrtmrz/obsidian-livesync.git
synced 2026-02-09 05:41:36 +00:00
Fixed:
- No longer missing tasks which have queued as the same key (e.g., for the same operation to the same file). - Some trivial issues have been fixed. New feature: - Reloading Obsidian can be scheduled until that file and database operations are stable.
This commit is contained in:
149
src/main.ts
149
src/main.ts
@@ -32,7 +32,7 @@ import { LogPaneView, VIEW_TYPE_LOG } from "./ui/LogPaneView.ts";
|
||||
import { LRUCache } from "./lib/src/memory/LRUCache.ts";
|
||||
import { SerializedFileAccess } from "./storages/SerializedFileAccess.js";
|
||||
import { QueueProcessor } from "./lib/src/concurrency/processor.js";
|
||||
import { reactive, reactiveSource } from "./lib/src/dataobject/reactive.js";
|
||||
import { reactive, reactiveSource, type ReactiveValue } from "./lib/src/dataobject/reactive.js";
|
||||
import { initializeStores } from "./common/stores.js";
|
||||
import { JournalSyncMinio } from "./lib/src/replication/journal/objectstore/JournalSyncMinio.js";
|
||||
import { LiveSyncJournalReplicator, type LiveSyncJournalReplicatorEnv } from "./lib/src/replication/journal/LiveSyncJournalReplicator.js";
|
||||
@@ -1422,55 +1422,62 @@ We can perform a command in this file.
|
||||
}
|
||||
async handleFileEvent(queue: FileEventItem): Promise<any> {
|
||||
const file = queue.args.file;
|
||||
const key = `file-last-proc-${queue.type}-${file.path}`;
|
||||
const last = Number(await this.kvDB.get(key) || 0);
|
||||
let mtime = file.mtime;
|
||||
if (queue.type == "DELETE") {
|
||||
await this.deleteFromDBbyPath(file.path);
|
||||
mtime = file.mtime - 1;
|
||||
const keyD1 = `file-last-proc-CREATE-${file.path}`;
|
||||
const keyD2 = `file-last-proc-CHANGED-${file.path}`;
|
||||
await this.kvDB.set(keyD1, mtime);
|
||||
await this.kvDB.set(keyD2, mtime);
|
||||
} else if (queue.type == "INTERNAL") {
|
||||
await this.addOnHiddenFileSync.watchVaultRawEventsAsync(file.path);
|
||||
await this.addOnConfigSync.watchVaultRawEventsAsync(file.path);
|
||||
} else {
|
||||
const targetFile = this.vaultAccess.getAbstractFileByPath(file.path);
|
||||
if (!(targetFile instanceof TFile)) {
|
||||
Logger(`Target file was not found: ${file.path}`, LOG_LEVEL_INFO);
|
||||
return;
|
||||
}
|
||||
if (file.mtime == last) {
|
||||
Logger(`File has been already scanned on ${queue.type}, skip: ${file.path}`, LOG_LEVEL_VERBOSE);
|
||||
return;
|
||||
}
|
||||
|
||||
// const cache = queue.args.cache;
|
||||
if (queue.type == "CREATE" || queue.type == "CHANGED") {
|
||||
fireAndForget(() => this.checkAndApplySettingFromMarkdown(queue.args.file.path, true));
|
||||
const keyD1 = `file-last-proc-DELETED-${file.path}`;
|
||||
const lockKey = `handleFile:${file.path}`;
|
||||
return await serialized(lockKey, async () => {
|
||||
const key = `file-last-proc-${queue.type}-${file.path}`;
|
||||
const last = Number(await this.kvDB.get(key) || 0);
|
||||
let mtime = file.mtime;
|
||||
if (queue.type == "DELETE") {
|
||||
await this.deleteFromDBbyPath(file.path);
|
||||
mtime = file.mtime - 1;
|
||||
const keyD1 = `file-last-proc-CREATE-${file.path}`;
|
||||
const keyD2 = `file-last-proc-CHANGED-${file.path}`;
|
||||
await this.kvDB.set(keyD1, mtime);
|
||||
if (!await this.updateIntoDB(targetFile, undefined)) {
|
||||
Logger(`STORAGE -> DB: failed, cancel the relative operations: ${targetFile.path}`, LOG_LEVEL_INFO);
|
||||
// cancel running queues and remove one of atomic operation
|
||||
this.cancelRelativeEvent(queue);
|
||||
await this.kvDB.set(keyD2, mtime);
|
||||
} else if (queue.type == "INTERNAL") {
|
||||
await this.addOnHiddenFileSync.watchVaultRawEventsAsync(file.path);
|
||||
await this.addOnConfigSync.watchVaultRawEventsAsync(file.path);
|
||||
} else {
|
||||
const targetFile = this.vaultAccess.getAbstractFileByPath(file.path);
|
||||
if (!(targetFile instanceof TFile)) {
|
||||
Logger(`Target file was not found: ${file.path}`, LOG_LEVEL_INFO);
|
||||
return;
|
||||
}
|
||||
if (file.mtime == last) {
|
||||
Logger(`File has been already scanned on ${queue.type}, skip: ${file.path}`, LOG_LEVEL_VERBOSE);
|
||||
return;
|
||||
}
|
||||
|
||||
// const cache = queue.args.cache;
|
||||
if (queue.type == "CREATE" || queue.type == "CHANGED") {
|
||||
fireAndForget(() => this.checkAndApplySettingFromMarkdown(queue.args.file.path, true));
|
||||
const keyD1 = `file-last-proc-DELETED-${file.path}`;
|
||||
await this.kvDB.set(keyD1, mtime);
|
||||
if (!await this.updateIntoDB(targetFile, undefined)) {
|
||||
Logger(`STORAGE -> DB: failed, cancel the relative operations: ${targetFile.path}`, LOG_LEVEL_INFO);
|
||||
// cancel running queues and remove one of atomic operation
|
||||
this.cancelRelativeEvent(queue);
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (queue.type == "RENAME") {
|
||||
// Obsolete
|
||||
await this.watchVaultRenameAsync(targetFile, queue.args.oldPath);
|
||||
}
|
||||
}
|
||||
if (queue.type == "RENAME") {
|
||||
// Obsolete
|
||||
await this.watchVaultRenameAsync(targetFile, queue.args.oldPath);
|
||||
}
|
||||
}
|
||||
await this.kvDB.set(key, mtime);
|
||||
await this.kvDB.set(key, mtime);
|
||||
});
|
||||
}
|
||||
|
||||
pendingFileEventCount = reactiveSource(0);
|
||||
processingFileEventCount = reactiveSource(0);
|
||||
fileEventQueue =
|
||||
new QueueProcessor(
|
||||
(items: FileEventItem[]) => this.handleFileEvent(items[0]),
|
||||
async (items: FileEventItem[]) => {
|
||||
await this.handleFileEvent(items[0]);
|
||||
return []
|
||||
}
|
||||
,
|
||||
{ suspended: true, batchSize: 1, concurrentLimit: 5, delay: 100, yieldThreshold: FileWatchEventQueueMax, totalRemainingReactiveSource: this.pendingFileEventCount, processingEntitiesReactiveSource: this.processingFileEventCount }
|
||||
).replaceEnqueueProcessor((items, newItem) => this.queueNextFileEvent(items, newItem));
|
||||
|
||||
@@ -1894,7 +1901,7 @@ We can perform a command in this file.
|
||||
observeForLogs() {
|
||||
const padSpaces = `\u{2007}`.repeat(10);
|
||||
// const emptyMark = `\u{2003}`;
|
||||
const rerenderTimer = new Map<string, [ReturnType<typeof setTimeout>, number]>;
|
||||
const rerenderTimer = new Map<string, [ReturnType<typeof setTimeout>, number]>();
|
||||
const tick = reactiveSource(0);
|
||||
function padLeftSp(num: number, mark: string) {
|
||||
const numLen = `${num}`.length + 1;
|
||||
@@ -2005,9 +2012,9 @@ We can perform a command in this file.
|
||||
};
|
||||
})
|
||||
const statusBarLabels = reactive(() => {
|
||||
|
||||
const scheduleMessage = this.isReloadingScheduled ? `WARNING! RESTARTING OBSIDIAN IS SCHEDULED!\n` : "";
|
||||
const { message } = statusLineLabel.value;
|
||||
const status = this.statusLog.value;
|
||||
const status = scheduleMessage + this.statusLog.value;
|
||||
return {
|
||||
message, status
|
||||
}
|
||||
@@ -3174,5 +3181,61 @@ Or if you are sure know what had been happened, we can unlock the database from
|
||||
// @ts-ignore
|
||||
this.app.commands.executeCommandById(id)
|
||||
}
|
||||
|
||||
_totalProcessingCount?: ReactiveValue<number>;
|
||||
get isReloadingScheduled() {
|
||||
return this._totalProcessingCount !== undefined;
|
||||
}
|
||||
askReload(message?: string) {
|
||||
if (this.isReloadingScheduled) {
|
||||
Logger(`Reloading is already scheduled`, LOG_LEVEL_VERBOSE);
|
||||
return;
|
||||
}
|
||||
scheduleTask("configReload", 250, async () => {
|
||||
const RESTART_NOW = "Yes, restart immediately";
|
||||
const RESTART_AFTER_STABLE = "Yes, schedule a restart after stabilisation";
|
||||
const RETRY_LATER = "No, Leave it to me";
|
||||
const ret = await askSelectString(this.app, message || "Do you want to restart and reload Obsidian now?", [RESTART_AFTER_STABLE, RESTART_NOW, RETRY_LATER]);
|
||||
if (ret == RESTART_NOW) {
|
||||
this.performAppReload();
|
||||
} else if (ret == RESTART_AFTER_STABLE) {
|
||||
this.scheduleAppReload();
|
||||
}
|
||||
})
|
||||
}
|
||||
scheduleAppReload() {
|
||||
if (!this._totalProcessingCount) {
|
||||
const __tick = reactiveSource(0);
|
||||
this._totalProcessingCount = reactive(() => {
|
||||
const dbCount = this.databaseQueueCount.value;
|
||||
const replicationCount = this.replicationResultCount.value;
|
||||
const storageApplyingCount = this.storageApplyingCount.value;
|
||||
const chunkCount = collectingChunks.value;
|
||||
const pluginScanCount = pluginScanningCount.value;
|
||||
const hiddenFilesCount = hiddenFilesEventCount.value + hiddenFilesProcessingCount.value;
|
||||
const conflictProcessCount = this.conflictProcessQueueCount.value;
|
||||
const e = this.pendingFileEventCount.value;
|
||||
const proc = this.processingFileEventCount.value;
|
||||
// eslint-disable-next-line @typescript-eslint/no-unused-vars
|
||||
const __ = __tick.value;
|
||||
return dbCount + replicationCount + storageApplyingCount + chunkCount + pluginScanCount + hiddenFilesCount + conflictProcessCount + e + proc;
|
||||
})
|
||||
this.registerInterval(setInterval(() => {
|
||||
__tick.value++;
|
||||
}, 1000) as unknown as number);
|
||||
|
||||
let stableCheck = 3;
|
||||
this._totalProcessingCount.onChanged(e => {
|
||||
if (e.value == 0) {
|
||||
if (stableCheck-- <= 0) {
|
||||
this.performAppReload();
|
||||
}
|
||||
Logger(`Obsidian will be restarted soon! (Within ${stableCheck} seconds)`, LOG_LEVEL_NOTICE, "restart-notice");
|
||||
} else {
|
||||
stableCheck = 3;
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user