diff --git a/src/modules/core/ModuleReplicator.ts b/src/modules/core/ModuleReplicator.ts index 4eb4925..b13fc59 100644 --- a/src/modules/core/ModuleReplicator.ts +++ b/src/modules/core/ModuleReplicator.ts @@ -13,15 +13,16 @@ import { VER, type EntryBody, type EntryDoc, + type EntryLeaf, type LoadedEntry, type MetaEntry, } from "../../lib/src/common/types"; import { QueueProcessor } from "octagonal-wheels/concurrency/processor"; import { getPath, isChunk, isValidPath, scheduleTask } from "../../common/utils"; -import { sendValue } from "octagonal-wheels/messagepassing/signal"; import { isAnyNote } from "../../lib/src/common/utils"; import { EVENT_FILE_SAVED, eventHub } from "../../common/events"; import type { LiveSyncAbstractReplicator } from "../../lib/src/replication/LiveSyncAbstractReplicator"; +import { globalSlipBoard } from "../../lib/src/bureau/bureau"; export class ModuleReplicator extends AbstractModule implements ICoreModule { $everyOnloadAfterLoadSettings(): Promise { @@ -242,9 +243,7 @@ Or if you are sure know what had been happened, we can unlock the database from const change = docs[0]; if (!change) return; if (isChunk(change._id)) { - // SendSignal? - // this.parseIncomingChunk(change); - sendValue(`leaf-${change._id}`, change); + globalSlipBoard.submit("read-chunk", change._id, change as EntryLeaf); return; } if (await this.core.$anyModuleParsedReplicationResultItem(change)) return; diff --git a/src/modules/essential/ModuleInitializerFile.ts b/src/modules/essential/ModuleInitializerFile.ts index 9f15bbd..5180b62 100644 --- a/src/modules/essential/ModuleInitializerFile.ts +++ b/src/modules/essential/ModuleInitializerFile.ts @@ -1,5 +1,4 @@ import { unique } from "octagonal-wheels/collection"; -import { QueueProcessor } from "octagonal-wheels/concurrency/processor"; import { throttle } from "octagonal-wheels/function"; import { eventHub } from "../../common/events.ts"; import { BASE_IS_NEW, compareFileFreshness, EVEN, getPath, isValidPath, TARGET_IS_NEW } from "../../common/utils.ts"; @@ -19,7 +18,7 @@ import { isAnyNote } from "../../lib/src/common/utils.ts"; import { stripAllPrefixes } from "../../lib/src/string_and_binary/path.ts"; import { AbstractModule } from "../AbstractModule.ts"; import type { ICoreModule } from "../ModuleTypes.ts"; - +import { withConcurrency } from "octagonal-wheels/iterable/map"; export class ModuleInitializerFile extends AbstractModule implements ICoreModule { async $$performFullScan(showingNotice?: boolean): Promise { this._log("Opening the key-value database", LOG_LEVEL_VERBOSE); @@ -152,35 +151,30 @@ export class ModuleInitializerFile extends AbstractModule implements ICoreModule if (!this.localDatabase.isReady) throw Error("Database is not ready!"); let success = 0; let failed = 0; - const step = 10; - const processor = new QueueProcessor( + let total = 0; + for await (const result of withConcurrency( + objects, async (e) => { try { - await callback(e[0]); - success++; - // return + await callback(e); + return true; } catch (ex) { this._log(`Error while ${procedureName}`, LOG_LEVEL_NOTICE); this._log(ex, LOG_LEVEL_VERBOSE); - failed++; + return false; } - if ((success + failed) % step == 0) { - const msg = `${procedureName}: DONE:${success}, FAILED:${failed}, LAST:${processor._queue.length}`; - updateLog(procedureName, msg); - } - return; }, - { - batchSize: 1, - concurrentLimit: 10, - delay: 0, - suspended: true, - maintainDelay: false, - interval: 0, - }, - objects - ); - await processor.waitForAllDoneAndTerminate(); + 10 + )) { + if (result) { + success++; + } else { + failed++; + } + total++; + const msg = `${procedureName}: DONE:${success}, FAILED:${failed}, LAST:${objects.length - total}`; + updateLog(procedureName, msg); + } const msg = `${procedureName} All done: DONE:${success}, FAILED:${failed}`; updateLog(procedureName, msg); };