- The result of conflict resolution is now reliably written into storage.
- Deleted files are handled correctly again in the history dialogue and the conflict dialogue.
- Some incorrect log messages have been fixed.
- Change handling has become more stable.
- Some event handling has been made safer.
Improved:
- Dumping document information now shows conflicts and revisions.
- Timestamp-only differences are now reliably cached.
- Timestamp difference detection now rounds timestamps to two-second resolution (see the sketch below).
Refactored:
- A bit of reorganisation to make writing tests easier.
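For reference, a minimal sketch of the two-second rounding idea (the actual implementation is compareMTime in src/utils.ts, shown in the diff below; the sample timestamps are invented):

    // Truncate both modification times into 2000 ms buckets before comparing,
    // so timestamps differing by less than two seconds count as even.
    const resolution = 2000;
    const bucket = (mtime: number) => Math.trunc(mtime / resolution) * resolution;
    bucket(1706630400123) == bucket(1706630401999); // true: treated as the same time
    bucket(1706630400123) == bucket(1706630403999); // false: a real difference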
vorotamoroz
2024-01-30 17:31:02 +00:00
parent 93e7cbb133
commit fa3aa2702c
5 changed files with 167 additions and 94 deletions

src/SerializedFileAccess.ts

@@ -3,6 +3,8 @@ import { serialized } from "./lib/src/lock";
 import type { FilePath } from "./lib/src/types";
 import { createBinaryBlob, isDocContentSame } from "./lib/src/utils";
 import type { InternalFileInfo } from "./types";
+import { markChangesAreSame } from "./utils";
+
 function getFileLockKey(file: TFile | TFolder | string) {
     return `fl:${typeof (file) == "string" ? file : file.path}`;
 }
@@ -16,6 +18,15 @@ function toArrayBuffer(arr: Uint8Array | ArrayBuffer | DataView): ArrayBufferLike
     return arr;
 }
 
+async function processReadFile<T>(file: TFile | TFolder | string, proc: () => Promise<T>) {
+    const ret = await serialized(getFileLockKey(file), () => proc());
+    return ret;
+}
+async function processWriteFile<T>(file: TFile | TFolder | string, proc: () => Promise<T>) {
+    const ret = await serialized(getFileLockKey(file), () => proc());
+    return ret;
+}
+
 export class SerializedFileAccess {
     app: App
     constructor(app: App) {
@@ -24,60 +35,64 @@ export class SerializedFileAccess {
     async adapterStat(file: TFile | string) {
         const path = file instanceof TFile ? file.path : file;
-        return await serialized(getFileLockKey(path), () => this.app.vault.adapter.stat(path));
+        return await processReadFile(file, () => this.app.vault.adapter.stat(path));
     }
     async adapterExists(file: TFile | string) {
         const path = file instanceof TFile ? file.path : file;
-        return await serialized(getFileLockKey(path), () => this.app.vault.adapter.exists(path));
+        return await processReadFile(file, () => this.app.vault.adapter.exists(path));
     }
     async adapterRemove(file: TFile | string) {
         const path = file instanceof TFile ? file.path : file;
-        return await serialized(getFileLockKey(path), () => this.app.vault.adapter.remove(path));
+        return await processReadFile(file, () => this.app.vault.adapter.remove(path));
     }
     async adapterRead(file: TFile | string) {
         const path = file instanceof TFile ? file.path : file;
-        return await serialized(getFileLockKey(path), () => this.app.vault.adapter.read(path));
+        return await processReadFile(file, () => this.app.vault.adapter.read(path));
     }
     async adapterReadBinary(file: TFile | string) {
         const path = file instanceof TFile ? file.path : file;
-        return await serialized(getFileLockKey(path), () => this.app.vault.adapter.readBinary(path));
+        return await processReadFile(file, () => this.app.vault.adapter.readBinary(path));
     }
     async adapterWrite(file: TFile | string, data: string | ArrayBuffer | Uint8Array, options?: DataWriteOptions) {
         const path = file instanceof TFile ? file.path : file;
         if (typeof (data) === "string") {
-            return await serialized(getFileLockKey(path), () => this.app.vault.adapter.write(path, data, options));
+            return await processWriteFile(file, () => this.app.vault.adapter.write(path, data, options));
         } else {
-            return await serialized(getFileLockKey(path), () => this.app.vault.adapter.writeBinary(path, toArrayBuffer(data), options));
+            return await processWriteFile(file, () => this.app.vault.adapter.writeBinary(path, toArrayBuffer(data), options));
         }
     }
     async vaultCacheRead(file: TFile) {
-        return await serialized(getFileLockKey(file), () => this.app.vault.cachedRead(file));
+        return await processReadFile(file, () => this.app.vault.cachedRead(file));
     }
     async vaultRead(file: TFile) {
-        return await serialized(getFileLockKey(file), () => this.app.vault.read(file));
+        return await processReadFile(file, () => this.app.vault.read(file));
    }
     async vaultReadBinary(file: TFile) {
-        return await serialized(getFileLockKey(file), () => this.app.vault.readBinary(file));
+        return await processReadFile(file, () => this.app.vault.readBinary(file));
     }
     async vaultModify(file: TFile, data: string | ArrayBuffer | Uint8Array, options?: DataWriteOptions) {
         if (typeof (data) === "string") {
-            return await serialized(getFileLockKey(file), async () => {
+            return await processWriteFile(file, async () => {
                 const oldData = await this.app.vault.read(file);
-                if (data === oldData) return false
+                if (data === oldData) {
+                    markChangesAreSame(file, file.stat.mtime, options.mtime);
+                    return false
+                }
                 await this.app.vault.modify(file, data, options)
                 return true;
             }
             );
         } else {
-            return await serialized(getFileLockKey(file), async () => {
+            return await processWriteFile(file, async () => {
                 const oldData = await this.app.vault.readBinary(file);
                 if (await isDocContentSame(createBinaryBlob(oldData), createBinaryBlob(data))) {
+                    markChangesAreSame(file, file.stat.mtime, options.mtime);
                     return false;
                 }
                 await this.app.vault.modifyBinary(file, toArrayBuffer(data), options)
@@ -87,16 +102,16 @@ export class SerializedFileAccess {
         }
     }
     async vaultCreate(path: string, data: string | ArrayBuffer | Uint8Array, options?: DataWriteOptions): Promise<TFile> {
         if (typeof (data) === "string") {
-            return await serialized(getFileLockKey(path), () => this.app.vault.create(path, data, options));
+            return await processWriteFile(path, () => this.app.vault.create(path, data, options));
         } else {
-            return await serialized(getFileLockKey(path), () => this.app.vault.createBinary(path, toArrayBuffer(data), options));
+            return await processWriteFile(path, () => this.app.vault.createBinary(path, toArrayBuffer(data), options));
         }
     }
     async delete(file: TFile | TFolder, force = false) {
-        return await serialized(getFileLockKey(file), () => this.app.vault.delete(file, force));
+        return await processWriteFile(file, () => this.app.vault.delete(file, force));
     }
     async trash(file: TFile | TFolder, force = false) {
-        return await serialized(getFileLockKey(file), () => this.app.vault.trash(file, force));
+        return await processWriteFile(file, () => this.app.vault.trash(file, force));
     }
     getAbstractFileByPath(path: FilePath | string): TAbstractFile | null {

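Side note: a minimal sketch of what the new processReadFile/processWriteFile helpers guarantee, assuming serialized() from ./lib/src/lock runs tasks that share a key strictly in order (paths and contents here are invented for illustration):

    const access = new SerializedFileAccess(app);
    await Promise.all([
        access.adapterWrite("notes/a.md", "first"),  // takes the lock key "fl:notes/a.md"
        access.adapterWrite("notes/a.md", "second"), // same key, queued behind the first write
        access.adapterWrite("notes/b.md", "other"),  // different key, may run concurrently
    ]);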
Submodule src/lib updated: dc9cbfe007...1d6c0cc6aa

src/main.ts

@@ -1,16 +1,16 @@
 const isDebug = false;
 
 import { type Diff, DIFF_DELETE, DIFF_EQUAL, DIFF_INSERT, diff_match_patch, stringifyYaml, parseYaml } from "./deps";
-import { debounce, Notice, Plugin, TFile, addIcon, TFolder, normalizePath, TAbstractFile, Editor, MarkdownView, type RequestUrlParam, type RequestUrlResponse, requestUrl, type MarkdownFileInfo } from "./deps";
+import { Notice, Plugin, TFile, addIcon, TFolder, normalizePath, TAbstractFile, Editor, MarkdownView, type RequestUrlParam, type RequestUrlResponse, requestUrl, type MarkdownFileInfo } from "./deps";
 import { type EntryDoc, type LoadedEntry, type ObsidianLiveSyncSettings, type diff_check_result, type diff_result_leaf, type EntryBody, LOG_LEVEL, VER, DEFAULT_SETTINGS, type diff_result, FLAGMD_REDFLAG, SYNCINFO_ID, SALT_OF_PASSPHRASE, type ConfigPassphraseStore, type CouchDBConnection, FLAGMD_REDFLAG2, FLAGMD_REDFLAG3, PREFIXMD_LOGFILE, type DatabaseConnectingStatus, type EntryHasPath, type DocumentID, type FilePathWithPrefix, type FilePath, type AnyEntry, LOG_LEVEL_DEBUG, LOG_LEVEL_INFO, LOG_LEVEL_NOTICE, LOG_LEVEL_URGENT, LOG_LEVEL_VERBOSE, type SavingEntry, MISSING_OR_ERROR, NOT_CONFLICTED, AUTO_MERGED, CANCELLED, LEAVE_TO_SUBSEQUENT, FLAGMD_REDFLAG2_HR, FLAGMD_REDFLAG3_HR, } from "./lib/src/types";
 import { type InternalFileInfo, type CacheData, type FileEventItem, FileWatchEventQueueMax } from "./types";
-import { createBinaryBlob, createTextBlob, fireAndForget, getDocData, isDocContentSame, isObjectDifferent, sendValue } from "./lib/src/utils";
+import { arrayToChunkedArray, createBinaryBlob, createTextBlob, fireAndForget, getDocData, isDocContentSame, isObjectDifferent, sendValue } from "./lib/src/utils";
 import { Logger, setGlobalLogFunction } from "./lib/src/logger";
 import { PouchDB } from "./lib/src/pouchdb-browser.js";
 import { ConflictResolveModal } from "./ConflictResolveModal";
 import { ObsidianLiveSyncSettingTab } from "./ObsidianLiveSyncSettingTab";
 import { DocumentHistoryModal } from "./DocumentHistoryModal";
-import { applyPatch, cancelAllPeriodicTask, cancelAllTasks, cancelTask, generatePatchObj, id2path, isObjectMargeApplicable, isSensibleMargeApplicable, flattenObject, path2id, scheduleTask, tryParseJSON, isValidPath, isInternalMetadata, isPluginMetadata, stripInternalMetadataPrefix, isChunk, askSelectString, askYesNo, askString, PeriodicProcessor, getPath, getPathWithoutPrefix, getPathFromTFile, performRebuildDB, memoIfNotExist, memoObject, retrieveMemoObject, disposeMemoObject, isCustomisationSyncMetadata } from "./utils";
+import { applyPatch, cancelAllPeriodicTask, cancelAllTasks, cancelTask, generatePatchObj, id2path, isObjectMargeApplicable, isSensibleMargeApplicable, flattenObject, path2id, scheduleTask, tryParseJSON, isValidPath, isInternalMetadata, isPluginMetadata, stripInternalMetadataPrefix, isChunk, askSelectString, askYesNo, askString, PeriodicProcessor, getPath, getPathWithoutPrefix, getPathFromTFile, performRebuildDB, memoIfNotExist, memoObject, retrieveMemoObject, disposeMemoObject, isCustomisationSyncMetadata, compareFileFreshness, BASE_IS_NEW, TARGET_IS_NEW, EVEN, compareMTime, markChangesAreSame } from "./utils";
 import { encrypt, tryDecrypt } from "./lib/src/e2ee_v2";
 import { balanceChunkPurgedDBs, enableEncryption, isCloudantURI, isErrorOfMissingDoc, isValidRemoteCouchDBURI, purgeUnreferencedChunks } from "./lib/src/utils_couchdb";
 import { logStore, type LogEntry, collectingChunks, pluginScanningCount, hiddenFilesProcessingCount, hiddenFilesEventCount, logMessages } from "./lib/src/stores";
@@ -33,6 +33,7 @@ import { LRUCache } from "./lib/src/LRUCache";
 import { SerializedFileAccess } from "./SerializedFileAccess.js";
 import { KeyedQueueProcessor, QueueProcessor, type QueueItemWithKey } from "./lib/src/processor.js";
 import { reactive, reactiveSource } from "./lib/src/reactive.js";
+import { initializeStores } from "./stores.js";
 
 setNoticeClass(Notice);
@@ -728,9 +729,9 @@ Note: We can always able to read V1 format. It will be progressively converted.
         this.packageVersion = packageVersion;
 
         Logger(`Self-hosted LiveSync v${manifestVersion} ${packageVersion} `);
-        await this.loadSettings();
         const lsKey = "obsidian-live-sync-ver" + this.getVaultName();
         const last_version = localStorage.getItem(lsKey);
+        await this.loadSettings();
         this.observeForLogs();
         this.statusBar = this.addStatusBarItem();
         this.statusBar.addClass("syncstatusbar");
@@ -757,9 +758,9 @@ Note: We can always able to read V1 format. It will be progressively converted.
         }
         localStorage.setItem(lsKey, `${VER}`);
         await this.openDatabase();
-        this.watchWorkspaceOpen = debounce(this.watchWorkspaceOpen.bind(this), 1000, false);
-        this.watchWindowVisibility = debounce(this.watchWindowVisibility.bind(this), 1000, false);
-        this.watchOnline = debounce(this.watchOnline.bind(this), 500, false);
+        this.watchWorkspaceOpen = this.watchWorkspaceOpen.bind(this);
+        this.watchWindowVisibility = this.watchWindowVisibility.bind(this)
+        this.watchOnline = this.watchOnline.bind(this);
         this.realizeSettingSyncMode = this.realizeSettingSyncMode.bind(this);
         this.parseReplicationResult = this.parseReplicationResult.bind(this);
@@ -821,6 +822,7 @@ Note: We can always able to read V1 format. It will be progressively converted.
         //@ts-ignore
         this.isMobile = this.app.isMobile;
         this.localDatabase = new LiveSyncLocalDB(vaultName, this);
+        initializeStores(vaultName);
         return await this.localDatabase.initializeDatabase();
     }
@@ -1175,7 +1177,7 @@ We can perform a command in this file.
     watchOnline() {
-        this.watchOnlineAsync();
+        scheduleTask("watch-online", 500, () => fireAndForget(() => this.watchOnlineAsync()));
     }
 
     async watchOnlineAsync() {
         // If some files were failed to retrieve, scan files again.
@@ -1186,7 +1188,7 @@ We can perform a command in this file.
         }
     }
 
     watchWindowVisibility() {
-        this.watchWindowVisibilityAsync();
+        scheduleTask("watch-window-visibility", 500, () => fireAndForget(() => this.watchWindowVisibilityAsync()));
     }
     async watchWindowVisibilityAsync() {
@@ -1307,7 +1309,7 @@ We can perform a command in this file.
         if (this.settings.suspendFileWatching) return;
         if (!this.isReady) return;
         if (!file) return;
-        this.watchWorkspaceOpenAsync(file);
+        scheduleTask("watch-workspace-open", 500, () => fireAndForget(() => this.watchWorkspaceOpenAsync(file)));
     }
 
     async watchWorkspaceOpenAsync(file: TFile) {
@@ -1493,12 +1495,12 @@ We can perform a command in this file.
                 await this.deleteVaultItem(file);
             } else {
                 // Conflict has been resolved at this time,
-                await this.pullFile(path, null, force);
+                await this.pullFile(path, null, true);
             }
             return;
         }
-        const localMtime = ~~((file?.stat?.mtime || 0) / 1000);
-        const docMtime = ~~(docEntry.mtime / 1000);
+        const compareResult = compareFileFreshness(file, docEntry);
 
         const doc = existDoc;
@@ -1506,7 +1508,8 @@ We can perform a command in this file.
             Logger(msg + "ERROR, Invalid datatype: " + path + "(" + doc.datatype + ")", LOG_LEVEL_NOTICE);
             return;
         }
-        if (!force && localMtime >= docMtime) return;
+        // if (!force && localMtime >= docMtime) return;
+        if (!force && (compareResult == BASE_IS_NEW || compareResult == EVEN)) return;
         if (!isValidPath(path)) {
             Logger(msg + "ERROR, invalid path: " + path, LOG_LEVEL_NOTICE);
             return;
@@ -1579,9 +1582,12 @@ We can perform a command in this file.
         if (!this.settings.suspendParseReplicationResult) {
             const lsKey = "obsidian-livesync-queuefiles-" + this.getVaultName();
             const ids = [...new Set(JSON.parse(localStorage.getItem(lsKey) || "[]"))] as string[];
-            const ret = await this.localDatabase.allDocsRaw<EntryDoc>({ keys: ids, include_docs: true });
-            for (const doc of ret.rows) {
-                this.replicationResultProcessor.enqueue(doc.doc);
+            const batchSize = 100;
+            const chunkedIds = arrayToChunkedArray(ids, batchSize);
+            for await (const idsBatch of chunkedIds) {
+                const ret = await this.localDatabase.allDocsRaw<EntryDoc>({ keys: idsBatch, include_docs: true, limit: 100 });
+                this.replicationResultProcessor.enqueueAll(ret.rows.map(doc => doc.doc));
+                await this.replicationResultProcessor.waitForPipeline();
             }
         }
     }
@@ -1667,17 +1673,18 @@ We can perform a command in this file.
                 this.databaseQueuedProcessor.enqueueWithKey(change.path, change);
             }
             return;
-        }, { batchSize: 1, suspended: true, concurrentLimit: 1, delay: 0, totalRemainingReactiveSource: this.replicationResultCount }).startPipeline().onUpdateProgress(() => {
+        }, { batchSize: 1, suspended: true, concurrentLimit: 100, delay: 0, totalRemainingReactiveSource: this.replicationResultCount }).startPipeline().onUpdateProgress(() => {
             this.saveQueuedFiles();
         });
 
     //---> Sync
     parseReplicationResult(docs: Array<PouchDB.Core.ExistingDocument<EntryDoc>>) {
         if (this.settings.suspendParseReplicationResult) {
             this.replicationResultProcessor.suspend()
-        } else {
-            this.replicationResultProcessor.resume()
         }
         this.replicationResultProcessor.enqueueAll(docs);
+        if (!this.settings.suspendParseReplicationResult) {
+            this.replicationResultProcessor.resume()
+        }
     }
@@ -1991,7 +1998,6 @@ Or if you are sure know what had been happened, we can unlock the database from
         const syncFiles = filesStorage.filter((e) => onlyInStorageNames.indexOf(e.path) == -1);
         Logger("Updating database by new files");
-        // this.setStatusBarText(`UPDATE DATABASE`);
 
         const initProcess = [];
         const logLevel = showingNotice ? LOG_LEVEL_NOTICE : LOG_LEVEL_INFO;
@@ -2050,8 +2056,8 @@ Or if you are sure know what had been happened, we can unlock the database from
             }));
         }
         if (!initialScan) {
-            let caches: { [key: string]: { storageMtime: number; docMtime: number } } = {};
-            caches = await this.kvDB.get<{ [key: string]: { storageMtime: number; docMtime: number } }>("diff-caches") || {};
+            // let caches: { [key: string]: { storageMtime: number; docMtime: number } } = {};
+            // caches = await this.kvDB.get<{ [key: string]: { storageMtime: number; docMtime: number } }>("diff-caches") || {};
             type FileDocPair = { file: TFile, id: DocumentID };
             const processPrepareSyncFile = new QueueProcessor(
@@ -2077,7 +2083,7 @@ Or if you are sure know what had been happened, we can unlock the database from
                 new QueueProcessor(
                     async (loadedPairs) => {
                         const e = loadedPairs[0];
-                        await this.syncFileBetweenDBandStorage(e.file, e.doc, initialScan, caches);
+                        await this.syncFileBetweenDBandStorage(e.file, e.doc, initialScan);
                         return;
                     }, { batchSize: 1, concurrentLimit: 5, delay: 10, suspended: false }
                 ))
@@ -2085,7 +2091,7 @@ Or if you are sure know what had been happened, we can unlock the database from
             processPrepareSyncFile.startPipeline();
             initProcess.push(async () => {
                 await processPrepareSyncFile.waitForPipeline();
-                await this.kvDB.set("diff-caches", caches);
+                // await this.kvDB.set("diff-caches", caches);
             })
         }
         await Promise.all(initProcess);
@@ -2134,6 +2140,10 @@ Or if you are sure know what had been happened, we can unlock the database from
         if (baseLeaf == false || leftLeaf == false || rightLeaf == false) {
             return false;
         }
+        if (leftLeaf.deleted && rightLeaf.deleted) {
+            // Both are deleted
+            return false;
+        }
         // diff between base and each revision
         const dmp = new diff_match_patch();
         const mapLeft = dmp.diff_linesToChars_(baseLeaf.data, leftLeaf.data);
@@ -2294,6 +2304,9 @@ Or if you are sure know what had been happened, we can unlock the database from
         if (baseLeaf == false || leftLeaf == false || rightLeaf == false) {
             return false;
         }
+        if (leftLeaf.deleted && rightLeaf.deleted) {
+            return false;
+        }
         const baseObj = { data: tryParseJSON(baseLeaf.data, {}) } as Record<string | number | symbol, any>;
         const leftObj = { data: tryParseJSON(leftLeaf.data, {}) } as Record<string | number | symbol, any>;
         const rightObj = { data: tryParseJSON(rightLeaf.data, {}) } as Record<string | number | symbol, any>;
@@ -2379,7 +2392,6 @@ Or if you are sure know what had been happened, we can unlock the database from
         if (p != undefined) {
             // remove conflicted revision.
             await this.localDatabase.deleteDBEntry(path, { rev: conflictedRev });
-
             const file = this.vaultAccess.getAbstractFileByPath(stripAllPrefixes(path)) as TFile;
             if (file) {
                 if (await this.vaultAccess.vaultModify(file, p)) {
@@ -2416,10 +2428,10 @@ Or if you are sure know what had been happened, we can unlock the database from
         const isBinary = !isPlainText(path);
         const alwaysNewer = this.settings.resolveConflictsByNewerFile;
         if (isSame || isBinary || alwaysNewer) {
-            const lMtime = ~~(leftLeaf.mtime / 1000);
-            const rMtime = ~~(rightLeaf.mtime / 1000);
+            const result = compareMTime(leftLeaf.mtime, rightLeaf.mtime)
             let loser = leftLeaf;
-            if (lMtime > rMtime) {
+            // if (lMtime > rMtime) {
+            if (result != TARGET_IS_NEW) {
                 loser = rightLeaf;
             }
             await this.localDatabase.deleteDBEntry(path, { rev: loser.rev });
@@ -2441,7 +2453,7 @@ Or if you are sure know what had been happened, we can unlock the database from
 
     conflictProcessQueueCount = reactiveSource(0);
     conflictResolveQueue =
-        new KeyedQueueProcessor(async (entries: { filename: FilePathWithPrefix, file: TFile }[]) => {
+        new KeyedQueueProcessor(async (entries: { filename: FilePathWithPrefix }[]) => {
             const entry = entries[0];
             const filename = entry.filename;
             const conflictCheckResult = await this.checkConflictAndPerformAutoMerge(filename);
@@ -2482,11 +2494,12 @@ Or if you are sure know what had been happened, we can unlock the database from
         new QueueProcessor((files: FilePathWithPrefix[]) => {
             const filename = files[0];
             const file = this.vaultAccess.getAbstractFileByPath(filename);
-            if (!file) return;
-            if (!(file instanceof TFile)) return;
+            // if (!file) return;
+            // if (!(file instanceof TFile)) return;
+            if ((file instanceof TFolder)) return;
             // Check again?
-            return [{ key: filename, entity: { filename, file } }];
+            return [{ key: filename, entity: { filename } }];
             // this.conflictResolveQueue.enqueueWithKey(filename, { filename, file });
         }, {
             suspended: false, batchSize: 1, concurrentLimit: 5, delay: 10, keepResultUntilDownstreamConnected: true, pipeTo: this.conflictResolveQueue, totalRemainingReactiveSource: this.conflictProcessQueueCount
@@ -2573,7 +2586,7 @@ Or if you are sure know what had been happened, we can unlock the database from
         //when to opened file;
     }
 
-    async syncFileBetweenDBandStorage(file: TFile, doc: LoadedEntry, initialScan: boolean, caches: { [key: string]: { storageMtime: number; docMtime: number } }) {
+    async syncFileBetweenDBandStorage(file: TFile, doc: LoadedEntry, initialScan: boolean) {
         if (!doc) {
             throw new Error(`Missing doc:${(file as any).path}`)
         }
@@ -2586,47 +2599,37 @@ Or if you are sure know what had been happened, we can unlock the database from
             }
         }
 
-        const storageMtime = ~~(file.stat.mtime / 1000);
-        const docMtime = ~~(doc.mtime / 1000);
-        const dK = `${file.path}-diff`;
-        const isLastDiff = dK in caches ? caches[dK] : { storageMtime: 0, docMtime: 0 };
-        if (isLastDiff.docMtime == docMtime && isLastDiff.storageMtime == storageMtime) {
-            // Logger("STORAGE .. DB :" + file.path, LOG_LEVEL_VERBOSE);
-            caches[dK] = { storageMtime, docMtime };
-            return caches;
-        }
-        if (storageMtime > docMtime) {
-            //newer local file.
-            if (!this.isFileSizeExceeded(file.stat.size)) {
-                Logger("STORAGE -> DB :" + file.path);
-                Logger(`${storageMtime} > ${docMtime}`);
-                await this.updateIntoDB(file, initialScan);
-                fireAndForget(() => this.checkAndApplySettingFromMarkdown(file.path, true));
-                caches[dK] = { storageMtime, docMtime };
-                return caches;
-            } else {
-                Logger(`STORAGE -> DB : ${file.path} has been skipped due to file size exceeding the limit`, LOG_LEVEL_NOTICE);
-            }
-        } else if (storageMtime < docMtime) {
-            //newer database file.
-            if (!this.isFileSizeExceeded(doc.size)) {
-                Logger("STORAGE <- DB :" + file.path);
-                Logger(`${storageMtime} < ${docMtime}`);
-                const docx = await this.localDatabase.getDBEntry(getPathFromTFile(file), null, false, false);
-                if (docx != false) {
-                    await this.processEntryDoc(docx, file);
-                } else {
-                    Logger(`STORAGE <- DB : ${file.path} has been skipped due to file size exceeding the limit`, LOG_LEVEL_NOTICE);
-                }
-                caches[dK] = { storageMtime, docMtime };
-                return caches;
-            } else {
-                Logger("STORAGE <- DB :" + file.path + " Skipped (size)");
-            }
-        }
-        Logger("STORAGE == DB :" + file.path + "", LOG_LEVEL_VERBOSE);
-        caches[dK] = { storageMtime, docMtime };
-        return caches;
+        const compareResult = compareFileFreshness(file, doc);
+        switch (compareResult) {
+            case BASE_IS_NEW:
+                if (!this.isFileSizeExceeded(file.stat.size)) {
+                    Logger("STORAGE -> DB :" + file.path);
+                    await this.updateIntoDB(file, initialScan);
+                    fireAndForget(() => this.checkAndApplySettingFromMarkdown(file.path, true));
+                } else {
+                    Logger(`STORAGE -> DB : ${file.path} has been skipped due to file size exceeding the limit`, LOG_LEVEL_NOTICE);
+                }
+                break;
+            case TARGET_IS_NEW:
+                if (!this.isFileSizeExceeded(doc.size)) {
+                    Logger("STORAGE <- DB :" + file.path);
+                    const docx = await this.localDatabase.getDBEntry(getPathFromTFile(file), null, false, false, true);
+                    if (docx != false) {
+                        await this.processEntryDoc(docx, file);
+                    } else {
+                        Logger(`STORAGE <- DB : Cloud not read ${file.path}, possibly deleted`, LOG_LEVEL_NOTICE);
+                    }
+                } else {
+                    Logger(`STORAGE <- DB : ${file.path} has been skipped due to file size exceeding the limit`, LOG_LEVEL_NOTICE);
+                }
+                break;
+            case EVEN:
+                Logger("STORAGE == DB :" + file.path + "", LOG_LEVEL_VERBOSE);
+                break;
+            default:
+                Logger("STORAGE ?? DB :" + file.path + " Something got weird");
+        }
     }
@@ -2697,6 +2700,7 @@ Or if you are sure know what had been happened, we can unlock the database from
         if (oldData.deleted != newData.deleted) return false;
         if (!await isDocContentSame(old.data, newData.data)) return false;
         Logger(msg + "Skipped (not changed) " + fullPath + ((d._deleted || d.deleted) ? " (deleted)" : ""), LOG_LEVEL_VERBOSE);
+        markChangesAreSame(old, d.mtime, old.mtime);
         return true;
         // d._rev = old._rev;
     }
@@ -2715,10 +2719,11 @@ Or if you are sure know what had been happened, we can unlock the database from
             return true;
         }
         const ret = await this.localDatabase.putDBEntry(d, initialScan);
+        if (ret !== false) {
             Logger(msg + fullPath);
             if (this.settings.syncOnSave && !this.suspended) {
-                await this.replicate();
+                scheduleTask("perform-replicate-after-save", 250, () => this.replicate());
             }
+        }
         return ret != false;
     }

src/stores.ts (new file)

@@ -0,0 +1,7 @@
+import { PersistentMap } from "./lib/src/PersistentMap";
+
+export let sameChangePairs: PersistentMap<number[]>;
+
+export function initializeStores(vaultName: string) {
+    sameChangePairs = new PersistentMap<number[]>(`ls-persist-same-changes-${vaultName}`);
+}
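A rough usage sketch of this new store, assuming PersistentMap exposes the get(key, fallback) and set(key, value) calls used by src/utils.ts below (the vault name, path, and mtimes are invented):

    initializeStores("MyVault");
    // Record that two differing mtimes actually refer to the same content:
    sameChangePairs.set("notes/a.md", [1706630400123, 1706630403999]);
    sameChangePairs.get("notes/a.md", []); // -> [1706630400123, 1706630403999]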

src/utils.ts

@@ -1,4 +1,4 @@
-import { normalizePath, Platform, TAbstractFile, App, type RequestUrlParam, requestUrl } from "./deps";
+import { normalizePath, Platform, TAbstractFile, App, type RequestUrlParam, requestUrl, TFile } from "./deps";
 import { path2id_base, id2path_base, isValidFilenameInLinux, isValidFilenameInDarwin, isValidFilenameInWidows, isValidFilenameInAndroid, stripAllPrefixes } from "./lib/src/path";
 import { Logger } from "./lib/src/logger";
@@ -8,6 +8,7 @@ import { InputStringDialog, PopoverSelectString } from "./dialogs";
 import type ObsidianLiveSyncPlugin from "./main";
 import { writeString } from "./lib/src/strbin";
 import { fireAndForget } from "./lib/src/utils";
+import { sameChangePairs } from "./stores";
 
 export { scheduleTask, setPeriodicTask, cancelTask, cancelAllTasks, cancelPeriodicTask, cancelAllPeriodicTask, } from "./lib/src/task";
@@ -415,3 +416,48 @@ export async function performRebuildDB(plugin: ObsidianLiveSyncPlugin, method: "
         await plugin.addOnSetup.rebuildEverything();
     }
 }
+
+export const BASE_IS_NEW = Symbol("base");
+export const TARGET_IS_NEW = Symbol("target");
+export const EVEN = Symbol("even");
+
+// Why 2000? : ZIP FILE Does not have enough resolution.
+const resolution = 2000;
+export function compareMTime(baseMTime: number, targetMTime: number): typeof BASE_IS_NEW | typeof TARGET_IS_NEW | typeof EVEN {
+    const truncatedBaseMTime = (~~(baseMTime / resolution)) * resolution;
+    const truncatedTargetMTime = (~~(targetMTime / resolution)) * resolution;
+    // Logger(`Resolution MTime ${truncatedBaseMTime} and ${truncatedTargetMTime} `, LOG_LEVEL_VERBOSE);
+    if (truncatedBaseMTime == truncatedTargetMTime) return EVEN;
+    if (truncatedBaseMTime > truncatedTargetMTime) return BASE_IS_NEW;
+    if (truncatedBaseMTime < truncatedTargetMTime) return TARGET_IS_NEW;
+    throw new Error("Unexpected error");
+}
+
+export function markChangesAreSame(file: TFile | AnyEntry | string, mtime1: number, mtime2: number) {
+    if (mtime1 === mtime2) return true;
+    const key = typeof file == "string" ? file : file instanceof TFile ? file.path : file.path ?? file._id;
+    const pairs = sameChangePairs.get(key, []);
+    if (pairs.some(e => e == mtime1 || e == mtime2)) {
+        sameChangePairs.set(key, [...new Set([...pairs, mtime1, mtime2])]);
+    } else {
+        sameChangePairs.set(key, [mtime1, mtime2]);
+    }
+}
+
+export function isMarkedAsSameChanges(file: TFile | AnyEntry | string, mtimes: number[]) {
+    const key = typeof file == "string" ? file : file instanceof TFile ? file.path : file.path ?? file._id;
+    const pairs = sameChangePairs.get(key, []);
+    if (mtimes.every(e => pairs.indexOf(e) !== -1)) {
+        return EVEN;
+    }
+}
+
+export function compareFileFreshness(baseFile: TFile | AnyEntry, checkTarget: TFile | AnyEntry): typeof BASE_IS_NEW | typeof TARGET_IS_NEW | typeof EVEN {
+    const modifiedBase = baseFile instanceof TFile ? baseFile?.stat?.mtime ?? 0 : baseFile?.mtime ?? 0;
+    const modifiedTarget = checkTarget instanceof TFile ? checkTarget?.stat?.mtime ?? 0 : checkTarget?.mtime ?? 0;
+    if (modifiedBase && modifiedTarget && isMarkedAsSameChanges(baseFile, [modifiedBase, modifiedTarget])) {
+        return EVEN;
+    }
+    return compareMTime(modifiedBase, modifiedTarget);
+}
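Taken together, a hypothetical flow of the new freshness check (the path and mtimes are invented; fileWithMtime/entryWithMtime stand in for a real TFile and a database entry, both referring to notes/a.md):

    // Somewhere in the plugin, identical content with differing mtimes gets recorded:
    markChangesAreSame("notes/a.md", 1706630400123, 1706630403999);
    // The two mtimes fall into different 2000 ms buckets, so compareMTime alone
    // would report a difference, but the recorded pair short-circuits to EVEN:
    compareFileFreshness(fileWithMtime(1706630400123), entryWithMtime(1706630403999)); // EVEN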