### Behaviour change

- The plug-in automatically fetches the missing chunks even if `Fetch chunks on demand` is disabled.
    - This change is to avoid loss of data when receiving a bulk of revisions.
    - This can be prevented by enabling `Use Only Local Chunks` in the settings.
- Storage application now saved during each event and restored on startup.
- Synchronisation result application is also now saved during each event and restored on startup.
    - These may avoid some unexpected loss of data when the editor crashes.

### Fixed

- Now the plug-in waits for the application of pended batch changes before the synchronisation starts.
    - This may avoid some unexpected loss or unexpected conflicts.
      Plug-in sends custom headers correctly when RequestAPI is used.
- No longer causing unexpected chunk creation during `Reset synchronisation on This Device` with bucket sync.

### Refactored

- Synchronisation result application process has been refactored.
- Storage application process has been refactored.
    - Please report if you find any unexpected behaviour after this update. A bit of large refactoring.
This commit is contained in:
vorotamoroz
2025-12-10 09:45:57 +00:00
parent f06f8d1eb6
commit ac0378ca4b
12 changed files with 848 additions and 409 deletions

View File

@@ -14,34 +14,15 @@ import { isLockAcquired, shareRunningResult, skipIfDuplicated } from "octagonal-
import { balanceChunkPurgedDBs } from "@/lib/src/pouchdb/chunks";
import { purgeUnreferencedChunks } from "@/lib/src/pouchdb/chunks";
import { LiveSyncCouchDBReplicator } from "../../lib/src/replication/couchdb/LiveSyncReplicator";
import { throttle } from "octagonal-wheels/function";
import { arrayToChunkedArray } from "octagonal-wheels/collection";
import {
SYNCINFO_ID,
VER,
type EntryBody,
type EntryDoc,
type EntryLeaf,
type LoadedEntry,
type MetaEntry,
type RemoteType,
} from "../../lib/src/common/types";
import { QueueProcessor } from "octagonal-wheels/concurrency/processor";
import {
getPath,
isChunk,
isValidPath,
rateLimitedSharedExecution,
scheduleTask,
updatePreviousExecutionTime,
} from "../../common/utils";
import { isAnyNote } from "../../lib/src/common/utils";
import { type EntryDoc, type RemoteType } from "../../lib/src/common/types";
import { rateLimitedSharedExecution, scheduleTask, updatePreviousExecutionTime } from "../../common/utils";
import { EVENT_FILE_SAVED, EVENT_ON_UNRESOLVED_ERROR, EVENT_SETTING_SAVED, eventHub } from "../../common/events";
import type { LiveSyncAbstractReplicator } from "../../lib/src/replication/LiveSyncAbstractReplicator";
import { $msg } from "../../lib/src/common/i18n";
import { clearHandlers } from "../../lib/src/replication/SyncParamsHandler";
import type { LiveSyncCore } from "../../main";
import { ReplicateResultProcessor } from "./ReplicateResultProcessor";
const KEY_REPLICATION_ON_EVENT = "replicationOnEvent";
const REPLICATION_ON_EVENT_FORECASTED_TIME = 5000;
@@ -49,6 +30,7 @@ const REPLICATION_ON_EVENT_FORECASTED_TIME = 5000;
export class ModuleReplicator extends AbstractModule {
_replicatorType?: RemoteType;
_previousErrors = new Set<string>();
processor: ReplicateResultProcessor = new ReplicateResultProcessor(this);
showError(msg: string, max_log_level: LOG_LEVEL = LEVEL_NOTICE) {
const level = this._previousErrors.has(msg) ? LEVEL_INFO : max_log_level;
@@ -73,6 +55,11 @@ export class ModuleReplicator extends AbstractModule {
if (this._replicatorType !== setting.remoteType) {
void this.setReplicator();
}
if (this.core.settings.suspendParseReplicationResult) {
this.processor.suspend();
} else {
this.processor.resume();
}
});
return Promise.resolve(true);
@@ -103,6 +90,10 @@ export class ModuleReplicator extends AbstractModule {
_everyOnInitializeDatabase(db: LiveSyncLocalDB): Promise<boolean> {
return this.setReplicator();
}
_everyOnDatabaseInitialized(showNotice: boolean): Promise<boolean> {
fireAndForget(() => this.processor.restoreFromSnapshotOnce());
return Promise.resolve(true);
}
_everyOnResetDatabase(db: LiveSyncLocalDB): Promise<boolean> {
return this.setReplicator();
@@ -128,7 +119,7 @@ export class ModuleReplicator extends AbstractModule {
this.showError("Failed to initialise the encryption key, preventing replication.");
return false;
}
await this.loadQueuedFiles();
await this.processor.restoreFromSnapshotOnce();
this.clearErrors();
return true;
}
@@ -287,249 +278,10 @@ Even if you choose to clean up, you will see this option again if you exit Obsid
}
return await shareRunningResult(`replication`, () => this.services.replication.replicate());
}
_parseReplicationResult(docs: Array<PouchDB.Core.ExistingDocument<EntryDoc>>): void {
if (this.settings.suspendParseReplicationResult && !this.replicationResultProcessor.isSuspended) {
this.replicationResultProcessor.suspend();
}
this.replicationResultProcessor.enqueueAll(docs);
if (!this.settings.suspendParseReplicationResult && this.replicationResultProcessor.isSuspended) {
this.replicationResultProcessor.resume();
}
this.processor.enqueueAll(docs);
}
_saveQueuedFiles = throttle(() => {
const saveData = this.replicationResultProcessor._queue
.filter((e) => e !== undefined && e !== null)
.map((e) => e?._id ?? ("" as string)) as string[];
const kvDBKey = "queued-files";
// localStorage.setItem(lsKey, saveData);
fireAndForget(() => this.core.kvDB.set(kvDBKey, saveData));
}, 100);
saveQueuedFiles() {
this._saveQueuedFiles();
}
async loadQueuedFiles() {
if (this.settings.suspendParseReplicationResult) return;
if (!this.settings.isConfigured) return;
try {
const kvDBKey = "queued-files";
// const ids = [...new Set(JSON.parse(localStorage.getItem(lsKey) || "[]"))] as string[];
const ids = [...new Set((await this.core.kvDB.get<string[]>(kvDBKey)) ?? [])];
const batchSize = 100;
const chunkedIds = arrayToChunkedArray(ids, batchSize);
// suspendParseReplicationResult is true, so we have to resume it if it is suspended.
if (this.replicationResultProcessor.isSuspended) {
this.replicationResultProcessor.resume();
}
for await (const idsBatch of chunkedIds) {
const ret = await this.localDatabase.allDocsRaw<EntryDoc>({
keys: idsBatch,
include_docs: true,
limit: 100,
});
const docs = ret.rows
.filter((e) => e.doc)
.map((e) => e.doc) as PouchDB.Core.ExistingDocument<EntryDoc>[];
const errors = ret.rows.filter((e) => !e.doc && !e.value.deleted);
if (errors.length > 0) {
Logger("Some queued processes were not resurrected");
Logger(JSON.stringify(errors), LOG_LEVEL_VERBOSE);
}
this.replicationResultProcessor.enqueueAll(docs);
}
} catch (e) {
Logger(`Failed to load queued files.`, LOG_LEVEL_NOTICE);
Logger(e, LOG_LEVEL_VERBOSE);
} finally {
// Check again before awaiting,
if (this.replicationResultProcessor.isSuspended) {
this.replicationResultProcessor.resume();
}
}
// Wait for all queued files to be processed.
try {
await this.replicationResultProcessor.waitForAllProcessed();
} catch (e) {
Logger(`Failed to wait for all queued files to be processed.`, LOG_LEVEL_NOTICE);
Logger(e, LOG_LEVEL_VERBOSE);
}
}
replicationResultProcessor = new QueueProcessor(
async (docs: PouchDB.Core.ExistingDocument<EntryDoc>[]) => {
if (this.settings.suspendParseReplicationResult) return;
const change = docs[0];
if (!change) return;
if (isChunk(change._id)) {
this.localDatabase.onNewLeaf(change as EntryLeaf);
return;
}
if (await this.services.replication.processVirtualDocument(change)) return;
// any addon needs this item?
// for (const proc of this.core.addOns) {
// if (await proc.parseReplicationResultItem(change)) {
// return;
// }
// }
if (change.type == "versioninfo") {
if (change.version > VER) {
this.core.replicator.closeReplication();
Logger(
`Remote database updated to incompatible version. update your Self-hosted LiveSync plugin.`,
LOG_LEVEL_NOTICE
);
}
return;
}
if (
change._id == SYNCINFO_ID || // Synchronisation information data
change._id.startsWith("_design") //design document
) {
return;
}
if (isAnyNote(change)) {
const docPath = getPath(change);
if (!(await this.services.vault.isTargetFile(docPath))) {
Logger(`Skipped: ${docPath}`, LOG_LEVEL_VERBOSE);
return;
}
if (this.databaseQueuedProcessor._isSuspended) {
Logger(`Processing scheduled: ${docPath}`, LOG_LEVEL_INFO);
}
const size = change.size;
if (this.services.vault.isFileSizeTooLarge(size)) {
Logger(
`Processing ${docPath} has been skipped due to file size exceeding the limit`,
LOG_LEVEL_NOTICE
);
return;
}
this.databaseQueuedProcessor.enqueue(change);
}
return;
},
{
batchSize: 1,
suspended: true,
concurrentLimit: 100,
delay: 0,
totalRemainingReactiveSource: this.core.replicationResultCount,
}
)
.replaceEnqueueProcessor((queue, newItem) => {
const q = queue.filter((e) => e._id != newItem._id);
return [...q, newItem];
})
.startPipeline()
.onUpdateProgress(() => {
this.saveQueuedFiles();
});
async checkIsChangeRequiredForDatabaseProcessing(dbDoc: LoadedEntry): Promise<boolean> {
const path = getPath(dbDoc);
try {
const savedDoc = await this.localDatabase.getRaw<LoadedEntry>(dbDoc._id, {
conflicts: true,
revs_info: true,
});
const newRev = dbDoc._rev ?? "";
const latestRev = savedDoc._rev ?? "";
const revisions = savedDoc._revs_info?.map((e) => e.rev) ?? [];
if (savedDoc._conflicts && savedDoc._conflicts.length > 0) {
// There are conflicts, so we have to process it.
return true;
}
if (newRev == latestRev) {
// The latest revision. We need to process it.
return true;
}
const index = revisions.indexOf(newRev);
if (index >= 0) {
// the revision has been inserted before.
return false; // Already processed.
}
return true; // This mostly should not happen, but we have to process it just in case.
} catch (e: any) {
if ("status" in e && e.status == 404) {
return true;
// Not existing, so we have to process it.
} else {
Logger(
`Failed to get existing document for ${path} (${dbDoc._id.substring(0, 8)}, ${dbDoc._rev?.substring(0, 10)}) `,
LOG_LEVEL_NOTICE
);
Logger(e, LOG_LEVEL_VERBOSE);
return true;
}
}
return true;
}
databaseQueuedProcessor = new QueueProcessor(
async (docs: EntryBody[]) => {
const dbDoc = docs[0] as LoadedEntry; // It has no `data`
const path = getPath(dbDoc);
// If the document is existing with any revision, confirm that we have to process it.
const isRequired = await this.checkIsChangeRequiredForDatabaseProcessing(dbDoc);
if (!isRequired) {
Logger(`Skipped (Not latest): ${path} (${dbDoc._id.substring(0, 8)})`, LOG_LEVEL_VERBOSE);
return;
}
// If `Read chunks online` is disabled, chunks should be transferred before here.
// However, in some cases, chunks are after that. So, if missing chunks exist, we have to wait for them.
const doc = await this.localDatabase.getDBEntryFromMeta({ ...dbDoc }, false, true);
if (!doc) {
Logger(
`Something went wrong while gathering content of ${path} (${dbDoc._id.substring(0, 8)}, ${dbDoc._rev?.substring(0, 10)}) `,
LOG_LEVEL_NOTICE
);
return;
}
if (await this.services.replication.processOptionalSynchroniseResult(dbDoc)) {
// Already processed
} else if (isValidPath(getPath(doc))) {
this.storageApplyingProcessor.enqueue(doc as MetaEntry);
} else {
Logger(`Skipped: ${path} (${doc._id.substring(0, 8)})`, LOG_LEVEL_VERBOSE);
}
return;
},
{
suspended: true,
batchSize: 1,
concurrentLimit: 10,
yieldThreshold: 1,
delay: 0,
totalRemainingReactiveSource: this.core.databaseQueueCount,
}
)
.replaceEnqueueProcessor((queue, newItem) => {
const q = queue.filter((e) => e._id != newItem._id);
return [...q, newItem];
})
.startPipeline();
storageApplyingProcessor = new QueueProcessor(
async (docs: MetaEntry[]) => {
const entry = docs[0];
await this.services.replication.processSynchroniseResult(entry);
return;
},
{
suspended: true,
batchSize: 1,
concurrentLimit: 6,
yieldThreshold: 1,
delay: 0,
totalRemainingReactiveSource: this.core.storageApplyingCount,
}
)
.replaceEnqueueProcessor((queue, newItem) => {
const q = queue.filter((e) => e._id != newItem._id);
return [...q, newItem];
})
.startPipeline();
_everyBeforeSuspendProcess(): Promise<boolean> {
this.core.replicator?.closeReplication();
@@ -579,6 +331,7 @@ Even if you choose to clean up, you will see this option again if you exit Obsid
onBindFunction(core: LiveSyncCore, services: typeof core.services): void {
services.replicator.handleGetActiveReplicator(this._getReplicator.bind(this));
services.databaseEvents.handleOnDatabaseInitialisation(this._everyOnInitializeDatabase.bind(this));
services.databaseEvents.handleDatabaseInitialised(this._everyOnDatabaseInitialized.bind(this));
services.databaseEvents.handleOnResetDatabase(this._everyOnResetDatabase.bind(this));
services.appLifecycle.handleOnSettingLoaded(this._everyOnloadAfterLoadSettings.bind(this));
services.replication.handleParseSynchroniseResult(this._parseReplicationResult.bind(this));