### New experimental feature

- We can perform Garbage Collection (Beta2) without rebuilding the entire database, and also fetch the database.

### Fixed

- Resetting the bucket now properly clears all uploaded files.

### Refactored

- Some files have been moved to better reflect their purpose and improve maintainability.
- The extensive LiveSyncLocalDB has been split into separate files for each role.
This commit is contained in:
vorotamoroz
2025-08-26 11:09:33 +01:00
parent 5d24c3b984
commit 08548f8630
6 changed files with 301 additions and 18 deletions

View File

@@ -86,7 +86,7 @@ export class HiddenFileSync extends LiveSyncCommands implements IObsidianModule
return this.plugin.kvDB; return this.plugin.kvDB;
} }
getConflictedDoc(path: FilePathWithPrefix, rev: string) { getConflictedDoc(path: FilePathWithPrefix, rev: string) {
return this.plugin.localDatabase.getConflictedDoc(path, rev); return this.plugin.managers.conflictManager.getConflictedDoc(path, rev);
} }
onunload() { onunload() {
this.periodicInternalFileScanProcessor?.disable(); this.periodicInternalFileScanProcessor?.disable();
@@ -699,7 +699,7 @@ Offline Changed files: ${processFiles.length}`;
revFrom._revs_info revFrom._revs_info
?.filter((e) => e.status == "available" && Number(e.rev.split("-")[0]) < conflictedRevNo) ?.filter((e) => e.status == "available" && Number(e.rev.split("-")[0]) < conflictedRevNo)
.first()?.rev ?? ""; .first()?.rev ?? "";
const result = await this.plugin.localDatabase.mergeObject( const result = await this.plugin.managers.conflictManager.mergeObject(
doc.path, doc.path,
commonBase, commonBase,
doc._rev, doc._rev,

View File

@@ -1,9 +1,27 @@
import { sizeToHumanReadable } from "octagonal-wheels/number"; import { sizeToHumanReadable } from "octagonal-wheels/number";
import { LOG_LEVEL_NOTICE, type MetaEntry } from "../../lib/src/common/types"; import {
EntryTypes,
LOG_LEVEL_INFO,
LOG_LEVEL_NOTICE,
LOG_LEVEL_VERBOSE,
type DocumentID,
type EntryDoc,
type EntryLeaf,
type MetaEntry,
} from "../../lib/src/common/types";
import { getNoFromRev } from "../../lib/src/pouchdb/LiveSyncLocalDB"; import { getNoFromRev } from "../../lib/src/pouchdb/LiveSyncLocalDB";
import type { IObsidianModule } from "../../modules/AbstractObsidianModule"; import type { IObsidianModule } from "../../modules/AbstractObsidianModule";
import { LiveSyncCommands } from "../LiveSyncCommands"; import { LiveSyncCommands } from "../LiveSyncCommands";
import { serialized } from "octagonal-wheels/concurrency/lock_v2";
import { arrayToChunkedArray } from "octagonal-wheels/collection";
const DB_KEY_SEQ = "gc-seq";
const DB_KEY_CHUNK_SET = "chunk-set";
const DB_KEY_DOC_USAGE_MAP = "doc-usage-map";
type ChunkID = DocumentID;
type NoteDocumentID = DocumentID;
type Rev = string;
type ChunkUsageMap = Map<NoteDocumentID, Map<Rev, Set<ChunkID>>>;
export class LocalDatabaseMaintenance extends LiveSyncCommands implements IObsidianModule { export class LocalDatabaseMaintenance extends LiveSyncCommands implements IObsidianModule {
$everyOnload(): Promise<boolean> { $everyOnload(): Promise<boolean> {
return Promise.resolve(true); return Promise.resolve(true);
@@ -262,4 +280,213 @@ Note: **Make sure to synchronise all devices before deletion.**
this.clearHash(); this.clearHash();
} }
} }
async scanUnusedChunks() {
const kvDB = this.plugin.kvDB;
const chunkSet = (await kvDB.get<Set<DocumentID>>(DB_KEY_CHUNK_SET)) || new Set();
const chunkUsageMap = (await kvDB.get<ChunkUsageMap>(DB_KEY_DOC_USAGE_MAP)) || new Map();
const KEEP_MAX_REVS = 10;
const unusedSet = new Set<DocumentID>([...chunkSet]);
for (const [, revIdMap] of chunkUsageMap) {
const sortedRevId = [...revIdMap.entries()].sort((a, b) => getNoFromRev(b[0]) - getNoFromRev(a[0]));
if (sortedRevId.length > KEEP_MAX_REVS) {
// If we have more revisions than we want to keep, we need to delete the extras
}
const keepRevID = sortedRevId.slice(0, KEEP_MAX_REVS);
keepRevID.forEach((e) => e[1].forEach((ee) => unusedSet.delete(ee)));
}
return {
chunkSet,
chunkUsageMap,
unusedSet,
};
}
/**
* Track changes in the database and update the chunk usage map for garbage collection.
* Note that this only able to perform without Fetch chunks on demand.
*/
async trackChanges(fromStart: boolean = false, showNotice: boolean = false) {
if (!this.isAvailable()) return;
const logLevel = showNotice ? LOG_LEVEL_NOTICE : LOG_LEVEL_INFO;
const kvDB = this.plugin.kvDB;
const previousSeq = fromStart ? "" : await kvDB.get<string>(DB_KEY_SEQ);
const chunkSet = (await kvDB.get<Set<DocumentID>>(DB_KEY_CHUNK_SET)) || new Set();
const chunkUsageMap = (await kvDB.get<ChunkUsageMap>(DB_KEY_DOC_USAGE_MAP)) || new Map();
const db = this.localDatabase.localDatabase;
const verbose = (msg: string) => this._verbose(msg);
const processDoc = async (doc: EntryDoc, isDeleted: boolean) => {
if (!("children" in doc)) {
return;
}
const id = doc._id;
const rev = doc._rev!;
const deleted = doc._deleted || isDeleted;
const softDeleted = doc.deleted;
const children = (doc.children || []) as DocumentID[];
if (!chunkUsageMap.has(id)) {
chunkUsageMap.set(id, new Map<Rev, Set<ChunkID>>());
}
for (const chunkId of children) {
if (deleted) {
chunkUsageMap.get(id)!.delete(rev);
// chunkSet.add(chunkId as DocumentID);
} else {
if (softDeleted) {
//TODO: Soft delete
chunkUsageMap.get(id)!.set(rev, (chunkUsageMap.get(id)!.get(rev) || new Set()).add(chunkId));
} else {
chunkUsageMap.get(id)!.set(rev, (chunkUsageMap.get(id)!.get(rev) || new Set()).add(chunkId));
}
}
}
verbose(
`Tracking chunk: ${id}/${rev} (${doc?.path}), deleted: ${deleted ? "yes" : "no"} Soft-Deleted:${softDeleted ? "yes" : "no"}`
);
return await Promise.resolve();
};
// let saveQueue = 0;
const saveState = async (seq: string | number) => {
await kvDB.set(DB_KEY_SEQ, seq);
await kvDB.set(DB_KEY_CHUNK_SET, chunkSet);
await kvDB.set(DB_KEY_DOC_USAGE_MAP, chunkUsageMap);
};
const processDocRevisions = async (doc: EntryDoc) => {
try {
const oldRevisions = await db.get(doc._id, { revs: true, revs_info: true, conflicts: true });
const allRevs = oldRevisions._revs_info?.length || 0;
const info = (oldRevisions._revs_info || [])
.filter((e) => e.status == "available" && e.rev != doc._rev)
.filter((info) => !chunkUsageMap.get(doc._id)?.has(info.rev));
const infoLength = info.length;
this._log(`Found ${allRevs} old revisions for ${doc._id} . ${infoLength} items to check `);
if (info.length > 0) {
const oldDocs = await Promise.all(
info
.filter((revInfo) => revInfo.status == "available")
.map((revInfo) => db.get(doc._id, { rev: revInfo.rev }))
).then((docs) => docs.filter((doc) => doc));
for (const oldDoc of oldDocs) {
await processDoc(oldDoc as EntryDoc, false);
}
}
} catch (ex) {
if ((ex as any)?.status == 404) {
this._log(`No revisions found for ${doc._id}`, LOG_LEVEL_VERBOSE);
} else {
this._log(`Error finding revisions for ${doc._id}`);
this._verbose(ex);
}
}
};
const processChange = async (doc: EntryDoc, isDeleted: boolean, seq: string | number) => {
if (doc.type === EntryTypes.CHUNK) {
if (isDeleted) return;
chunkSet.add(doc._id);
} else if ("children" in doc) {
await processDoc(doc, isDeleted);
await serialized("x-process-doc", async () => await processDocRevisions(doc));
}
};
// Track changes
let i = 0;
await db
.changes({
since: previousSeq || "",
live: false,
conflicts: true,
include_docs: true,
style: "all_docs",
return_docs: false,
})
.on("change", async (change) => {
// handle change
await processChange(change.doc!, change.deleted ?? false, change.seq);
if (i++ % 100 == 0) {
await saveState(change.seq);
}
})
.on("complete", async (info) => {
await saveState(info.last_seq);
});
// Track all changed docs and new-leafs;
const result = await this.scanUnusedChunks();
const message = `Total chunks: ${result.chunkSet.size}\nUnused chunks: ${result.unusedSet.size}`;
this._log(message, logLevel);
}
async performGC(showingNotice = false) {
if (!this.isAvailable()) return;
await this.trackChanges(false, showingNotice);
const title = "Are all devices synchronised?";
const confirmMessage = `This function deletes unused chunks from the device. If there are differences between devices, some chunks may be missing when resolving conflicts.
Be sure to synchronise before executing.
However, if you have deleted them, you may be able to recover them by performing Hatch -> Recreate missing chunks for all files.
Are you ready to delete unused chunks?`;
const logLevel = showingNotice ? LOG_LEVEL_NOTICE : LOG_LEVEL_INFO;
const BUTTON_OK = `Yes, delete chunks`;
const BUTTON_CANCEL = "Cancel";
const result = await this.plugin.confirm.askSelectStringDialogue(
confirmMessage,
[BUTTON_OK, BUTTON_CANCEL] as const,
{
title,
defaultAction: BUTTON_CANCEL,
}
);
if (result !== BUTTON_OK) {
this._log("User cancelled chunk deletion", logLevel);
return;
}
const { unusedSet, chunkSet } = await this.scanUnusedChunks();
const deleteChunks = await this.database.allDocs({
keys: [...unusedSet],
include_docs: true,
});
for (const chunk of deleteChunks.rows) {
if ((chunk as any)?.value?.deleted) {
chunkSet.delete(chunk.key as DocumentID);
}
}
const deleteDocs = deleteChunks.rows
.filter((e) => "doc" in e)
.map((e) => ({
...(e as any).doc!,
_deleted: true,
}));
this._log(`Deleting chunks: ${deleteDocs.length}`, logLevel);
const deleteChunkBatch = arrayToChunkedArray(deleteDocs, 100);
let successCount = 0;
let errored = 0;
for (const batch of deleteChunkBatch) {
const results = await this.database.bulkDocs(batch as EntryLeaf[]);
for (const result of results) {
if ("ok" in result) {
chunkSet.delete(result.id as DocumentID);
successCount++;
} else {
this._log(`Failed to delete doc: ${result.id}`, LOG_LEVEL_VERBOSE);
errored++;
}
}
this._log(`Deleting chunks: ${successCount} `, logLevel, "gc-preforming");
}
const message = `Garbage Collection completed.
Success: ${successCount}, Errored: ${errored}`;
this._log(message, logLevel);
const kvDB = this.plugin.kvDB;
await kvDB.set(DB_KEY_CHUNK_SET, chunkSet);
}
} }

Submodule src/lib updated: 172e7ec61d...aff369c146

View File

@@ -84,6 +84,7 @@ import { ModuleLiveSyncMain } from "./modules/main/ModuleLiveSyncMain.ts";
import { ModuleExtraSyncObsidian } from "./modules/extraFeaturesObsidian/ModuleExtraSyncObsidian.ts"; import { ModuleExtraSyncObsidian } from "./modules/extraFeaturesObsidian/ModuleExtraSyncObsidian.ts";
import { LocalDatabaseMaintenance } from "./features/LocalDatabaseMainte/CmdLocalDatabaseMainte.ts"; import { LocalDatabaseMaintenance } from "./features/LocalDatabaseMainte/CmdLocalDatabaseMainte.ts";
import { P2PReplicator } from "./features/P2PSync/CmdP2PReplicator.ts"; import { P2PReplicator } from "./features/P2PSync/CmdP2PReplicator.ts";
import type { LiveSyncManagers } from "./lib/src/managers/LiveSyncManagers.ts";
function throwShouldBeOverridden(): never { function throwShouldBeOverridden(): never {
throw new Error("This function should be overridden by the module."); throw new Error("This function should be overridden by the module.");
@@ -211,6 +212,7 @@ export default class ObsidianLiveSyncPlugin
settings!: ObsidianLiveSyncSettings; settings!: ObsidianLiveSyncSettings;
localDatabase!: LiveSyncLocalDB; localDatabase!: LiveSyncLocalDB;
managers!: LiveSyncManagers;
simpleStore!: SimpleStore<CheckPointInfo>; simpleStore!: SimpleStore<CheckPointInfo>;
replicator!: LiveSyncAbstractReplicator; replicator!: LiveSyncAbstractReplicator;
confirm!: Confirm; confirm!: Confirm;

View File

@@ -3,6 +3,7 @@ import { LiveSyncLocalDB } from "../../lib/src/pouchdb/LiveSyncLocalDB.ts";
import { initializeStores } from "../../common/stores.ts"; import { initializeStores } from "../../common/stores.ts";
import { AbstractModule } from "../AbstractModule.ts"; import { AbstractModule } from "../AbstractModule.ts";
import type { ICoreModule } from "../ModuleTypes.ts"; import type { ICoreModule } from "../ModuleTypes.ts";
import { LiveSyncManagers } from "../../lib/src/managers/LiveSyncManagers.ts";
export class ModuleLocalDatabaseObsidian extends AbstractModule implements ICoreModule { export class ModuleLocalDatabaseObsidian extends AbstractModule implements ICoreModule {
$everyOnloadStart(): Promise<boolean> { $everyOnloadStart(): Promise<boolean> {
@@ -14,7 +15,21 @@ export class ModuleLocalDatabaseObsidian extends AbstractModule implements ICore
} }
const vaultName = this.core.$$getVaultName(); const vaultName = this.core.$$getVaultName();
this._log($msg("moduleLocalDatabase.logWaitingForReady")); this._log($msg("moduleLocalDatabase.logWaitingForReady"));
const getDB = () => this.core.localDatabase.localDatabase;
const getSettings = () => this.core.settings;
this.core.managers = new LiveSyncManagers({
get database() {
return getDB();
},
getActiveReplicator: () => this.core.replicator,
id2path: this.core.$$id2path.bind(this.core),
path2id: this.core.$$path2id.bind(this.core),
get settings() {
return getSettings();
},
});
this.core.localDatabase = new LiveSyncLocalDB(vaultName, this.core); this.core.localDatabase = new LiveSyncLocalDB(vaultName, this.core);
initializeStores(vaultName); initializeStores(vaultName);
return await this.localDatabase.initializeDatabase(); return await this.localDatabase.initializeDatabase();
} }

View File

@@ -158,7 +158,59 @@ export function paneMaintenance(
) )
.addOnUpdate(this.onlyOnMinIO); .addOnUpdate(this.onlyOnMinIO);
}); });
void addPanel(paneEl, "Garbage Collection (Beta)", (e) => e, this.onlyOnP2POrCouchDB).then((paneEl) => { void addPanel(paneEl, "Garbage Collection (Beta2)", (e) => e, this.onlyOnP2POrCouchDB).then((paneEl) => {
new Setting(paneEl)
.setName("Scan garbage")
.setDesc("Scan for garbage chunks in the database.")
.addButton((button) =>
button
.setButtonText("Scan")
// .setWarning()
.setDisabled(false)
.onClick(async () => {
await this.plugin
.getAddOn<LocalDatabaseMaintenance>(LocalDatabaseMaintenance.name)
?.trackChanges(false, true);
})
)
.addButton((button) =>
button.setButtonText("Rescan").onClick(async () => {
await this.plugin
.getAddOn<LocalDatabaseMaintenance>(LocalDatabaseMaintenance.name)
?.trackChanges(true, true);
})
);
new Setting(paneEl)
.setName("Collect garbage")
.setDesc("Remove all unused chunks from the local database.")
.addButton((button) =>
button
.setButtonText("Collect")
.setWarning()
.setDisabled(false)
.onClick(async () => {
await this.plugin
.getAddOn<LocalDatabaseMaintenance>(LocalDatabaseMaintenance.name)
?.performGC(true);
})
);
new Setting(paneEl)
.setName("Commit File Deletion")
.setDesc("Completely delete all deleted documents from the local database.")
.addButton((button) =>
button
.setButtonText("Delete")
.setWarning()
.setDisabled(false)
.onClick(async () => {
await this.plugin
.getAddOn<LocalDatabaseMaintenance>(LocalDatabaseMaintenance.name)
?.commitFileDeletion();
})
);
});
void addPanel(paneEl, "Garbage Collection (Old and Experimental)", (e) => e, this.onlyOnP2POrCouchDB).then(
(paneEl) => {
new Setting(paneEl) new Setting(paneEl)
.setName("Remove all orphaned chunks") .setName("Remove all orphaned chunks")
.setDesc("Remove all orphaned chunks from the local database.") .setDesc("Remove all orphaned chunks from the local database.")
@@ -190,21 +242,8 @@ export function paneMaintenance(
?.resurrectChunks(); ?.resurrectChunks();
}) })
); );
new Setting(paneEl) }
.setName("Commit File Deletion")
.setDesc("Completely delete all deleted documents from the local database.")
.addButton((button) =>
button
.setButtonText("Delete")
.setWarning()
.setDisabled(false)
.onClick(async () => {
await this.plugin
.getAddOn<LocalDatabaseMaintenance>(LocalDatabaseMaintenance.name)
?.commitFileDeletion();
})
); );
});
void addPanel(paneEl, "Rebuilding Operations (Local)").then((paneEl) => { void addPanel(paneEl, "Rebuilding Operations (Local)").then((paneEl) => {
new Setting(paneEl) new Setting(paneEl)
.setName("Fetch from remote") .setName("Fetch from remote")