Fixed:
- Unexpected massively parallel file checking in the boot sequence has been solved.
- Batched file changes no longer miss any changes.
- Changes caused by the plugin itself are now ignored.
- Garbage collection is completely disabled.
- Fixed an issue where initial replication sometimes failed after dropping the local DB.
Improved:
- Messages are a bit more understandable.
- Files are saved as big chunks on the initial scan.
- "Use history" is now always enabled.
- The boot sequence got faster.
vorotamoroz
2022-06-30 17:46:42 +09:00
parent 124a49b80f
commit 89de551fd7
6 changed files with 553 additions and 648 deletions

docs/settings.md

@@ -2,6 +2,20 @@ NOTE: This document surely became outdated. I'll improve this doc in a while. bu
# Settings of this plugin
The settings dialog has become quite long, so I have split the configuration into tabs.
If you notice anything, please feel free to let me know.
| icon | description |
| :---: | ----------------------------------------------------------------- |
| 🛰️ | [Remote Database Configurations](#remote-database-configurations) |
| 📦 | [Local Database Configurations](#local-database-configurations) |
| ⚙️ | [General Settings](#general-settings) |
| 🔁 | [Sync setting](#sync-setting) |
| 🔧 | [Miscellaneous](#miscellaneous) |
| 🧰 | [Hatch](#hatch) |
| 🔌 | [Plugin and its settings](#plugin-and-its-settings) |
| 🚑 | [Corrupted data](#corrupted-data) |
## Remote Database Configurations
Configure the settings for the synchronization server. While any synchronization is enabled, this section cannot be edited. Please disable all synchronization before changing it.
@@ -21,12 +35,6 @@ The Database name to synchronize.
If it does not exist, it will be created automatically.
### Use the old connecting method
This option was removed in v0.10.0.
### End to End Encryption
Encrypt your database. This affects only the database; your files are left as plain text.
@@ -38,26 +46,16 @@ Note: If you want to use "Plugins and their settings", you have to enable this.
The passphrase used as the encryption key. Please use a long text.
### Apply
To enable End-to-End encryption, there must be no items with the same content encrypted under different passphrases; otherwise, attackers could guess passphrases. Since Self-hosted LiveSync identifies chunks by the crc32 of their contents, this is really a must.
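As a rough illustration (a minimal sketch, not the plugin's actual hashing code; `chunkId` is a hypothetical helper), content-addressed IDs are why identical content must never be visible under two different passphrases:

```typescript
// Minimal sketch: chunks are addressed by a checksum of their content,
// so identical content always maps to the same ID and is deduplicated.
function crc32(str: string): number {
    let crc = ~0;
    for (let i = 0; i < str.length; i++) {
        crc ^= str.charCodeAt(i) & 0xff;
        for (let j = 0; j < 8; j++) {
            crc = (crc >>> 1) ^ (0xedb88320 & -(crc & 1));
        }
    }
    return ~crc >>> 0;
}

// Hypothetical helper; the "h:" prefix matches the chunk IDs seen in the code below.
function chunkId(content: string): string {
    return "h:" + crc32(content).toString(16);
}
```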
Enable End-to-End encryption and set its passphrase for use in replication.
If you change the passphrase of an existing database, overwriting the remote database is strongly recommended.
So, this plugin completely deletes everything from both local and remote databases before enabling it, and then synchronizes again.
To enable it, use "Apply and send" from the most powerful device.
If you want to synchronize with an existing database, set the passphrase and press "Just apply".
### Overwrite by local DB
Overwrite the remote database with the local database, using the passphrase you applied.
- Apply and send
1. Initialize the Local Database, set (or clear) the passphrase, and put all files into the database again.
2. Initialize the Remote Database.
3. Lock the Remote Database.
4. Send it all.
This process is quite heavy. Using a PC or Mac is preferred.
- Apply and receive
1. Initialize the Local Database and set (or clear) the passphrase.
2. Unlock the Remote Database.
3. Retrieve all and decrypt to file.
While these operations are running, all synchronization settings are disabled.
### Rebuild
Rebuild remote and local databases with local files. It will delete all document history and retained chunks, and shrink the database.
### Test Database connection
You can check the connection by clicking this button.
@@ -65,6 +63,9 @@ You can check the connection by clicking this button.
### Check database configuration
You can check and modify your CouchDB's configuration from here directly.
### Lock remote database
Other devices are locked out of the database while it is locked.
If something has gone wrong with another device, you can protect the vault and the remote database from your device.
## Local Database Configurations
"Local Database" is created inside your obsidian.
@@ -73,27 +74,17 @@ You can check and modify your CouchDB's configuration from here directly.
Delay the database update until replication is triggered, another file is opened, the window visibility changes, or a file event other than modification occurs (roughly the queue-and-flush pattern sketched below).
This option cannot be used together with LiveSync.
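A minimal sketch of that behavior, assuming hypothetical names (the plugin's real queue is `batchFileChange` in `main.ts`, shown further below):

```typescript
// Sketch only: modifications accumulate, and a separate trigger flushes them.
const pendingPaths = new Set<string>();

// Called on every file-modification event.
function queueChange(path: string) {
    pendingPaths.add(path);
}

// Called on replication, file open, visibility change, and other file events.
async function flush(saveToDB: (path: string) => Promise<void>) {
    const paths = [...pendingPaths];
    pendingPaths.clear();
    for (const path of paths) {
        await saveToDB(path); // write the deferred updates into the local database
    }
}
```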
### Auto Garbage Collection delay
When a note has been modified, Self-hosted LiveSync splits the note into chunks by parsing the markdown structure, and saves only the file information and the modified chunks into the Local Database. Old chunks are not deleted at this point.
So, Self-hosted LiveSync has to delete old chunks at some point.
### Garbage check
This plugin saves each file by splitting it into chunks, to speed up replication and keep bandwidth low.
However, a chunk is identified by the crc32 of its contents and shared between all notes. In this way, Self-hosted LiveSync deduplicates entries and keeps bandwidth and transfer amounts low.
Notes share a chunk if you use the same paragraph in several of them. And if you change a file, only the paragraph you changed is transferred, together with the file's metadata. I also know that editing notes is not so straightforward: sometimes a paragraph goes back to an old phrasing. In these cases, we do not have to transfer the chunk again, as long as the chunk has not been deleted. So all chunks are reused.
In addition, when we edit notes, we sometimes return to a previous expression. So a chunk cannot be considered unnecessary immediately.
As a side effect of this, you can see the history of a file.
Therefore, the plugin deletes unused chunks at once when you leave Obsidian for a while (after the number of seconds set here).
The check shows the number of used and retained chunks. If there are many retained chunks, you can rebuild the database.
This process is called "Garbage Collection".
As a result, Obsidian's behavior is temporarily slowed down.
The default is 300 seconds.
If you are an early adopter, this value may still be set to 30 seconds. Please change it to a larger value.
Note: If you want to use "Use history", this value must be set to 0.
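The new "Garbage check" boils down to reference counting. A condensed sketch of the idea (the real `garbageCheck` in `LocalPouchDB`, shown in the diff below, walks PouchDB's change feed instead of taking an array):

```typescript
// Sketch only: count chunks that are still referenced ("used") versus
// chunks that no document points to anymore ("retained").
interface DocLike { id: string; deleted?: boolean; children?: string[]; }

function countChunks(docs: DocLike[]): { used: number; retained: number } {
    const refs = new Map<string, Set<string>>();
    for (const doc of docs) {
        if (doc.id.startsWith("h:")) {
            // A chunk: ensure it has an entry, even with no referrers yet.
            if (!refs.has(doc.id)) refs.set(doc.id, new Set());
        } else if (doc.children) {
            for (const chunk of doc.children) {
                const c = refs.get(chunk) ?? new Set<string>();
                if (doc.deleted) c.delete(doc.id); else c.add(doc.id);
                refs.set(chunk, c);
            }
        }
    }
    let used = 0, retained = 0;
    for (const [, referrers] of refs) {
        if (referrers.size > 0) used++; else retained++;
    }
    return { used, retained };
}
```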
### Manual Garbage Collect
Run "Garbage Collection" manually.
### Fetch rebuilt DB.
If one device rebuilds or locks the remote database, every other device will be locked out of the remote database until it fetches the rebuilt DB.
### minimum chunk size and LongLine threshold
The configuration of chunk splitting.
@@ -212,28 +203,6 @@ You can set synchronization method at once as these pattern:
- Sync on File Open : disabled
- Sync on Start : disabled
### Use history
If you enable this option, you can keep document histories in your database.
(Not all intermediate changes are synchronized.)
You can check the changes caused by your edits and/or by replication.
### Enable plugin synchronization
If you want to use this feature, you have to activate it with this switch.
### Sweep plugins automatically
Plugin sweep will run automatically before replication.
### Sweep plugins periodically
Plugin sweep will run every minute.
### Notify updates
When replication is complete, a message will be shown if a newer version of a plugin applied to this device is configured on another device.
### Device and Vault name
To save the plugins, you have to set a unique name for each device.
### Open
Open the "Plugins and their settings" dialog.
## Hatch
From here, everything is under the hood. Please handle it with care.
@@ -280,11 +249,6 @@ Same as a setting passphrase, database locking is also performed.
3. Retrieve all and decrypt to file.
### Lock remote database
Lock the remote database to shut other devices out of synchronization. This is the same database lock that happens when dropping databases or applying passphrases.
Use it as an emergency measure to protect the local or remote database when synchronization has broken.
### Suspend file watching
If this option is enabled, Self-hosted LiveSync dismisses every file change or delete event.
@@ -301,9 +265,27 @@ Discard the data stored in the local database.
### Initialize local database again
Discard the data stored in the local database, then initialize it and recreate the database from the files on storage.
### Corrupted data
## Plugins and settings (beta)
### Enable plugin synchronization
If you want to use this feature, you have to activate it with this switch.
### Sweep plugins automatically
Plugin sweep will run automatically before replication.
### Sweep plugins periodically
Plugin sweep will run every minute.
### Notify updates
When replication is complete, a message will be shown if a newer version of a plugin applied to this device is configured on another device.
### Device and Vault name
To save the plugins, you have to set a unique name for each device.
### Open
Open the "Plugins and their settings" dialog.
### Corrupted or missing data
![CorruptedData](../images/corrupted_data.png)
When Self-hosted LiveSync cannot write a file to storage, the file is shown here. If you still have the old data in your vault, change the file once and it will be cured; or you can use the "File History" plugin.
But if you don't, sorry, we cannot rescue the file. Error messages will be shown frequently, and you will have to delete the file from here.

src/KeyValueDB.ts

@@ -9,7 +9,7 @@ export interface KeyValueDatabase {
destroy(): void;
}
const databaseCache: { [key: string]: IDBPDatabase<any> } = {};
export const OpenKeyValueDatabase = (dbKey: string): KeyValueDatabase => {
export const OpenKeyValueDatabase = async (dbKey: string): Promise<KeyValueDatabase> => {
if (dbKey in databaseCache) {
databaseCache[dbKey].close();
delete databaseCache[dbKey];
@@ -20,30 +20,32 @@ export const OpenKeyValueDatabase = (dbKey: string): KeyValueDatabase => {
db.createObjectStore(storeKey);
},
});
~(async () => (databaseCache[dbKey] = await dbPromise))();
let db: IDBPDatabase<any> = null;
db = await dbPromise;
databaseCache[dbKey] = db;
return {
async get<T>(key: string): Promise<T> {
return (await dbPromise).get(storeKey, key);
get<T>(key: string): Promise<T> {
return db.get(storeKey, key);
},
async set<T>(key: string, value: T) {
return (await dbPromise).put(storeKey, value, key);
set<T>(key: string, value: T) {
return db.put(storeKey, value, key);
},
async del(key: string) {
return (await dbPromise).delete(storeKey, key);
del(key: string) {
return db.delete(storeKey, key);
},
async clear() {
return (await dbPromise).clear(storeKey);
clear() {
return db.clear(storeKey);
},
async keys(query?: IDBValidKey | IDBKeyRange, count?: number) {
return (await dbPromise).getAllKeys(storeKey, query, count);
keys(query?: IDBValidKey | IDBKeyRange, count?: number) {
return db.getAllKeys(storeKey, query, count);
},
async close() {
close() {
delete databaseCache[dbKey];
return (await dbPromise).close();
return db.close();
},
async destroy() {
delete databaseCache[dbKey];
(await dbPromise).close();
db.close();
await deleteDB(dbKey);
},
};
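A minimal usage sketch of the now-async factory (the database key here is hypothetical; error handling omitted):

```typescript
import { OpenKeyValueDatabase } from "./KeyValueDB"; // path as used in this repository

// The factory now resolves the IDBPDatabase once up front, so every method
// call uses the already-open handle instead of awaiting dbPromise each time.
async function demo() {
    const kv = await OpenKeyValueDatabase("example-livesync-kv"); // hypothetical key
    await kv.set("greeting", "hello");
    console.log(await kv.get<string>("greeting")); // "hello"
    kv.close();
}
```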

src/LocalPouchDB.ts

@@ -19,59 +19,26 @@ import {
VER,
MILSTONE_DOCID,
DatabaseConnectingStatus,
ObsidianLiveSyncSettings,
ChunkVersionRange,
} from "./lib/src/types";
import { RemoteDBSettings } from "./lib/src/types";
import { resolveWithIgnoreKnownError, delay, runWithLock, NewNotice, WrappedNotice, shouldSplitAsPlainText, splitPieces2, enableEncryption } from "./lib/src/utils";
import { resolveWithIgnoreKnownError, runWithLock, shouldSplitAsPlainText, splitPieces2, enableEncryption } from "./lib/src/utils";
import { path2id } from "./utils";
import { Logger } from "./lib/src/logger";
import { checkRemoteVersion, connectRemoteCouchDBWithSetting, getLastPostFailedBySize } from "./utils_couchdb";
import { openDB, deleteDB, IDBPDatabase } from "idb";
import { KeyValueDatabase, OpenKeyValueDatabase } from "./KeyValueDB";
import { LRUCache } from "./lib/src/LRUCache";
// When replicating, LiveSync checks the chunk versions that every node has used.
// If the minimum versions of all devices are high enough, we can convert the database automatically.
const currentVersionRange: ChunkVersionRange = {
min: 0,
max: 1,
current: 1,
}
type ReplicationCallback = (e: PouchDB.Core.ExistingDocument<EntryDoc>[]) => Promise<void>;
class LRUCache {
cache = new Map<string, string>([]);
revCache = new Map<string, string>([]);
maxCache = 100;
constructor() { }
get(key: string) {
// debugger
const v = this.cache.get(key);
if (v) {
// update the key to recently used.
this.cache.delete(key);
this.revCache.delete(v);
this.cache.set(key, v);
this.revCache.set(v, key);
}
return v;
}
revGet(value: string) {
// debugger
const key = this.revCache.get(value);
if (value) {
// update the key to recently used.
this.cache.delete(key);
this.revCache.delete(value);
this.cache.set(key, value);
this.revCache.set(value, key);
}
return key;
}
set(key: string, value: string) {
this.cache.set(key, value);
this.revCache.set(value, key);
if (this.cache.size > this.maxCache) {
for (const kv of this.cache) {
this.revCache.delete(kv[1]);
this.cache.delete(kv[0]);
if (this.cache.size <= this.maxCache) break;
}
}
}
}
export class LocalPouchDB {
auth: Credential;
dbname: string;
@@ -81,16 +48,9 @@ export class LocalPouchDB {
nodeid = "";
isReady = false;
recentModifiedDocs: string[] = [];
h32: (input: string, seed?: number) => string;
h64: (input: string, seedHigh?: number, seedLow?: number) => string;
h32Raw: (input: Uint8Array, seed?: number) => number;
// hashCache: {
// [key: string]: string;
// } = {};
// hashCacheRev: {
// [key: string]: string;
// } = {};
hashCaches = new LRUCache();
corruptedEntries: { [key: string]: EntryDoc } = {};
@@ -109,6 +69,8 @@ export class LocalPouchDB {
isMobile = false;
chunkVersion = 0;
cancelHandler<T extends PouchDB.Core.Changes<EntryDoc> | PouchDB.Replication.Sync<EntryDoc> | PouchDB.Replication.Replication<EntryDoc>>(handler: T): T {
if (handler != null) {
handler.removeAllListeners();
@@ -119,7 +81,6 @@ export class LocalPouchDB {
}
onunload() {
this.kvDB.close();
this.recentModifiedDocs = [];
this.leafArrivedCallbacks;
this.changeHandler = this.cancelHandler(this.changeHandler);
this.syncHandler = this.cancelHandler(this.syncHandler);
@@ -172,7 +133,7 @@ export class LocalPouchDB {
revs_limit: 100,
deterministic_revs: true,
});
this.kvDB = OpenKeyValueDatabase(this.dbname + "-livesync-kv");
this.kvDB = await OpenKeyValueDatabase(this.dbname + "-livesync-kv");
Logger("Database info", LOG_LEVEL.VERBOSE);
Logger(await this.localDatabase.info(), LOG_LEVEL.VERBOSE);
Logger("Open Database...");
@@ -227,9 +188,8 @@ export class LocalPouchDB {
return nextSeq();
}
//
const progress = NewNotice("Converting..", 0);
Logger("We have to upgrade database..", LOG_LEVEL.NOTICE, "conv");
try {
Logger("We have to upgrade database..", LOG_LEVEL.NOTICE);
// To debug , uncomment below.
@@ -249,14 +209,12 @@ export class LocalPouchDB {
}
const rep = old.replicate.to(this.localDatabase, { batch_size: 25, batches_limit: 10 });
rep.on("change", (e) => {
progress.setMessage(`Converting ${e.docs_written} docs...`);
Logger(`Converting ${e.docs_written} docs...`, LOG_LEVEL.VERBOSE);
Logger(`Converting ${e.docs_written} docs...`, LOG_LEVEL.NOTICE, "conv");
});
const w = await rep;
progress.hide();
if (w.ok) {
Logger("Conversion completed!", LOG_LEVEL.NOTICE);
Logger("Conversion completed!", LOG_LEVEL.NOTICE, "conv");
old.destroy(); // delete the old database.
this.isReady = true;
return await nextSeq();
@@ -264,8 +222,7 @@ export class LocalPouchDB {
throw new Error("Conversion failed!");
}
} catch (ex) {
progress.hide();
Logger("Conversion failed!, If you are fully synchronized, please drop the old database in the Hatch pane in setting dialog. or please make an issue on Github.", LOG_LEVEL.NOTICE);
Logger("Conversion failed!, If you are fully synchronized, please drop the old database in the Hatch pane in setting dialog. or please make an issue on Github.", LOG_LEVEL.NOTICE, "conv");
Logger(ex);
this.isReady = false;
return false;
@@ -309,7 +266,6 @@ export class LocalPouchDB {
}
async getDBLeaf(id: string, waitForReady: boolean): Promise<string> {
await this.waitForGCComplete();
// when in cache, use that.
const leaf = this.hashCaches.revGet(id);
if (leaf) {
@@ -342,7 +298,6 @@ export class LocalPouchDB {
}
async getDBEntryMeta(path: string, opt?: PouchDB.Core.GetOptions): Promise<false | LoadedEntry> {
await this.waitForGCComplete();
const id = path2id(path);
try {
let obj: EntryDocResponse = null;
@@ -387,7 +342,6 @@ export class LocalPouchDB {
return false;
}
async getDBEntry(path: string, opt?: PouchDB.Core.GetOptions, dump = false, waitForReady = true): Promise<false | LoadedEntry> {
await this.waitForGCComplete();
const id = path2id(path);
try {
let obj: EntryDocResponse = null;
@@ -487,7 +441,6 @@ export class LocalPouchDB {
return false;
}
async deleteDBEntry(path: string, opt?: PouchDB.Core.GetOptions): Promise<boolean> {
await this.waitForGCComplete();
const id = path2id(path);
try {
@@ -534,7 +487,6 @@ export class LocalPouchDB {
}
}
async deleteDBEntryPrefix(prefixSrc: string): Promise<boolean> {
await this.waitForGCComplete();
// delete database entries by prefix.
// it called from folder deletion.
let c = 0;
@@ -585,8 +537,7 @@ export class LocalPouchDB {
Logger(`deleteDBEntryPrefix:deleted ${deleteCount} items, skipped ${notfound}`);
return true;
}
async putDBEntry(note: LoadedEntry) {
await this.waitForGCComplete();
async putDBEntry(note: LoadedEntry, saveAsBigChunk?: boolean) {
// let leftData = note.data;
const savenNotes = [];
let processed = 0;
@@ -596,7 +547,7 @@ export class LocalPouchDB {
let plainSplit = false;
let cacheUsed = 0;
const userpasswordHash = this.h32Raw(new TextEncoder().encode(this.settings.passphrase));
if (shouldSplitAsPlainText(note._id)) {
if (!saveAsBigChunk && shouldSplitAsPlainText(note._id)) {
pieceSize = MAX_DOC_SIZE;
plainSplit = true;
}
@@ -702,9 +653,6 @@ export class LocalPouchDB {
}
}
}
if (saved) {
Logger(`Chunk saved:${newLeafs.length} chunks`);
}
} catch (ex) {
Logger("Chunk save failed:", LOG_LEVEL.NOTICE);
Logger(ex, LOG_LEVEL.NOTICE);
@@ -712,7 +660,7 @@ export class LocalPouchDB {
}
}
if (saved) {
Logger(`note content saven, pieces:${processed} new:${made}, skip:${skiped}, cache:${cacheUsed}`);
Logger(`Content saved:${note._id} ,pieces:${processed} (new:${made}, skip:${skiped}, cache:${cacheUsed})`);
const newDoc: PlainEntry | NewEntry = {
NewNote: true,
children: savenNotes,
@@ -766,8 +714,7 @@ export class LocalPouchDB {
return true;
}
replicateAllToServer(setting: RemoteDBSettings, showingNotice?: boolean) {
return new Promise(async (res, rej) => {
await this.waitForGCComplete();
return new Promise((res, rej) => {
this.openOneshotReplication(
setting,
showingNotice,
@@ -777,8 +724,7 @@ export class LocalPouchDB {
if (e === true) res(e);
rej(e);
},
true,
false
"pushOnly"
);
});
}
@@ -789,9 +735,8 @@ export class LocalPouchDB {
return false;
}
await this.waitForGCComplete();
if (setting.versionUpFlash != "") {
NewNotice("Open settings and check message, please.");
Logger("Open settings and check message, please.", LOG_LEVEL.NOTICE);
return false;
}
const uri = setting.couchDB_URI + (setting.couchDB_DBNAME == "" ? "" : "/" + setting.couchDB_DBNAME);
@@ -818,19 +763,38 @@ export class LocalPouchDB {
created: (new Date() as any) / 1,
locked: false,
accepted_nodes: [this.nodeid],
node_chunk_info: { [this.nodeid]: currentVersionRange }
};
const remoteMilestone: EntryMilestoneInfo = await resolveWithIgnoreKnownError(dbret.db.get(MILSTONE_DOCID), defMilestonePoint);
const remoteMilestone: EntryMilestoneInfo = { ...defMilestonePoint, ...(await resolveWithIgnoreKnownError(dbret.db.get(MILSTONE_DOCID), defMilestonePoint)) };
remoteMilestone.node_chunk_info = { ...defMilestonePoint.node_chunk_info, ...remoteMilestone.node_chunk_info };
this.remoteLocked = remoteMilestone.locked;
this.remoteLockedAndDeviceNotAccepted = remoteMilestone.locked && remoteMilestone.accepted_nodes.indexOf(this.nodeid) == -1;
const writeMilestone = ((remoteMilestone.node_chunk_info[this.nodeid].min != currentVersionRange.min || remoteMilestone.node_chunk_info[this.nodeid].max != currentVersionRange.max)
|| typeof remoteMilestone._rev == "undefined");
if (writeMilestone) {
await dbret.db.put(remoteMilestone);
}
let globalMin = currentVersionRange.min;
let globalMax = currentVersionRange.max;
for (const nodeid of remoteMilestone.accepted_nodes) {
if (nodeid in remoteMilestone.node_chunk_info) {
const nodeinfo = remoteMilestone.node_chunk_info[nodeid];
globalMin = Math.max(nodeinfo.min, globalMin);
globalMax = Math.min(nodeinfo.max, globalMax);
} else {
globalMin = 0;
globalMax = 0;
}
}
// If globalMin and globalMax are suitable, we can upgrade.
if (remoteMilestone.locked && remoteMilestone.accepted_nodes.indexOf(this.nodeid) == -1) {
Logger("Remote database marked as 'Auto Sync Locked'. And this devide does not marked as resolved device. see settings dialog.", LOG_LEVEL.NOTICE);
Logger("The remote database has been rebuilt or corrupted since we have synchronized last time. Fetch rebuilt DB or explicit unlocking is required. See the settings dialog.", LOG_LEVEL.NOTICE);
return false;
}
if (typeof remoteMilestone._rev == "undefined") {
await dbret.db.put(remoteMilestone);
}
}
const syncOptionBase: PouchDB.Replication.SyncOptions = {
batches_limit: setting.batches_limit,
@@ -845,59 +809,54 @@ export class LocalPouchDB {
if (keepAlive) {
this.openContinuousReplication(setting, showResult, callback, false);
} else {
this.openOneshotReplication(setting, showResult, callback, false, null, false, false);
this.openOneshotReplication(setting, showResult, callback, false, null, "sync");
}
}
replicationActivated(notice: WrappedNotice) {
replicationActivated(showResult: boolean) {
this.syncStatus = "CONNECTED";
this.updateInfo();
Logger("Replication activated");
if (notice != null) notice.setMessage(`Activated..`);
Logger("Replication activated", showResult ? LOG_LEVEL.NOTICE : LOG_LEVEL.INFO, "sync");
}
async replicationChangeDetected(e: PouchDB.Replication.SyncResult<EntryDoc>, notice: WrappedNotice, docSentOnStart: number, docArrivedOnStart: number, callback: ReplicationCallback) {
async replicationChangeDetected(e: PouchDB.Replication.SyncResult<EntryDoc>, showResult: boolean, docSentOnStart: number, docArrivedOnStart: number, callback: ReplicationCallback) {
try {
if (e.direction == "pull") {
await callback(e.change.docs);
Logger(`replicated ${e.change.docs_read} doc(s)`);
this.docArrived += e.change.docs.length;
} else {
this.docSent += e.change.docs.length;
}
if (notice != null) {
notice.setMessage(`${this.docSent - docSentOnStart}${this.docArrived - docArrivedOnStart}`);
if (showResult) {
Logger(`${this.docSent - docSentOnStart}${this.docArrived - docArrivedOnStart}`, LOG_LEVEL.NOTICE, "sync");
}
this.updateInfo();
} catch (ex) {
Logger("Replication callback error", LOG_LEVEL.NOTICE);
Logger("Replication callback error", LOG_LEVEL.NOTICE, "sync");
Logger(ex, LOG_LEVEL.NOTICE);
//
}
}
replicationCompleted(notice: WrappedNotice, showResult: boolean) {
replicationCompleted(showResult: boolean) {
this.syncStatus = "COMPLETED";
this.updateInfo();
Logger("Replication completed", showResult ? LOG_LEVEL.NOTICE : LOG_LEVEL.INFO);
if (notice != null) notice.hide();
Logger("Replication completed", showResult ? LOG_LEVEL.NOTICE : LOG_LEVEL.INFO, showResult ? "sync" : "");
this.syncHandler = this.cancelHandler(this.syncHandler);
}
replicationDeniend(notice: WrappedNotice, e: any) {
replicationDeniend(e: any) {
this.syncStatus = "ERRORED";
this.updateInfo();
this.syncHandler = this.cancelHandler(this.syncHandler);
if (notice != null) notice.hide();
Logger("Replication denied", LOG_LEVEL.NOTICE);
Logger("Replication denied", LOG_LEVEL.NOTICE, "sync");
Logger(e);
}
replicationErrored(notice: WrappedNotice, e: any) {
replicationErrored(e: any) {
this.syncStatus = "ERRORED";
this.syncHandler = this.cancelHandler(this.syncHandler);
this.updateInfo();
}
replicationPaused(notice: WrappedNotice) {
replicationPaused() {
this.syncStatus = "PAUSED";
this.updateInfo();
if (notice != null) notice.hide();
Logger("replication paused", LOG_LEVEL.VERBOSE);
Logger("replication paused", LOG_LEVEL.VERBOSE, "sync");
}
async openOneshotReplication(
@@ -906,23 +865,21 @@ export class LocalPouchDB {
callback: (e: PouchDB.Core.ExistingDocument<EntryDoc>[]) => Promise<void>,
retrying: boolean,
callbackDone: (e: boolean | any) => void,
pushOnly: boolean,
pullOnly: boolean
syncmode: "sync" | "pullOnly" | "pushOnly"
): Promise<boolean> {
if (this.syncHandler != null) {
Logger("Replication is already in progress.", showResult ? LOG_LEVEL.NOTICE : LOG_LEVEL.INFO);
Logger("Replication is already in progress.", showResult ? LOG_LEVEL.NOTICE : LOG_LEVEL.INFO, "sync");
return;
}
Logger("Oneshot Sync begin...");
Logger(`Oneshot Sync begin... (${syncmode})`);
let thisCallback = callbackDone;
const ret = await this.checkReplicationConnectivity(setting, true, retrying, showResult);
let notice: WrappedNotice = null;
if (ret === false) {
Logger("Could not connect to server.", showResult ? LOG_LEVEL.NOTICE : LOG_LEVEL.INFO);
Logger("Could not connect to server.", showResult ? LOG_LEVEL.NOTICE : LOG_LEVEL.INFO, "sync");
return;
}
if (showResult) {
notice = NewNotice("Looking for the point last synchronized point.", 0);
Logger("Looking for the point last synchronized point.", LOG_LEVEL.NOTICE, "sync");
}
const { db, syncOptionBase } = ret;
this.syncStatus = "STARTED";
@@ -934,52 +891,61 @@ export class LocalPouchDB {
this.originalSetting = setting;
}
this.syncHandler = this.cancelHandler(this.syncHandler);
if (!pushOnly && !pullOnly) {
if (syncmode == "sync") {
this.syncHandler = this.localDatabase.sync(db, { checkpoint: "target", ...syncOptionBase });
this.syncHandler
.on("change", async (e) => {
await this.replicationChangeDetected(e, notice, docSentOnStart, docArrivedOnStart, callback);
await this.replicationChangeDetected(e, showResult, docSentOnStart, docArrivedOnStart, callback);
if (retrying) {
if (this.docSent - docSentOnStart + (this.docArrived - docArrivedOnStart) > this.originalSetting.batch_size * 2) {
// restore configuration.
Logger("Back into original settings once.");
if (notice != null) notice.hide();
this.syncHandler = this.cancelHandler(this.syncHandler);
this.openOneshotReplication(this.originalSetting, showResult, callback, false, callbackDone, pushOnly, pullOnly);
this.openOneshotReplication(this.originalSetting, showResult, callback, false, callbackDone, syncmode);
}
}
})
.on("complete", (e) => {
this.replicationCompleted(notice, showResult);
this.replicationCompleted(showResult);
if (thisCallback != null) {
thisCallback(true);
}
});
} else if (pullOnly) {
this.syncHandler = this.localDatabase.replicate.to(db, { checkpoint: "target", ...syncOptionBase });
} else if (syncmode == "pullOnly") {
this.syncHandler = this.localDatabase.replicate.from(db, { checkpoint: "target", ...syncOptionBase });
this.syncHandler
.on("change", async (e) => {
await this.replicationChangeDetected({ direction: "pull", change: e }, notice, docSentOnStart, docArrivedOnStart, callback);
await this.replicationChangeDetected({ direction: "pull", change: e }, showResult, docSentOnStart, docArrivedOnStart, callback);
if (retrying) {
if (this.docSent - docSentOnStart + (this.docArrived - docArrivedOnStart) > this.originalSetting.batch_size * 2) {
// restore configuration.
Logger("Back into original settings once.");
if (notice != null) notice.hide();
this.syncHandler = this.cancelHandler(this.syncHandler);
this.openOneshotReplication(this.originalSetting, showResult, callback, false, callbackDone, pushOnly, pullOnly);
this.openOneshotReplication(this.originalSetting, showResult, callback, false, callbackDone, syncmode);
}
}
})
.on("complete", (e) => {
this.replicationCompleted(notice, showResult);
this.replicationCompleted(showResult);
if (thisCallback != null) {
thisCallback(true);
}
});
} else if (pushOnly) {
} else if (syncmode == "pushOnly") {
this.syncHandler = this.localDatabase.replicate.to(db, { checkpoint: "target", ...syncOptionBase });
this.syncHandler.on("change", async (e) => {
await this.replicationChangeDetected({ direction: "push", change: e }, showResult, docSentOnStart, docArrivedOnStart, callback);
if (retrying) {
if (this.docSent - docSentOnStart + (this.docArrived - docArrivedOnStart) > this.originalSetting.batch_size * 2) {
// restore configuration.
Logger("Back into original settings once.");
this.syncHandler = this.cancelHandler(this.syncHandler);
this.openOneshotReplication(this.originalSetting, showResult, callback, false, callbackDone, syncmode);
}
}
})
this.syncHandler.on("complete", (e) => {
this.replicationCompleted(notice, showResult);
this.replicationCompleted(showResult);
if (thisCallback != null) {
thisCallback(true);
}
@@ -987,17 +953,16 @@ export class LocalPouchDB {
}
this.syncHandler
.on("active", () => this.replicationActivated(notice))
.on("active", () => this.replicationActivated(showResult))
.on("denied", (e) => {
this.replicationDeniend(notice, e);
this.replicationDeniend(e);
if (thisCallback != null) {
thisCallback(e);
}
})
.on("error", (e) => {
this.replicationErrored(notice, e);
Logger("Replication stopped.", showResult ? LOG_LEVEL.NOTICE : LOG_LEVEL.INFO);
if (notice != null) notice.hide();
this.replicationErrored(e);
Logger("Replication stopped.", showResult ? LOG_LEVEL.NOTICE : LOG_LEVEL.INFO, "sync");
if (getLastPostFailedBySize()) {
// Duplicate settings for smaller batch.
const xsetting: RemoteDBSettings = JSON.parse(JSON.stringify(setting));
@@ -1008,17 +973,19 @@ export class LocalPouchDB {
} else {
Logger(`Retry with lower batch size:${xsetting.batch_size}/${xsetting.batches_limit}`, showResult ? LOG_LEVEL.NOTICE : LOG_LEVEL.INFO);
thisCallback = null;
this.openOneshotReplication(xsetting, showResult, callback, true, callbackDone, pushOnly, pullOnly);
this.openOneshotReplication(xsetting, showResult, callback, true, callbackDone, syncmode);
}
} else {
Logger("Replication error", LOG_LEVEL.NOTICE);
Logger("Replication error", LOG_LEVEL.NOTICE, "sync");
Logger(e);
}
if (thisCallback != null) {
thisCallback(e);
}
})
.on("paused", (e) => this.replicationPaused(notice));
.on("paused", (e) => this.replicationPaused());
await this.syncHandler;
}
openContinuousReplication(setting: RemoteDBSettings, showResult: boolean, callback: (e: PouchDB.Core.ExistingDocument<EntryDoc>[]) => Promise<void>, retrying: boolean) {
@@ -1035,13 +1002,12 @@ export class LocalPouchDB {
async () => {
Logger("LiveSync begin...");
const ret = await this.checkReplicationConnectivity(setting, true, true, showResult);
let notice: WrappedNotice = null;
if (ret === false) {
Logger("Could not connect to server.", showResult ? LOG_LEVEL.NOTICE : LOG_LEVEL.INFO);
return;
}
if (showResult) {
notice = NewNotice("Looking for the point last synchronized point.", 0);
Logger("Looking for the point last synchronized point.", LOG_LEVEL.NOTICE, "sync");
}
const { db, syncOption } = ret;
this.syncStatus = "STARTED";
@@ -1063,29 +1029,27 @@ export class LocalPouchDB {
},
});
this.syncHandler
.on("active", () => this.replicationActivated(notice))
.on("active", () => this.replicationActivated(showResult))
.on("change", async (e) => {
await this.replicationChangeDetected(e, notice, docSentOnStart, docArrivedOnStart, callback);
await this.replicationChangeDetected(e, showResult, docSentOnStart, docArrivedOnStart, callback);
if (retrying) {
if (this.docSent - docSentOnStart + (this.docArrived - docArrivedOnStart) > this.originalSetting.batch_size * 2) {
// restore sync values
Logger("Back into original settings once.");
if (notice != null) notice.hide();
this.syncHandler = this.cancelHandler(this.syncHandler);
this.openContinuousReplication(this.originalSetting, showResult, callback, false);
}
}
})
.on("complete", (e) => this.replicationCompleted(notice, showResult))
.on("denied", (e) => this.replicationDeniend(notice, e))
.on("complete", (e) => this.replicationCompleted(showResult))
.on("denied", (e) => this.replicationDeniend(e))
.on("error", (e) => {
this.replicationErrored(notice, e);
Logger("Replication stopped.", LOG_LEVEL.NOTICE);
this.replicationErrored(e);
Logger("Replication stopped.", LOG_LEVEL.NOTICE, "sync");
})
.on("paused", (e) => this.replicationPaused(notice));
.on("paused", (e) => this.replicationPaused());
},
false,
true
"pullOnly"
);
}
@@ -1102,15 +1066,14 @@ export class LocalPouchDB {
const oldDB = await this.isOldDatabaseExists();
if (oldDB) {
oldDB.destroy();
NewNotice("Deleted! Please re-launch obsidian.", LOG_LEVEL.NOTICE);
Logger("Deleted! Please re-launch obsidian.", LOG_LEVEL.NOTICE);
} else {
NewNotice("Old database is not exist.", LOG_LEVEL.NOTICE);
Logger("Old database is not exist.", LOG_LEVEL.NOTICE);
}
}
async resetDatabase() {
await this.waitForGCComplete();
this.changeHandler = this.cancelHandler(this.changeHandler);
await this.closeReplication();
this.closeReplication();
Logger("Database closed for reset Database.");
this.isReady = false;
await this.localDatabase.destroy();
@@ -1120,7 +1083,7 @@ export class LocalPouchDB {
Logger("Local Database Reset", LOG_LEVEL.NOTICE);
}
async tryResetRemoteDatabase(setting: RemoteDBSettings) {
await this.closeReplication();
this.closeReplication();
const con = await connectRemoteCouchDBWithSetting(setting, this.isMobile);
if (typeof con == "string") return;
try {
@@ -1133,7 +1096,7 @@ export class LocalPouchDB {
}
}
async tryCreateRemoteDatabase(setting: RemoteDBSettings) {
await this.closeReplication();
this.closeReplication();
const con2 = await connectRemoteCouchDBWithSetting(setting, this.isMobile);
if (typeof con2 === "string") return;
@@ -1157,9 +1120,11 @@ export class LocalPouchDB {
created: (new Date() as any) / 1,
locked: locked,
accepted_nodes: [this.nodeid],
node_chunk_info: { [this.nodeid]: currentVersionRange }
};
const remoteMilestone: EntryMilestoneInfo = await resolveWithIgnoreKnownError(dbret.db.get(MILSTONE_DOCID), defInitPoint);
const remoteMilestone: EntryMilestoneInfo = { ...defInitPoint, ...await resolveWithIgnoreKnownError(dbret.db.get(MILSTONE_DOCID), defInitPoint) };
remoteMilestone.node_chunk_info = { ...defInitPoint.node_chunk_info, ...remoteMilestone.node_chunk_info };
remoteMilestone.accepted_nodes = [this.nodeid];
remoteMilestone.locked = locked;
if (locked) {
@@ -1187,22 +1152,15 @@ export class LocalPouchDB {
created: (new Date() as any) / 1,
locked: false,
accepted_nodes: [this.nodeid],
node_chunk_info: { [this.nodeid]: currentVersionRange }
};
// check local database hash status and remote replicate hash status
const remoteMilestone: EntryMilestoneInfo = await resolveWithIgnoreKnownError(dbret.db.get(MILSTONE_DOCID), defInitPoint);
// remoteMilestone.locked = false;
const remoteMilestone: EntryMilestoneInfo = { ...defInitPoint, ...await resolveWithIgnoreKnownError(dbret.db.get(MILSTONE_DOCID), defInitPoint) };
remoteMilestone.node_chunk_info = { ...defInitPoint.node_chunk_info, ...remoteMilestone.node_chunk_info };
remoteMilestone.accepted_nodes = Array.from(new Set([...remoteMilestone.accepted_nodes, this.nodeid]));
// this.remoteLocked = false;
Logger("Mark this device as 'resolved'.", LOG_LEVEL.NOTICE);
await dbret.db.put(remoteMilestone);
}
gcRunning = false;
async waitForGCComplete() {
while (this.gcRunning) {
Logger("Waiting for Garbage Collection completed.");
await delay(1000);
}
}
async sanCheck(entry: EntryDoc): Promise<boolean> {
if (entry.type == "plain" || entry.type == "newnote") {
const children = entry.children;
@@ -1222,163 +1180,59 @@ export class LocalPouchDB {
return false;
}
async garbageCollect() {
if (this.settings.useHistory) {
Logger("GC skipped for using history", LOG_LEVEL.VERBOSE);
return;
}
if ((this.settings as ObsidianLiveSyncSettings).liveSync) {
Logger("GC skipped while live sync.", LOG_LEVEL.VERBOSE);
return;
}
// NOTE:Garbage collection could break old revisions.
await runWithLock("replicate", true, async () => {
if (this.gcRunning) return;
this.gcRunning = true;
let idbGC: IDBPDatabase<{ id: string }> = null;
const storeIDB = "gc";
const idbname = "idb-" + this.dbname + "-idb-gcx";
try {
const procAllDocs = async (getLeaf: boolean, startkey: string, endkey: string, callback: (idordoc: string[]) => Promise<void>) => {
let c = 0;
let readCount = 0;
do {
const result = await this.localDatabase.allDocs({ include_docs: false, skip: c, limit: 2000, conflicts: !getLeaf, startkey: startkey, endkey: endkey });
readCount = result.rows.length;
if (readCount > 0) {
await callback(result.rows.map((e) => e.id));
garbageCheck() {
Logger(`Checking garbage`, LOG_LEVEL.NOTICE, "gc");
let docNum = 0;
const chunks = new Map<string, Set<string>>();
this.localDatabase
.changes({
since: 0,
include_docs: true,
return_docs: false,
style: "all_docs",
// selector:
})
.on("change", (e) => {
if (e.id.startsWith("h:")) {
const chunk = e.id;
let c = chunks.get(chunk);
if (c == null) c = new Set<string>();
chunks.set(chunk, c);
} else if ("children" in e.doc) {
docNum++;
if (docNum % 100 == 0) Logger(`Processing ${docNum}`, LOG_LEVEL.NOTICE, "gc");
if (!e.deleted) {
for (const chunk of e.doc.children) {
let c = chunks.get(chunk);
if (c == null) c = new Set<string>();
c.add(e.id);
chunks.set(chunk, c);
}
c += readCount;
} while (readCount != 0);
};
// Delete working indexedDB once.
await deleteDB(idbname);
idbGC = await openDB(idbname, 1, {
upgrade(db) {
db.createObjectStore(storeIDB, { keyPath: "id" });
},
});
// Mark all chunks once.
await procAllDocs(true, "h:", "h_", async (docs) => {
Logger(`Chunks marked - :${docs.length}`);
const tx = idbGC.transaction(storeIDB, "readwrite");
const store = tx.objectStore(storeIDB);
for (const docId of docs) {
await store.put({ id: docId });
}
await tx.done;
});
Logger("All chunks are marked once");
const unmarkUsedByHashId = async (doc: EntryDoc) => {
if ("children" in doc) {
const tx = idbGC.transaction(storeIDB, "readwrite");
const store = tx.objectStore(storeIDB);
for (const hashId of doc.children) {
await store.delete(hashId);
}
await tx.done;
}
};
Logger("Processing existen docs");
let procDocs = 0;
await procAllDocs(false, null, null, async (doc) => {
const docIds = (doc as string[]).filter((e) => !e.startsWith("h:") && !e.startsWith("ps:"));
for (const docId of docIds) {
procDocs++;
if (procDocs % 25 == 0) Logger(`${procDocs} Processed`);
const docT = await this.localDatabase.get(docId, { revs_info: true });
if (docT._deleted) continue;
// Unmark about latest doc.
unmarkUsedByHashId(docT);
const revs = docT._revs_info;
// Unmark old revisions
for (const rev of revs) {
if (rev.status != "available") continue;
const docRev = await this.localDatabase.get(docId, { rev: rev.rev });
unmarkUsedByHashId(docRev);
if (docRev._conflicts) {
// Unmark the conflicted chunks of old revisions.
for (const cid of docRev._conflicts) {
const docConflict = await this.localDatabase.get<EntryDoc>(docId, { rev: cid });
unmarkUsedByHashId(docConflict);
}
}
}
// Unmark the conflicted chunk.
if (docT._conflicts) {
for (const cid of docT._conflicts) {
const docConflict = await this.localDatabase.get<EntryDoc>(docId, { rev: cid });
unmarkUsedByHashId(docConflict);
}
} else {
for (const chunk of e.doc.children) {
let c = chunks.get(chunk);
if (c == null) c = new Set<string>();
c.delete(e.id);
chunks.set(chunk, c);
}
}
});
// All marked chunks could be deleted.
Logger("Delete non-used chunks");
let dataLeft = false;
let chunkKeys: string[] = [];
let totalDelCount = 0;
do {
const tx = idbGC.transaction(storeIDB, "readonly");
const store = tx.objectStore(storeIDB);
let cursor = await store.openCursor();
if (cursor == null) break;
const maxconcurrentDocs = 10;
let delChunkCount = 0;
do {
// console.log(cursor.key, cursor.value);
if (cursor) {
chunkKeys.push(cursor.key as string);
delChunkCount++;
dataLeft = true;
} else {
dataLeft = false;
}
cursor = await cursor.continue();
} while (cursor && dataLeft && delChunkCount < maxconcurrentDocs);
// if (chunkKeys.length > 0) {
totalDelCount += delChunkCount;
const delDocResult = await this.localDatabase.allDocs({ keys: chunkKeys, include_docs: true });
const delDocs = delDocResult.rows.map((e) => ({ ...e.doc, _deleted: true }));
await this.localDatabase.bulkDocs(delDocs);
Logger(`deleted from pouchdb:${delDocs.length}`);
const tx2 = idbGC.transaction(storeIDB, "readwrite");
const store2 = tx2.objectStore(storeIDB);
for (const doc of chunkKeys) {
await store2.delete(doc);
}
Logger(`deleted from workspace:${chunkKeys.length}`);
await tx2.done;
// }
chunkKeys = [];
} while (dataLeft);
Logger(`Deleted ${totalDelCount} chunks`);
Logger("Teardown the database");
if (idbGC != null) {
idbGC.close();
idbGC = null;
}
await deleteDB(idbname);
this.gcRunning = false;
Logger("Done");
} catch (ex) {
Logger("Error on garbage collection");
Logger(ex);
} finally {
if (idbGC != null) {
idbGC.close();
})
.on("complete", (v) => {
// console.dir(chunks);
let alive = 0;
let nonref = 0;
for (const chunk of chunks) {
const items = chunk[1];
if (items.size == 0) {
nonref++;
} else {
alive++;
}
}
await deleteDB(idbname);
this.gcRunning = false;
}
});
Logger(`Garbage checking completed, documents:${docNum}. Used chunks:${alive}, Retained chunks:${nonref}. Retained chunks will be reused, but you can rebuild the database if you feel there are too many.`, LOG_LEVEL.NOTICE, "gc");
});
return;
}
}
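The milestone document now carries a `ChunkVersionRange` per node. A condensed, standalone sketch of the range intersection performed in `checkReplicationConnectivity` above (same logic; the function name here is illustrative):

```typescript
// Sketch of the version-range negotiation from the diff above: intersect the
// ranges of all accepted nodes; a node with no recorded range forces 0
// (meaning no automatic upgrade is possible).
interface ChunkVersionRange { min: number; max: number; current: number; }

const currentVersionRange: ChunkVersionRange = { min: 0, max: 1, current: 1 };

function intersectRanges(
    acceptedNodes: string[],
    nodeChunkInfo: Record<string, ChunkVersionRange>
): { globalMin: number; globalMax: number } {
    let globalMin = currentVersionRange.min;
    let globalMax = currentVersionRange.max;
    for (const nodeid of acceptedNodes) {
        const info = nodeChunkInfo[nodeid];
        if (info) {
            globalMin = Math.max(info.min, globalMin);
            globalMax = Math.min(info.max, globalMax);
        } else {
            globalMin = 0;
            globalMax = 0;
        }
    }
    return { globalMin, globalMax };
}
```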

src/ObsidianLiveSyncSettingTab.ts

@@ -1,7 +1,7 @@
import { App, PluginSettingTab, Setting, sanitizeHTMLToDom, RequestUrlParam, requestUrl } from "obsidian";
import { EntryDoc, LOG_LEVEL, RemoteDBSettings } from "./lib/src/types";
import { path2id, id2path } from "./utils";
import { NewNotice, runWithLock } from "./lib/src/utils";
import { delay, runWithLock } from "./lib/src/utils";
import { Logger } from "./lib/src/logger";
import { checkSyncInfo, connectRemoteCouchDBWithSetting } from "./utils_couchdb";
import { testCrypt } from "./lib/src/e2ee_v2";
@@ -15,15 +15,6 @@ export class ObsidianLiveSyncSettingTab extends PluginSettingTab {
this.plugin = plugin;
}
async testConnection(): Promise<void> {
// const db = await connectRemoteCouchDB(
// this.plugin.settings.couchDB_URI + (this.plugin.settings.couchDB_DBNAME == "" ? "" : "/" + this.plugin.settings.couchDB_DBNAME),
// {
// username: this.plugin.settings.couchDB_USER,
// password: this.plugin.settings.couchDB_PASSWORD,
// },
// this.plugin.settings.disableRequestURI,
// this.plugin.settings.encrypt ? this.plugin.settings.passphrase : this.plugin.settings.encrypt
// );
const db = await connectRemoteCouchDBWithSetting(this.plugin.settings, this.plugin.localDatabase.isMobile);
if (typeof db === "string") {
this.plugin.addLog(`could not connect to ${this.plugin.settings.couchDB_URI} : ${this.plugin.settings.couchDB_DBNAME} \n(${db})`, LOG_LEVEL.NOTICE);
@@ -174,17 +165,6 @@ export class ObsidianLiveSyncSettingTab extends PluginSettingTab {
})
)
// new Setting(containerRemoteDatabaseEl)
// .setDesc("This feature is locked in mobile")
// .setName("Use the old connecting method")
// .addToggle((toggle) => {
// toggle.setValue(this.plugin.settings.disableRequestURI).onChange(async (value) => {
// this.plugin.settings.disableRequestURI = value;
// await this.plugin.saveSettings();
// });
// toggle.setDisabled(this.plugin.isMobile);
// return toggle;
// })
);
new Setting(containerRemoteDatabaseEl)
.setName("End to End Encryption")
@@ -198,7 +178,7 @@ export class ObsidianLiveSyncSettingTab extends PluginSettingTab {
);
const phasspharase = new Setting(containerRemoteDatabaseEl)
.setName("Passphrase")
.setDesc("Encrypting passphrase")
.setDesc("Encrypting passphrase. If you change the passphrase with existen database, overwriting remote database is strongly recommended.")
.addText((text) => {
text.setPlaceholder("")
.setValue(this.plugin.settings.workingPassphrase)
@@ -209,9 +189,6 @@ export class ObsidianLiveSyncSettingTab extends PluginSettingTab {
text.inputEl.setAttribute("type", "password");
});
phasspharase.setDisabled(!this.plugin.settings.workingEncrypt);
containerRemoteDatabaseEl.createEl("div", {
text: "If you change the passphrase, rebuilding the remote database is required. Please press 'Apply and send'. Or, If you have configured it to connect to an existing database, click 'Just apply'.",
});
const checkWorkingPassphrase = async (): Promise<boolean> => {
const settingForCheck: RemoteDBSettings = {
...this.plugin.settings,
@@ -271,20 +248,10 @@ export class ObsidianLiveSyncSettingTab extends PluginSettingTab {
};
new Setting(containerRemoteDatabaseEl)
.setName("Apply")
.setDesc("apply encryption settinngs, and re-initialize remote database")
.setDesc("Apply encryption settinngs")
.addButton((button) =>
button
.setButtonText("Apply and send")
.setWarning()
.setDisabled(false)
.setClass("sls-btn-left")
.onClick(async () => {
await applyEncryption(true);
})
)
.addButton((button) =>
button
.setButtonText("Just apply")
.setButtonText("Apply")
.setWarning()
.setDisabled(false)
.setClass("sls-btn-right")
@@ -293,6 +260,66 @@ export class ObsidianLiveSyncSettingTab extends PluginSettingTab {
})
);
const rebuildDB = async (method: "localOnly" | "remoteOnly" | "rebuildBothByThisDevice") => {
this.plugin.settings.liveSync = false;
this.plugin.settings.periodicReplication = false;
this.plugin.settings.syncOnSave = false;
this.plugin.settings.syncOnStart = false;
this.plugin.settings.syncOnFileOpen = false;
await this.plugin.saveSettings();
applyDisplayEnabled();
await delay(2000);
if (method == "localOnly") {
await this.plugin.resetLocalDatabase();
await this.plugin.markRemoteResolved();
await this.plugin.replicate(true);
}
if (method == "remoteOnly") {
await this.plugin.markRemoteLocked();
await this.plugin.tryResetRemoteDatabase();
await this.plugin.markRemoteLocked();
await this.plugin.replicateAllToServer(true);
}
if (method == "rebuildBothByThisDevice") {
await this.plugin.resetLocalDatabase();
await this.plugin.initializeDatabase(true);
await this.plugin.markRemoteLocked();
await this.plugin.tryResetRemoteDatabase();
await this.plugin.markRemoteLocked();
await this.plugin.replicateAllToServer(true);
}
}
new Setting(containerRemoteDatabaseEl)
.setName("Overwrite by local DB")
.setDesc("Overwrite remote database with local DB and passphrase.")
.addButton((button) =>
button
.setButtonText("Send")
.setWarning()
.setDisabled(false)
.setClass("sls-btn-left")
.onClick(async () => {
await rebuildDB("remoteOnly");
})
)
new Setting(containerRemoteDatabaseEl)
.setName("Rebuild")
.setDesc("Rebuild local and remote database with local files.")
.addButton((button) =>
button
.setButtonText("Rebuild")
.setWarning()
.setDisabled(false)
.setClass("sls-btn-left")
.onClick(async () => {
await rebuildDB("rebuildBothByThisDevice");
})
)
new Setting(containerRemoteDatabaseEl)
.setName("Test Database Connection")
.setDesc("Open database connection. If the remote database is not found and you have the privilege to create a database, the database will be created.")
@@ -473,6 +500,18 @@ export class ObsidianLiveSyncSettingTab extends PluginSettingTab {
text: "",
});
new Setting(containerRemoteDatabaseEl)
.setName("Lock remote database")
.setDesc("Lock remote database to prevent synchronization with other devices.")
.addButton((button) =>
button
.setButtonText("Lock")
.setDisabled(false)
.setWarning()
.onClick(async () => {
await this.plugin.markRemoteLocked();
})
);
addScreenElement("0", containerRemoteDatabaseEl);
const containerLocalDatabaseEl = containerEl.createDiv();
containerLocalDatabaseEl.createEl("h3", { text: "Local Database configuration" });
@@ -492,31 +531,29 @@ export class ObsidianLiveSyncSettingTab extends PluginSettingTab {
})
);
new Setting(containerLocalDatabaseEl)
.setName("Auto Garbage Collection delay")
.setDesc("(seconds), if you set zero, you have to run manually.")
.addText((text) => {
text.setPlaceholder("")
.setValue(this.plugin.settings.gcDelay + "")
.onChange(async (value) => {
let v = Number(value);
if (isNaN(v) || v > 5000) {
v = 0;
}
this.plugin.settings.gcDelay = v;
await this.plugin.saveSettings();
});
text.inputEl.setAttribute("type", "number");
});
new Setting(containerLocalDatabaseEl).setName("Manual Garbage Collect").addButton((button) =>
new Setting(containerLocalDatabaseEl).setName("Garbage check").addButton((button) =>
button
.setButtonText("Collect now")
.setButtonText("Check now")
.setDisabled(false)
.onClick(async () => {
await this.plugin.garbageCollect();
await this.plugin.garbageCheck();
})
);
new Setting(containerLocalDatabaseEl)
.setName("Fetch rebuilt DB")
.setDesc("Restore or reconstruct local database from remote database.")
.addButton((button) =>
button
.setButtonText("Fetch")
.setWarning()
.setDisabled(false)
.setClass("sls-btn-left")
.onClick(async () => {
await rebuildDB("localOnly");
})
)
containerLocalDatabaseEl.createEl("div", {
text: sanitizeHTMLToDom(`Advanced settings<br>
Configuration of how LiveSync makes chunks from the file.`),
@@ -831,15 +868,6 @@ export class ObsidianLiveSyncSettingTab extends PluginSettingTab {
})
);
new Setting(containerMiscellaneousEl)
.setName("Use history")
.setDesc("Use history dialog (Restart required, auto compaction would be disabled, and more storage will be consumed)")
.addToggle((toggle) =>
toggle.setValue(this.plugin.settings.useHistory).onChange(async (value) => {
this.plugin.settings.useHistory = value;
await this.plugin.saveSettings();
})
);
addScreenElement("40", containerMiscellaneousEl);
const containerHatchEl = containerEl.createDiv();
@@ -875,30 +903,10 @@ export class ObsidianLiveSyncSettingTab extends PluginSettingTab {
}
const hatchWarn = containerHatchEl.createEl("div", { text: `To stop the bootup sequence for fixing problems on databases, you can put redflag.md on top of your vault (Rebooting obsidian is required).` });
hatchWarn.addClass("op-warn-info");
const dropHistory = async (sendToServer: boolean) => {
this.plugin.settings.liveSync = false;
this.plugin.settings.periodicReplication = false;
this.plugin.settings.syncOnSave = false;
this.plugin.settings.syncOnStart = false;
this.plugin.settings.syncOnFileOpen = false;
await this.plugin.saveSettings();
applyDisplayEnabled();
await this.plugin.resetLocalDatabase();
if (sendToServer) {
await this.plugin.initializeDatabase(true);
await this.plugin.markRemoteLocked();
await this.plugin.tryResetRemoteDatabase();
await this.plugin.markRemoteLocked();
await this.plugin.replicateAllToServer(true);
} else {
await this.plugin.markRemoteResolved();
await this.plugin.replicate(true);
}
};
new Setting(containerHatchEl)
.setName("Verify and repair all files")
.setDesc("Verify and repair all files and update database without dropping history")
.setDesc("Verify and repair all files and update database without restoring")
.addButton((button) =>
button
.setButtonText("Verify and repair")
@@ -906,13 +914,13 @@ export class ObsidianLiveSyncSettingTab extends PluginSettingTab {
.setWarning()
.onClick(async () => {
const files = this.app.vault.getFiles();
Logger("Verify and repair all files started", LOG_LEVEL.NOTICE);
const notice = NewNotice("", 0);
Logger("Verify and repair all files started", LOG_LEVEL.NOTICE, "verify");
// const notice = NewNotice("", 0);
let i = 0;
for (const file of files) {
i++;
Logger(`Update into ${file.path}`);
notice.setMessage(`${i}/${files.length}\n${file.path}`);
Logger(`${i}/${files.length}\n${file.path}`, LOG_LEVEL.NOTICE, "verify");
try {
await this.plugin.updateIntoDB(file);
} catch (ex) {
@@ -920,8 +928,7 @@ export class ObsidianLiveSyncSettingTab extends PluginSettingTab {
Logger(ex);
}
}
notice.hide();
Logger("done", LOG_LEVEL.NOTICE);
Logger("done", LOG_LEVEL.NOTICE, "verify");
})
);
new Setting(containerHatchEl)
@@ -933,9 +940,8 @@ export class ObsidianLiveSyncSettingTab extends PluginSettingTab {
.setDisabled(false)
.setWarning()
.onClick(async () => {
const notice = NewNotice("", 0);
Logger(`Begin sanity check`, LOG_LEVEL.INFO);
notice.setMessage(`Begin sanity check`);
// const notice = NewNotice("", 0);
Logger(`Begin sanity check`, LOG_LEVEL.NOTICE, "sancheck");
await runWithLock("sancheck", true, async () => {
const db = this.plugin.localDatabase.localDatabase;
const wf = await db.allDocs();
@@ -943,59 +949,22 @@ export class ObsidianLiveSyncSettingTab extends PluginSettingTab {
let count = 0;
for (const id of filesDatabase) {
count++;
notice.setMessage(`${count}/${filesDatabase.length}\n${id2path(id)}`);
Logger(`${count}/${filesDatabase.length}\n${id2path(id)}`, LOG_LEVEL.NOTICE, "sancheck");
const w = await db.get<EntryDoc>(id);
if (!(await this.plugin.localDatabase.sanCheck(w))) {
Logger(`The file ${id2path(id)} missing child(ren)`, LOG_LEVEL.NOTICE);
}
}
});
notice.hide();
Logger(`Done`, LOG_LEVEL.NOTICE);
Logger(`Done`, LOG_LEVEL.NOTICE, "sancheck");
// Logger("done", LOG_LEVEL.NOTICE);
})
);
new Setting(containerHatchEl)
.setName("Drop History")
.setDesc("Initialize local and remote database, and send all or retrieve all again.")
.addButton((button) =>
button
.setButtonText("Drop and send")
.setWarning()
.setDisabled(false)
.setClass("sls-btn-left")
.onClick(async () => {
await dropHistory(true);
})
)
.addButton((button) =>
button
.setButtonText("Drop and receive")
.setWarning()
.setDisabled(false)
.setClass("sls-btn-right")
.onClick(async () => {
await dropHistory(false);
})
);
new Setting(containerHatchEl)
.setName("Lock remote database")
.setDesc("Lock remote database for synchronize")
.addButton((button) =>
button
.setButtonText("Lock")
.setDisabled(false)
.setWarning()
.onClick(async () => {
await this.plugin.markRemoteLocked();
})
);
new Setting(containerHatchEl)
.setName("Suspend file watching")
.setDesc("if enables it, all file operations are ignored.")
.setDesc("If enables it, all file operations are ignored.")
.addToggle((toggle) =>
toggle.setValue(this.plugin.settings.suspendFileWatching).onChange(async (value) => {
this.plugin.settings.suspendFileWatching = value;

Submodule src/lib updated: 4decf16d62...654bfcf8a6

src/main.ts

@@ -16,8 +16,8 @@ import {
isPlainText,
setNoticeClass,
NewNotice,
allSettledWithConcurrencyLimit,
getLocks,
Parallels,
} from "./lib/src/utils";
import { Logger, setLogger } from "./lib/src/logger";
import { LocalPouchDB } from "./LocalPouchDB";
@@ -32,6 +32,7 @@ import { id2path, path2id } from "./utils";
import { decrypt, encrypt } from "./lib/src/e2ee_v2";
const isDebug = false;
setNoticeClass(Notice);
class PluginDialogModal extends Modal {
plugin: ObsidianLiveSyncPlugin;
@@ -162,7 +163,21 @@ const askString = (app: App, title: string, key: string, placeholder: string): P
dialog.open();
});
};
let touchedFiles: string[] = [];
function touch(file: TFile | string) {
const f = file instanceof TFile ? file : app.vault.getAbstractFileByPath(file) as TFile;
const key = `${f.path}-${f.stat.mtime}-${f.stat.size}`;
touchedFiles.unshift(key); // newest first, so trimming below keeps the recent entries
touchedFiles = touchedFiles.slice(0, 100);
}
function recentlyTouched(file: TFile) {
const key = `${file.path}-${file.stat.mtime}-${file.stat.size}`;
if (touchedFiles.indexOf(key) == -1) return false;
return true;
}
function clearTouched() {
touchedFiles = [];
}
export default class ObsidianLiveSyncPlugin extends Plugin {
settings: ObsidianLiveSyncSettings;
localDatabase: LocalPouchDB;
@@ -431,9 +446,9 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
});
this.addCommand({
id: "livesync-gc",
name: "garbage collect now",
name: "Check garbages now",
callback: () => {
this.garbageCollect();
this.garbageCheck();
},
});
this.addCommand({
@@ -539,8 +554,8 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
return await this.localDatabase.initializeDatabase();
}
async garbageCollect() {
await this.localDatabase.garbageCollect();
async garbageCheck() {
await this.localDatabase.garbageCheck();
}
async loadSettings() {
@@ -549,9 +564,11 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
this.settings.workingPassphrase = this.settings.passphrase;
// Delete this feature to avoid problems on mobile.
this.settings.disableRequestURI = true;
// Temporary disabled
// TODO: If a new GC is created, a new default value must be created.
// GC is disabled.
this.settings.gcDelay = 0;
// So, use history is always enabled.
this.settings.useHistory = true;
const lsname = "obsidian-live-sync-vaultanddevicename-" + this.app.vault.getName();
if (this.settings.deviceAndVaultName != "") {
@@ -589,7 +606,7 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
}
this.gcTimerHandler = setTimeout(() => {
this.gcTimerHandler = null;
this.garbageCollect();
this.garbageCheck();
}, GC_DELAY);
}
@@ -652,6 +669,9 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
watchVaultCreate(file: TFile, ...args: any[]) {
if (this.settings.suspendFileWatching) return;
if (recentlyTouched(file)) {
return;
}
this.watchVaultChangeAsync(file, ...args);
}
@@ -659,6 +679,9 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
if (!(file instanceof TFile)) {
return;
}
if (recentlyTouched(file)) {
return;
}
if (this.settings.suspendFileWatching) return;
// If batchsave is enabled, queue all changes and do nothing.
@@ -687,20 +710,28 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
return await runWithLock("batchSave", false, async () => {
const batchItems = JSON.parse(JSON.stringify(this.batchFileChange)) as string[];
this.batchFileChange = [];
const promises = batchItems.map(async (e) => {
try {
const f = this.app.vault.getAbstractFileByPath(normalizePath(e));
if (f && f instanceof TFile) {
await this.updateIntoDB(f);
Logger(`Batch save:${e}`);
const limit = 3;
const p = Parallels();
for (const e of batchItems) {
const w = (async () => {
try {
const f = this.app.vault.getAbstractFileByPath(normalizePath(e));
if (f && f instanceof TFile) {
await this.updateIntoDB(f);
Logger(`Batch save:${e}`);
}
} catch (ex) {
Logger(`Batch save error:${e}`, LOG_LEVEL.NOTICE);
Logger(ex, LOG_LEVEL.VERBOSE);
}
} catch (ex) {
Logger(`Batch save error:${e}`, LOG_LEVEL.NOTICE);
Logger(ex, LOG_LEVEL.VERBOSE);
}
});
})();
p.add(w);
await p.wait(limit)
}
this.refreshStatusText();
await p.all();
this.refreshStatusText();
await allSettledWithConcurrencyLimit(promises, 3);
return;
});
}
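// Editor's note: `Parallels` is imported from ./lib/src/utils in the updated submodule, so
// its implementation is not part of this diff. A minimal sketch that fits the usage above
// (add / wait(limit) / all) could look like the following; this is an illustration under
// that assumption, not the submodule's actual code:
const Parallels = (ps = new Set<Promise<unknown>>()) => ({
    add(p: Promise<unknown>) {
        ps.add(p);
        p.finally(() => ps.delete(p)); // drop each promise from the window once it settles
    },
    async wait(limit: number) {
        if (ps.size >= limit) await Promise.race(ps); // block until one in-flight task finishes
    },
    all() {
        return Promise.allSettled([...ps]); // drain whatever is still running
    },
});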
@@ -709,6 +740,9 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
async watchVaultChangeAsync(file: TFile, ...args: any[]) {
if (file instanceof TFile) {
if (recentlyTouched(file)) {
return;
}
await this.updateIntoDB(file);
this.gcHook();
}
@@ -716,7 +750,7 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
watchVaultDelete(file: TAbstractFile) {
// When save is delayed, it should be cancelled.
this.batchFileChange = this.batchFileChange.filter((e) => e == file.path);
this.batchFileChange = this.batchFileChange.filter((e) => e != file.path);
if (this.settings.suspendFileWatching) return;
this.watchVaultDeleteAsync(file).then(() => { });
}
@@ -805,7 +839,7 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
lastLog = "";
// eslint-disable-next-line require-await
async addLog(message: any, level: LOG_LEVEL = LOG_LEVEL.INFO) {
async addLog(message: any, level: LOG_LEVEL = LOG_LEVEL.INFO, key = "") {
if (level == LOG_LEVEL.DEBUG && !isDebug) {
return;
}
@@ -828,13 +862,24 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
// }
if (level >= LOG_LEVEL.NOTICE) {
if (messagecontent in this.notifies) {
clearTimeout(this.notifies[messagecontent].timer);
this.notifies[messagecontent].count++;
this.notifies[messagecontent].notice.setMessage(`(${this.notifies[messagecontent].count}):${messagecontent}`);
this.notifies[messagecontent].timer = setTimeout(() => {
const notify = this.notifies[messagecontent].notice;
delete this.notifies[messagecontent];
if (!key) key = messagecontent;
if (key in this.notifies) {
// @ts-ignore
const isShown = this.notifies[key].notice.noticeEl?.isShown()
if (!isShown) {
this.notifies[key].notice = new Notice(messagecontent, 0);
}
clearTimeout(this.notifies[key].timer);
if (key == messagecontent) {
this.notifies[key].count++;
this.notifies[key].notice.setMessage(`(${this.notifies[key].count}):${messagecontent}`);
} else {
this.notifies[key].notice.setMessage(`${messagecontent}`);
}
this.notifies[key].timer = setTimeout(() => {
const notify = this.notifies[key].notice;
delete this.notifies[key];
try {
notify.hide();
} catch (ex) {
@@ -843,11 +888,11 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
}, 5000);
} else {
const notify = new Notice(messagecontent, 0);
this.notifies[messagecontent] = {
this.notifies[key] = {
count: 0,
notice: notify,
timer: setTimeout(() => {
delete this.notifies[messagecontent];
delete this.notifies[key];
notify.hide();
}, 5000),
};
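// Editor's note: the new optional `key` argument lets related messages share one Notice.
// With the default key (the message itself) repeats are counted and rendered as
// "(n):message"; with an explicit key such as "syncAll", each new message replaces the
// previous text in place, so long-running progress reporting reuses a single toast
// instead of stacking dozens of them.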
@@ -884,12 +929,13 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
}
const doc = await this.localDatabase.getDBEntry(pathSrc, { rev: docEntry._rev });
if (doc === false) return;
const msg = `DB -> STORAGE (create${force ? ",force" : ""},${doc.datatype}) `;
const path = id2path(doc._id);
if (doc.datatype == "newnote") {
const bin = base64ToArrayBuffer(doc.data);
if (bin != null) {
if (!isValidPath(path)) {
Logger(`The file that having platform dependent name has been arrived. This file has skipped: ${path}`, LOG_LEVEL.NOTICE);
Logger(msg + "ERROR, invalid path: " + path, LOG_LEVEL.NOTICE);
return;
}
await this.ensureDirectory(path);
@@ -898,16 +944,18 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
ctime: doc.ctime,
mtime: doc.mtime,
});
Logger("live : write to local (newfile:b) " + path);
this.batchFileChange = this.batchFileChange.filter((e) => e != newfile.path);
Logger(msg + path);
touch(newfile);
this.app.vault.trigger("create", newfile);
} catch (ex) {
Logger("could not write to local (newfile:bin) " + path, LOG_LEVEL.NOTICE);
Logger(msg + "ERROR, Could not write: " + path, LOG_LEVEL.NOTICE);
Logger(ex, LOG_LEVEL.VERBOSE);
}
}
} else if (doc.datatype == "plain") {
if (!isValidPath(path)) {
Logger(`The file that having platform dependent name has been arrived. This file has skipped: ${path}`, LOG_LEVEL.NOTICE);
Logger(msg + "ERROR, invalid path: " + path, LOG_LEVEL.NOTICE);
return;
}
await this.ensureDirectory(path);
@@ -916,14 +964,16 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
ctime: doc.ctime,
mtime: doc.mtime,
});
Logger("live : write to local (newfile:p) " + path);
this.batchFileChange = this.batchFileChange.filter((e) => e != newfile.path);
Logger(msg + path);
touch(newfile);
this.app.vault.trigger("create", newfile);
} catch (ex) {
Logger("could not write to local (newfile:plain) " + path, LOG_LEVEL.NOTICE);
Logger(msg + "ERROR, Could not parse: " + path + "(" + doc.datatype + ")", LOG_LEVEL.NOTICE);
Logger(ex, LOG_LEVEL.VERBOSE);
}
} else {
Logger("live : New data imcoming, but we cound't parse that." + doc.datatype, LOG_LEVEL.NOTICE);
Logger(msg + "ERROR, Could not parse: " + path + "(" + doc.datatype + ")", LOG_LEVEL.NOTICE);
}
}
@@ -967,41 +1017,46 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
const docMtime = ~~(docEntry.mtime / 1000);
if (localMtime < docMtime || force) {
const doc = await this.localDatabase.getDBEntry(pathSrc);
let msg = "livesync : newer local files so write to local:" + file.path;
if (force) msg = "livesync : force write to local:" + file.path;
if (doc === false) return;
const msg = `DB -> STORAGE (modify${force ? ",force" : ""},${doc.datatype}) `;
const path = id2path(doc._id);
if (doc.datatype == "newnote") {
const bin = base64ToArrayBuffer(doc.data);
if (bin != null) {
if (!isValidPath(path)) {
Logger(`The file that having platform dependent name has been arrived. This file has skipped: ${path}`, LOG_LEVEL.NOTICE);
Logger(msg + "ERROR, invalid path: " + path, LOG_LEVEL.NOTICE);
return;
}
await this.ensureDirectory(path);
try {
await this.app.vault.modifyBinary(file, bin, { ctime: doc.ctime, mtime: doc.mtime });
Logger(msg);
this.app.vault.trigger("modify", file);
this.batchFileChange = this.batchFileChange.filter((e) => e != file.path);
Logger(msg + path);
const xf = this.app.vault.getAbstractFileByPath(file.path) as TFile;
touch(xf);
this.app.vault.trigger("modify", xf);
} catch (ex) {
Logger("could not write to local (modify:bin) " + path, LOG_LEVEL.NOTICE);
Logger(msg + "ERROR, Could not write: " + path, LOG_LEVEL.NOTICE);
}
}
} else if (doc.datatype == "plain") {
if (!isValidPath(path)) {
Logger(`The file that having platform dependent name has been arrived. This file has skipped: ${path}`, LOG_LEVEL.NOTICE);
Logger(msg + "ERROR, invalid path: " + path, LOG_LEVEL.NOTICE);
return;
}
await this.ensureDirectory(path);
try {
await this.app.vault.modify(file, doc.data, { ctime: doc.ctime, mtime: doc.mtime });
Logger(msg);
this.app.vault.trigger("modify", file);
Logger(msg + path);
this.batchFileChange = this.batchFileChange.filter((e) => e != file.path);
const xf = this.app.vault.getAbstractFileByPath(file.path) as TFile;
touch(xf);
this.app.vault.trigger("modify", xf);
} catch (ex) {
Logger("could not write to local (modify:plain) " + path, LOG_LEVEL.NOTICE);
Logger(msg + "ERROR, Could not write: " + path, LOG_LEVEL.NOTICE);
}
} else {
Logger("live : New data imcoming, but we cound't parse that.:" + doc.datatype + "-", LOG_LEVEL.NOTICE);
Logger(msg + "ERROR, Could not parse: " + path + "(" + doc.datatype + ")", LOG_LEVEL.NOTICE);
}
} else if (localMtime > docMtime) {
// newer local file.
@@ -1046,7 +1101,7 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
}[] = [];
chunkWaitTimeout = 60000;
async saveQueuedFiles() {
saveQueuedFiles() {
const saveData = JSON.stringify(this.queuedFiles.filter((e) => !e.done).map((e) => e.entry._id));
const lsname = "obsidian-livesync-queuefiles-" + this.app.vault.getName();
localStorage.setItem(lsname, saveData);
@@ -1072,6 +1127,7 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
if (isValidPath(id2path(queue.entry._id))) {
Logger(`Applying ${queue.entry._id} (${queue.entry._rev}) change...`);
await this.handleDBChanged(queue.entry);
Logger(`Applied ${queue.entry._id} (${queue.entry._rev})`);
}
} else if (now > queue.timeout) {
if (!queue.warned) Logger(`Timed out: ${queue.entry._id} could not collect ${queue.missingChildren.length} chunks. plugin keeps watching, but you have to check the file after the replication.`, LOG_LEVEL.NOTICE);
@@ -1128,7 +1184,7 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
if ("children" in doc) {
const c = await this.localDatabase.localDatabase.allDocs({ keys: doc.children, include_docs: false });
const missing = c.rows.filter((e) => "error" in e).map((e) => e.key);
Logger(`${doc._id}(${doc._rev}) Queued (waiting ${missing.length} items)`, LOG_LEVEL.VERBOSE);
if (missing.length > 0) Logger(`${doc._id}(${doc._rev}) Queued (waiting ${missing.length} items)`, LOG_LEVEL.VERBOSE);
newQueue.missingChildren = missing;
this.queuedFiles.push(newQueue);
} else {
@@ -1350,7 +1406,7 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
async replicate(showMessage?: boolean) {
if (this.settings.versionUpFlash != "") {
NewNotice("Open settings and check message, please.");
Logger("Open settings and check message, please.", LOG_LEVEL.NOTICE);
return;
}
await this.applyBatchChange();
@@ -1393,16 +1449,21 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
async syncAllFiles(showingNotice?: boolean) {
// synchronize all files between database and storage.
let notice: Notice = null;
let initialScan = false;
if (showingNotice) {
notice = NewNotice("Initializing", 0);
Logger("Initializing", LOG_LEVEL.NOTICE, "syncAll");
}
const filesStorage = this.app.vault.getFiles();
const filesStorageName = filesStorage.map((e) => e.path);
const wf = await this.localDatabase.localDatabase.allDocs();
const filesDatabase = wf.rows.filter((e) => !e.id.startsWith("h:") && !e.id.startsWith("ps:") && e.id != "obsydian_livesync_version").map((e) => id2path(e.id));
const filesDatabase = wf.rows.filter((e) => !e.id.startsWith("h:") && !e.id.startsWith("ps:") && e.id != "obsydian_livesync_version").filter(e => isValidPath(e.id)).map((e) => id2path(e.id));
const isInitialized = await (this.localDatabase.kvDB.get<boolean>("initialized")) || false;
// Make chunk bigger if it is the initial scan. There must be non-active docs.
if (filesDatabase.length == 0 && !isInitialized) {
initialScan = true;
Logger("Database looks empty, save files as initial sync data");
}
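// Editor's note: initialScan is true only for an empty, never-initialized database. In
// that mode only the storage -> DB pass below runs (the pull and diff-check passes are
// skipped), and updateIntoDB() forwards the flag to putDBEntry() so files can be saved
// as bigger chunks, which is the "save the file into the big chunk on initial scan"
// improvement from the commit message.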
const onlyInStorage = filesStorage.filter((e) => filesDatabase.indexOf(e.path) == -1);
const onlyInDatabase = filesDatabase.filter((e) => filesStorageName.indexOf(e) == -1);
@@ -1418,45 +1479,71 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
Logger(procedurename);
let i = 0;
// let lastTicks = performance.now() + 2000;
let workProcs = 0;
const procs = objects.map(async (e) => {
try {
workProcs++;
await callback(e);
// let workProcs = 0;
const p = Parallels();
const limit = 10;
Logger(`${procedurename} exec.`);
for (const v of objects) {
// workProcs++;
if (!this.localDatabase.isReady) throw Error("Database is not ready!");
p.add(callback(v).then(() => {
i++;
if (i % 25 == 0) {
const notify = `${procedurename} : ${workProcs}/${count} (Pending:${workProcs})`;
if (notice != null) notice.setMessage(notify);
Logger(notify);
if (i % 100 == 0) {
const notify = `${procedurename} : ${i}/${count}`;
if (showingNotice) {
Logger(notify, LOG_LEVEL.NOTICE, "syncAll");
} else {
Logger(notify);
}
this.setStatusBarText(notify);
}
} catch (ex) {
}).catch(ex => {
Logger(`Error while ${procedurename}`, LOG_LEVEL.NOTICE);
Logger(ex);
} finally {
workProcs--;
}
});
await allSettledWithConcurrencyLimit(procs, 10);
}).finally(() => {
// workProcs--;
})
);
await p.wait(limit);
}
await p.all();
Logger(`${procedurename} done.`);
};
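// Editor's note: this rewritten runAll is what fixes the "unexpected massive parallel
// running of file checking in boot sequence": instead of creating one promise per file up
// front, it keeps at most `limit` (10) callbacks in flight because p.wait(limit) is
// awaited after every add, and it logs progress every 100 items rather than every 25.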
await runAll("UPDATE DATABASE", onlyInStorage, async (e) => {
Logger(`Update into ${e.path}`);
await this.updateIntoDB(e);
});
await runAll("UPDATE STORAGE", onlyInDatabase, async (e) => {
Logger(`Pull from db:${e}`);
await this.pullFile(e, filesStorage, false, null, false);
});
await runAll("CHECK FILE STATUS", syncFiles, async (e) => {
await this.syncFileBetweenDBandStorage(e, filesStorage);
await this.updateIntoDB(e, initialScan);
});
if (!initialScan) {
await runAll("UPDATE STORAGE", onlyInDatabase, async (e) => {
Logger(`Pull from db:${e}`);
await this.pullFile(e, filesStorage, false, null, false);
});
}
if (!initialScan) {
let caches: { [key: string]: { storageMtime: number; docMtime: number } } = {};
caches = await this.localDatabase.kvDB.get<{ [key: string]: { storageMtime: number; docMtime: number } }>("diff-caches") || {};
const docsCount = syncFiles.length;
do {
const syncFilesX = syncFiles.splice(0, 100);
const docs = await this.localDatabase.localDatabase.allDocs({ keys: syncFilesX.map(e => path2id(e.path)), include_docs: true })
const syncFilesToSync = syncFilesX.map((e) => ({ ...e, doc: docs.rows.find(ee => ee.id == path2id(e.path)).doc as LoadedEntry }));
await runAll(`CHECK FILE STATUS:${syncFiles.length}/${docsCount}`, syncFilesToSync, async (e) => {
caches = await this.syncFileBetweenDBandStorage(e, initialScan, caches);
});
} while (syncFiles.length > 0);
await this.localDatabase.kvDB.set("diff-caches", caches);
}
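// Editor's note: the per-file `${file.path}-diff` records used to be read and written as
// individual kvDB entries on every check; they are now loaded once into the "diff-caches"
// object, threaded through syncFileBetweenDBandStorage in batches of 100 files, and
// persisted with a single kvDB.set at the end, one of the boot-sequence speedups.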
this.setStatusBarText(`NOW TRACKING!`);
Logger("Initialized,NOW TRACKING!");
if (!isInitialized) {
await (this.localDatabase.kvDB.set("initialized", true))
}
if (showingNotice) {
notice.hide();
Logger("Initialize done!", LOG_LEVEL.NOTICE);
Logger("Initialize done!", LOG_LEVEL.NOTICE, "syncAll");
}
}
@@ -1638,7 +1725,7 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
} else if (toDelete == null) {
Logger("Leave it still conflicted");
} else {
Logger(`resolved conflict:${file.path}`);
Logger(`Conflict resolved:${file.path}`);
await this.localDatabase.deleteDBEntry(file.path, { rev: toDelete });
await this.pullFile(file.path, null, true, toKeep);
setTimeout(() => {
@@ -1719,44 +1806,48 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
// when the file is opened
}
async syncFileBetweenDBandStorage(file: TFile, fileList?: TFile[]) {
const doc = await this.localDatabase.getDBEntryMeta(file.path);
if (doc === false) return;
async syncFileBetweenDBandStorage(file: TFile & { doc?: LoadedEntry }, initialScan: boolean, caches: { [key: string]: { storageMtime: number; docMtime: number } }) {
const doc = file.doc;
if (!doc) return;
const storageMtime = ~~(file.stat.mtime / 1000);
const docMtime = ~~(doc.mtime / 1000);
const dK = `${file.path}-diff`;
const isLastDiff = (await this.localDatabase.kvDB.get<{ storageMtime: number; docMtime: number }>(dK)) || { storageMtime: 0, docMtime: 0 };
const isLastDiff = dK in caches ? caches[dK] : { storageMtime: 0, docMtime: 0 };
if (isLastDiff.docMtime == docMtime && isLastDiff.storageMtime == storageMtime) {
// Logger("CHECKED :" + file.path, LOG_LEVEL.VERBOSE);
} else {
if (storageMtime > docMtime) {
//newer local file.
Logger("STORAGE -> DB :" + file.path);
Logger(`${storageMtime} > ${docMtime}`);
await this.updateIntoDB(file);
} else if (storageMtime < docMtime) {
//newer database file.
Logger("STORAGE <- DB :" + file.path);
Logger(`${storageMtime} < ${docMtime}`);
const docx = await this.localDatabase.getDBEntry(file.path, null, false, false);
if (docx != false) {
await this.doc2storage_modify(docx, file);
}
} else {
// Logger("EVEN :" + file.path, LOG_LEVEL.VERBOSE);
// Logger(`${storageMtime} = ${docMtime}`, LOG_LEVEL.VERBOSE);
//eq.case
}
await this.localDatabase.kvDB.set(dK, { storageMtime, docMtime });
caches[dK] = { storageMtime, docMtime };
return caches;
}
if (storageMtime > docMtime) {
//newer local file.
Logger("STORAGE -> DB :" + file.path);
Logger(`${storageMtime} > ${docMtime}`);
await this.updateIntoDB(file, initialScan);
caches[dK] = { storageMtime, docMtime };
return caches;
} else if (storageMtime < docMtime) {
//newer database file.
Logger("STORAGE <- DB :" + file.path);
Logger(`${storageMtime} < ${docMtime}`);
const docx = await this.localDatabase.getDBEntry(file.path, null, false, false);
if (docx != false) {
await this.doc2storage_modify(docx, file);
}
caches[dK] = { storageMtime, docMtime };
return caches;
} else {
// Logger("EVEN :" + file.path, LOG_LEVEL.VERBOSE);
// Logger(`${storageMtime} = ${docMtime}`, LOG_LEVEL.VERBOSE);
//eq.case
}
caches[dK] = { storageMtime, docMtime };
return caches;
}
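// Editor's note: both timestamps are truncated to whole seconds (~~(ms / 1000)) before
// being compared, so sub-second skew between storage and database mtimes counts as the
// "even" case and nothing is transferred; only a difference of a full second or more
// triggers a write in either direction.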
async updateIntoDB(file: TFile) {
async updateIntoDB(file: TFile, initialScan?: boolean) {
if (shouldBeIgnored(file.path)) {
return;
}
await this.localDatabase.waitForGCComplete();
let content = "";
let datatype: "plain" | "newnote" = "newnote";
if (!isPlainText(file.name)) {
@@ -1778,13 +1869,17 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
datatype: datatype,
};
// upsert should be locked
const msg = `DB <- STORAGE (${datatype}) `;
const isNotChanged = await runWithLock("file:" + fullpath, false, async () => {
if (recentlyTouched(file)) {
return true;
}
const old = await this.localDatabase.getDBEntry(fullpath, null, false, false);
if (old !== false) {
const oldData = { data: old.data, deleted: old._deleted };
const newData = { data: d.data, deleted: d._deleted };
if (JSON.stringify(oldData) == JSON.stringify(newData)) {
Logger("not changed:" + fullpath + (d._deleted ? " (deleted)" : ""), LOG_LEVEL.VERBOSE);
Logger(msg + "Skipped (not changed) " + fullpath + (d._deleted ? " (deleted)" : ""), LOG_LEVEL.VERBOSE);
return true;
}
// d._rev = old._rev;
@@ -1792,10 +1887,11 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
return false;
});
if (isNotChanged) return;
await this.localDatabase.putDBEntry(d);
await this.localDatabase.putDBEntry(d, initialScan);
this.queuedFiles = this.queuedFiles.map((e) => ({ ...e, ...(e.entry._id == d._id ? { done: true } : {}) }));
Logger("put database:" + fullpath + "(" + datatype + ") ");
Logger(msg + fullpath);
if (this.settings.syncOnSave && !this.suspended) {
await this.replicate();
}
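// Editor's note: updateIntoDB compares the new content against the stored revision under
// a per-file lock ("file:" + fullpath) and returns early when nothing changed, or when
// the write was one of the plugin's own recentlyTouched() files, so syncOnSave only
// replicates real edits.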
@@ -1818,9 +1914,11 @@ export default class ObsidianLiveSyncPlugin extends Plugin {
}
async resetLocalDatabase() {
clearTouched();
await this.localDatabase.resetDatabase();
}
async resetLocalOldDatabase() {
clearTouched();
await this.localDatabase.resetLocalOldDatabase();
}