Fixed and implemented:

- New configuration to solve synchronization failure on large vault.
- Preset misconfigurations
- Sometimes hanged on replication.
- Wrote documents
This commit is contained in:
vorotamoroz
2021-12-23 13:22:46 +09:00
parent abe613539b
commit 96165b4f9b
8 changed files with 362 additions and 174 deletions

View File

@@ -756,8 +756,8 @@ export class LocalPouchDB {
}
const syncOptionBase: PouchDB.Replication.SyncOptions = {
batch_size: 250,
batches_limit: 40,
batches_limit: setting.batches_limit,
batch_size: setting.batch_size,
};
const db = dbret.db;
@@ -851,8 +851,8 @@ export class LocalPouchDB {
}
const syncOptionBase: PouchDB.Replication.SyncOptions = {
batch_size: 250,
batches_limit: 40,
batches_limit: setting.batches_limit,
batch_size: setting.batch_size,
};
const syncOption: PouchDB.Replication.SyncOptions = keepAlive ? { live: true, retry: true, heartbeat: 30000, ...syncOptionBase } : { ...syncOptionBase };
@@ -876,87 +876,89 @@ export class LocalPouchDB {
//replicate once
this.syncStatus = "STARTED";
return new Promise<boolean>(async (res, rej) => {
let resolved = false;
const _openReplicationSync = () => {
this.syncHandler = this.cancelHandler(this.syncHandler);
this.syncHandler = this.localDatabase.sync<EntryDoc>(db, syncOption);
this.syncHandler
.on("active", () => {
this.syncStatus = "CONNECTED";
this.updateInfo();
Logger("Replication activated");
})
.on("change", async (e) => {
try {
if (e.direction == "pull") {
// console.log(`pulled data:${e.change.docs.map((e) => e._id).join(",")}`);
await callback(e.change.docs);
Logger(`replicated ${e.change.docs_read} doc(s)`);
this.docArrived += e.change.docs.length;
} else {
// console.log(`put data:${e.change.docs.map((e) => e._id).join(",")}`);
this.docSent += e.change.docs.length;
}
if (notice != null) {
notice.setMessage(`${e.change.docs_written}${e.change.docs_read}`);
}
this.updateInfo();
} catch (ex) {
Logger("Replication callback error");
Logger(ex);
}
})
.on("complete", (e) => {
this.syncStatus = "COMPLETED";
this.updateInfo();
Logger("Replication completed", showResult ? LOG_LEVEL.NOTICE : LOG_LEVEL.INFO);
if (notice != null) notice.hide();
if (!keepAlive) {
this.syncHandler = this.cancelHandler(this.syncHandler);
// if keep alive runnning, resolve here,
res(true);
}
})
.on("denied", (e) => {
this.syncStatus = "ERRORED";
this.updateInfo();
this.syncHandler = this.cancelHandler(this.syncHandler);
if (notice != null) notice.hide();
Logger("Replication denied", LOG_LEVEL.NOTICE);
// Logger(e);
rej(e);
})
.on("error", (e) => {
this.syncStatus = "ERRORED";
this.syncHandler = this.cancelHandler(this.syncHandler);
this.updateInfo();
if (notice != null) notice.hide();
Logger("Replication error", LOG_LEVEL.NOTICE);
// Logger(e);
rej(e);
})
.on("paused", (e) => {
this.syncStatus = "PAUSED";
this.updateInfo();
if (notice != null) notice.hide();
Logger("replication paused", LOG_LEVEL.VERBOSE);
if (keepAlive && !resolved) {
// if keep alive runnning, resolve here,
resolved = true;
res(true);
}
// Logger(e);
});
};
if (!keepAlive) {
return await _openReplicationSync();
}
let resolved = false;
const docArrivedOnStart = this.docArrived;
const docSentOnStart = this.docSent;
const _openReplicationSync = () => {
Logger("Sync Main Started");
this.syncHandler = this.cancelHandler(this.syncHandler);
Logger("Pull before replicate.");
Logger(await this.localDatabase.info(), LOG_LEVEL.VERBOSE);
Logger(await db.info(), LOG_LEVEL.VERBOSE);
const replicate = this.localDatabase.replicate.from(db, syncOptionBase);
this.syncHandler = this.localDatabase.sync<EntryDoc>(db, syncOption);
this.syncHandler
.on("active", () => {
this.syncStatus = "CONNECTED";
this.updateInfo();
Logger("Replication activated");
})
.on("change", async (e) => {
try {
if (e.direction == "pull") {
// console.log(`pulled data:${e.change.docs.map((e) => e._id).join(",")}`);
await callback(e.change.docs);
Logger(`replicated ${e.change.docs_read} doc(s)`);
this.docArrived += e.change.docs.length;
} else {
// console.log(`put data:${e.change.docs.map((e) => e._id).join(",")}`);
this.docSent += e.change.docs.length;
}
if (notice != null) {
notice.setMessage(`${this.docSent - docSentOnStart}${this.docArrived - docArrivedOnStart}`);
}
this.updateInfo();
} catch (ex) {
Logger("Replication callback error");
Logger(ex);
}
})
.on("complete", (e) => {
this.syncStatus = "COMPLETED";
this.updateInfo();
Logger("Replication completed", showResult ? LOG_LEVEL.NOTICE : LOG_LEVEL.INFO);
if (notice != null) notice.hide();
if (!keepAlive) {
this.syncHandler = this.cancelHandler(this.syncHandler);
// if keep alive runnning, resolve here,
}
})
.on("denied", (e) => {
this.syncStatus = "ERRORED";
this.updateInfo();
this.syncHandler = this.cancelHandler(this.syncHandler);
if (notice != null) notice.hide();
Logger("Replication denied", LOG_LEVEL.NOTICE);
Logger(e);
})
.on("error", (e) => {
this.syncStatus = "ERRORED";
this.syncHandler = this.cancelHandler(this.syncHandler);
this.updateInfo();
if (notice != null) notice.hide();
Logger("Replication error", LOG_LEVEL.NOTICE);
Logger(e);
})
.on("paused", (e) => {
this.syncStatus = "PAUSED";
this.updateInfo();
if (notice != null) notice.hide();
Logger("replication paused", LOG_LEVEL.VERBOSE);
if (keepAlive && !resolved) {
// if keep alive runnning, resolve here,
resolved = true;
}
// Logger(e);
});
return this.syncHandler;
};
if (!keepAlive) {
await _openReplicationSync();
return true;
}
this.syncHandler = this.cancelHandler(this.syncHandler);
Logger("Pull before replicate.");
Logger(await this.localDatabase.info(), LOG_LEVEL.VERBOSE);
Logger(await db.info(), LOG_LEVEL.VERBOSE);
let replicate: PouchDB.Replication.Replication<EntryDoc>;
try {
replicate = this.localDatabase.replicate.from(db, syncOptionBase);
replicate
.on("active", () => {
this.syncStatus = "CONNECTED";
@@ -979,36 +981,24 @@ export class LocalPouchDB {
Logger("Replication callback error");
Logger(ex);
}
})
.on("complete", async (info) => {
this.syncStatus = "COMPLETED";
this.updateInfo();
this.cancelHandler(replicate);
this.syncHandler = this.cancelHandler(this.syncHandler);
Logger("Replication pull completed.");
await _openReplicationSync();
})
.on("denied", (e) => {
this.syncStatus = "ERRORED";
this.updateInfo();
if (notice != null) notice.hide();
Logger("Pulling Replication denied", LOG_LEVEL.NOTICE);
this.cancelHandler(replicate);
this.syncHandler = this.cancelHandler(this.syncHandler);
rej(e);
})
.on("error", (e) => {
this.syncStatus = "ERRORED";
this.updateInfo();
Logger("Pulling Replication error", LOG_LEVEL.INFO);
this.cancelHandler(replicate);
this.syncHandler = this.cancelHandler(this.syncHandler);
if (notice != null) notice.hide();
// debugger;
Logger(e);
rej(e);
});
});
this.syncStatus = "COMPLETED";
this.updateInfo();
this.cancelHandler(replicate);
this.syncHandler = this.cancelHandler(this.syncHandler);
Logger("Replication pull completed.");
_openReplicationSync();
return true;
} catch (ex) {
this.syncStatus = "ERRORED";
this.updateInfo();
Logger("Pulling Replication error", LOG_LEVEL.NOTICE);
this.cancelHandler(replicate);
this.syncHandler = this.cancelHandler(this.syncHandler);
if (notice != null) notice.hide();
// debugger;
throw ex;
}
}
closeReplication() {

View File

@@ -1,4 +1,4 @@
import { App, Notice, PluginSettingTab, Setting } from "obsidian";
import { App, Notice, PluginSettingTab, Setting, sanitizeHTMLToDom } from "obsidian";
import { EntryDoc, LOG_LEVEL } from "./types";
import { escapeStringToHTML, versionNumberString2Number, path2id, id2path, runWithLock } from "./utils";
import { Logger } from "./logger";
@@ -82,6 +82,8 @@ export class ObsidianLiveSyncSettingTab extends PluginSettingTab {
if (this.plugin.settings.syncOnFileOpen) return true;
if (this.plugin.settings.syncOnSave) return true;
if (this.plugin.settings.syncOnStart) return true;
if (this.plugin.localDatabase.syncStatus == "CONNECTED") return true;
if (this.plugin.localDatabase.syncStatus == "PAUSED") return true;
return false;
};
const applyDisplayEnabled = () => {
@@ -304,6 +306,44 @@ export class ObsidianLiveSyncSettingTab extends PluginSettingTab {
})
);
containerLocalDatabaseEl.createEl("div", {
text: sanitizeHTMLToDom(`Advanced settings<br>
Configuration of how LiveSync makes chunks from the file.`),
});
new Setting(containerLocalDatabaseEl)
.setName("Minimum chunk size")
.setDesc("(letters), minimum chunk size.")
.addText((text) => {
text.setPlaceholder("")
.setValue(this.plugin.settings.minimumChunkSize + "")
.onChange(async (value) => {
let v = Number(value);
if (isNaN(v) || v < 10 || v > 1000) {
v = 10;
}
this.plugin.settings.minimumChunkSize = v;
await this.plugin.saveSettings();
});
text.inputEl.setAttribute("type", "number");
});
new Setting(containerLocalDatabaseEl)
.setName("LongLine Threshold")
.setDesc("(letters), If the line is longer than this, make the line to chunk")
.addText((text) => {
text.setPlaceholder("")
.setValue(this.plugin.settings.longLineThreshold + "")
.onChange(async (value) => {
let v = Number(value);
if (isNaN(v) || v < 10 || v > 1000) {
v = 10;
}
this.plugin.settings.longLineThreshold = v;
await this.plugin.saveSettings();
});
text.inputEl.setAttribute("type", "number");
});
addScreenElement("10", containerLocalDatabaseEl);
const containerGeneralSettingsEl = containerEl.createDiv();
containerGeneralSettingsEl.createEl("h3", { text: "General Settings" });
@@ -458,35 +498,40 @@ export class ObsidianLiveSyncSettingTab extends PluginSettingTab {
await this.plugin.saveSettings();
})
);
containerSyncSettingEl.createEl("div", {
text: sanitizeHTMLToDom(`Advanced settings<br>
If you reached the payload size limit when using IBM Cloudant, please set batch size and batch limit to a lower value.`),
});
new Setting(containerSyncSettingEl)
.setName("Minimum chunk size")
.setDesc("(letters), minimum chunk size.")
.setName("Batch size")
.setDesc("Number of change feed items to process at a time. Defaults to 250.")
.addText((text) => {
text.setPlaceholder("")
.setValue(this.plugin.settings.minimumChunkSize + "")
.setValue(this.plugin.settings.batch_size + "")
.onChange(async (value) => {
let v = Number(value);
if (isNaN(v) || v < 10 || v > 1000) {
if (isNaN(v) || v < 10) {
v = 10;
}
this.plugin.settings.minimumChunkSize = v;
this.plugin.settings.batch_size = v;
await this.plugin.saveSettings();
});
text.inputEl.setAttribute("type", "number");
});
new Setting(containerSyncSettingEl)
.setName("LongLine Threshold")
.setDesc("(letters), If the line is longer than this, make the line to chunk")
.setName("Batch limit")
.setDesc("Number of batches to process at a time. Defaults to 40. This along with batch size controls how many docs are kept in memory at a time.")
.addText((text) => {
text.setPlaceholder("")
.setValue(this.plugin.settings.longLineThreshold + "")
.setValue(this.plugin.settings.batches_limit + "")
.onChange(async (value) => {
let v = Number(value);
if (isNaN(v) || v < 10 || v > 1000) {
if (isNaN(v) || v < 10) {
v = 10;
}
this.plugin.settings.longLineThreshold = v;
this.plugin.settings.batches_limit = v;
await this.plugin.saveSettings();
});
text.inputEl.setAttribute("type", "number");
@@ -544,10 +589,10 @@ export class ObsidianLiveSyncSettingTab extends PluginSettingTab {
Logger("Synchronization setting configured as LiveSync.", LOG_LEVEL.NOTICE);
} else if (currentPrest == "PERIODIC") {
this.plugin.settings.batchSave = true;
this.plugin.settings.periodicReplication = false;
this.plugin.settings.periodicReplication = true;
this.plugin.settings.syncOnSave = false;
this.plugin.settings.syncOnStart = false;
this.plugin.settings.syncOnFileOpen = false;
this.plugin.settings.syncOnStart = true;
this.plugin.settings.syncOnFileOpen = true;
Logger("Synchronization setting configured as Periodic sync with batch database update.", LOG_LEVEL.NOTICE);
} else {
Logger("All synchronization disabled.", LOG_LEVEL.NOTICE);

View File

@@ -56,6 +56,8 @@ export interface ObsidianLiveSyncSettings {
autoSweepPluginsPeriodic: boolean;
notifyPluginOrSettingUpdated: boolean;
checkIntegrityOnSave: boolean;
batch_size: number;
batches_limit: number;
}
export const DEFAULT_SETTINGS: ObsidianLiveSyncSettings = {
@@ -94,6 +96,8 @@ export const DEFAULT_SETTINGS: ObsidianLiveSyncSettings = {
autoSweepPluginsPeriodic: false,
notifyPluginOrSettingUpdated: false,
checkIntegrityOnSave: false,
batch_size: 250,
batches_limit: 40,
};
export const PERIODIC_PLUGIN_SWEEP = 60;