From 2b828a5eebe39b8ef021da36301dd3a849acd83e Mon Sep 17 00:00:00 2001 From: Adam Hathcock Date: Tue, 17 Jun 2025 08:24:05 +0100 Subject: [PATCH] (OL2) refactor read queue (#4948) * Rename to saveBatch * forgot a file * first pass of cacheReader * OL2 tests have infinite timeout * OL2 refactor works * fix for tests * get rid of pumps and fix test * lint fix * redo mermaid diagrams * add readme section on deferment --- packages/objectloader2/readme.md | 67 ++++++++--- .../__snapshots__/cachePump.spec.ts.snap | 31 ----- .../objectloader2/src/helpers/batchedPool.ts | 4 +- .../src/helpers/batchingQueue.ts | 4 + .../src/helpers/cachePump.disposal.spec.ts | 51 --------- .../src/helpers/cachePump.spec.ts | 104 ----------------- .../objectloader2/src/helpers/cachePump.ts | 107 ------------------ .../src/helpers/cacheReader.spec.ts | 2 +- .../objectloader2/src/helpers/cacheReader.ts | 34 ++++-- .../objectloader2/src/helpers/cacheWriter.ts | 48 ++++++++ .../src/helpers/defermentManager.spec.ts | 4 +- .../src/helpers/defermentManager.ts | 24 ++-- .../objectloader2/src/helpers/deferredBase.ts | 25 ++-- .../objectloader2/src/helpers/keyedQueue.ts | 12 ++ .../objectloader2/src/helpers/memoryPump.ts | 40 ------- packages/objectloader2/src/helpers/pump.ts | 8 -- .../indexedDatabase.spec.ts.snap | 8 +- .../databases/indexedDatabase.spec.ts | 10 +- .../operations/databases/indexedDatabase.ts | 2 +- .../databases/memoryDatabase.spec.ts | 8 +- .../operations/databases/memoryDatabase.ts | 5 +- .../downloaders/serverDownloader.spec.ts | 47 ++++++-- .../src/operations/interfaces.ts | 2 +- .../src/operations/objectLoader2.spec.ts | 12 +- .../src/operations/objectLoader2.ts | 52 +++++---- packages/objectloader2/src/types/types.ts | 2 +- 26 files changed, 268 insertions(+), 445 deletions(-) delete mode 100644 packages/objectloader2/src/helpers/__snapshots__/cachePump.spec.ts.snap delete mode 100644 packages/objectloader2/src/helpers/cachePump.disposal.spec.ts delete mode 100644 packages/objectloader2/src/helpers/cachePump.spec.ts delete mode 100644 packages/objectloader2/src/helpers/cachePump.ts create mode 100644 packages/objectloader2/src/helpers/cacheWriter.ts delete mode 100644 packages/objectloader2/src/helpers/memoryPump.ts delete mode 100644 packages/objectloader2/src/helpers/pump.ts diff --git a/packages/objectloader2/readme.md b/packages/objectloader2/readme.md index 7b1a3a34b..35b2de015 100644 --- a/packages/objectloader2/readme.md +++ b/packages/objectloader2/readme.md @@ -11,26 +11,51 @@ The main aim for the objectloader is: ## Architecture -To achieve increased concurrency, the different phases of the objectloader are divided into pools of workers with queues to feed them. +To achieve increased concurrency, the different phases of the objectloader are divided into pools of workers with queues to feed them. Below is a sequence diagram of the worker stages + +```mermaid +sequenceDiagram + ObjectLoader2->>CacheReader: Root+Children + CacheReader-->>Database: Item exists? + CacheReader->>ObjectLoader2: Item exists in Cache + CacheReader->>Downloader: Item does not exist in Cache + Downloader->>CacheWriter: Save Item to Cache + CacheWriter->>Database: Write Item + Downloader->>ObjectLoader2: Item exists in Cache +``` + +The queues between stages are illustrated below with the concurrency ```mermaid flowchart TD - start(Root Commit) - getIds(Parse Root to get all IDs) - cached{Cached?} - download(Download IDs) - save(Write to Cache) - load(Load from Cache) - generate(Generate to Viewer!) + start(ObjectLoader2) + cachedQueue(BatchingQueue) + cachedExists{Exists?} + downloadQueue(BatchingQueue) + download{Download Batch} + saveQueue(BatchingQueue) + save{Save to Database} + asyncGeneratorQueue(Aggregated Async Generator Queue) + loop(Generate to Viewer!) - start --> getIds - getIds --> cached - cached -->|Yes| load - cached -->|No| download - load --> generate - download --> generate - download --> save + start -- Add IDs --> cachedQueue + subgraph CacheReader + cachedQueue -- Checks by Batch --> cachedExists + end + cachedExists -->|Yes| asyncGeneratorQueue + subgraph Downloader + cachedExists -->|No| downloadQueue + downloadQueue --> download + end + subgraph CacheWriter + download -- add to queue --> saveQueue + saveQueue --> save + end + subgraph Viewer + download -- add to queue --> asyncGeneratorQueue + asyncGeneratorQueue -- Generator Loop --> loop + end ``` From the list of IDs, they are moved to a queue to be begin checking the cache from a pool of readers. @@ -40,3 +65,15 @@ Results are then sent to the viewer, if found, else they're send to the download The download queue is a batching mechanism that gets what is available, up to a limit or a timeout. The results are parsed and given to the generator and written to another queue. The write cache queue is processed with a single writer to the indexeddb. + +## Deferment + +Deferment is what happens with the viewer does a random access to OL2. It returns a promise but it will be fulfilled later if the item isn't in memory. + +The `DefermentManager` only holds a subset of the model in memory. If the requested item isn't in memory, then it enqueues the request into the general process laid out above. + +When items are returned to the generator loop, `undefer` is called which caches the item in the manager as well as fulfills any outstanding promises. + +A cleanup process is ran to be a singleton process. This process sorts by the total number of requests and the size. If anything falls outside the size window, then it is removed from the manager's memory cache. + +The aim is to speed up random access while still getting items from the cache in batches. Items that are accessed randomly tend to be references in the model. diff --git a/packages/objectloader2/src/helpers/__snapshots__/cachePump.spec.ts.snap b/packages/objectloader2/src/helpers/__snapshots__/cachePump.spec.ts.snap deleted file mode 100644 index 500ad13fa..000000000 --- a/packages/objectloader2/src/helpers/__snapshots__/cachePump.spec.ts.snap +++ /dev/null @@ -1,31 +0,0 @@ -// Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html - -exports[`CachePump testing > write two items to queue use pumpItems that are NOT found 1`] = `[]`; - -exports[`CachePump testing > write two items to queue use pumpItems that are NOT found 2`] = ` -[ - "id1", - "id2", -] -`; - -exports[`CachePump testing > write two items to queue use pumpItems that are found 1`] = ` -[ - { - "base": { - "id": "id", - "speckle_type": "type", - }, - "baseId": "id1", - }, - { - "base": { - "id": "id", - "speckle_type": "type", - }, - "baseId": "id2", - }, -] -`; - -exports[`CachePump testing > write two items to queue use pumpItems that are found 2`] = `[]`; diff --git a/packages/objectloader2/src/helpers/batchedPool.ts b/packages/objectloader2/src/helpers/batchedPool.ts index bb9883f0e..f0a08feaa 100644 --- a/packages/objectloader2/src/helpers/batchedPool.ts +++ b/packages/objectloader2/src/helpers/batchedPool.ts @@ -1,4 +1,6 @@ -export default class BatchedPool { +import Queue from './queue.js' + +export default class BatchedPool implements Queue { #queue: T[] = [] #concurrencyAndSizes: number[] #processFunction: (batch: T[]) => Promise diff --git a/packages/objectloader2/src/helpers/batchingQueue.ts b/packages/objectloader2/src/helpers/batchingQueue.ts index 7951d026c..dc9fa0f90 100644 --- a/packages/objectloader2/src/helpers/batchingQueue.ts +++ b/packages/objectloader2/src/helpers/batchingQueue.ts @@ -34,6 +34,10 @@ export default class BatchingQueue { this.#queue.enqueue(key, item) } + addAll(keys: string[], items: T[]): void { + this.#queue.enqueueAll(keys, items) + } + get(id: string): T | undefined { return this.#queue.get(id) } diff --git a/packages/objectloader2/src/helpers/cachePump.disposal.spec.ts b/packages/objectloader2/src/helpers/cachePump.disposal.spec.ts deleted file mode 100644 index 1dbcb938c..000000000 --- a/packages/objectloader2/src/helpers/cachePump.disposal.spec.ts +++ /dev/null @@ -1,51 +0,0 @@ -import { describe, expect, test } from 'vitest' -import { CachePump } from './cachePump.js' -import { Database } from '../operations/interfaces.js' -import AsyncGeneratorQueue from './asyncGeneratorQueue.js' -import { Item } from '../types/types.js' -import { DefermentManager } from './defermentManager.js' - -const makeDatabase = (): Database => - ({ - cacheSaveBatch: async (): Promise => {}, - getAll: async (): Promise<(Item | undefined)[]> => Promise.resolve([]), - getItem: async (): Promise => Promise.resolve(undefined), - disposeAsync: async (): Promise => {} - } as unknown as Database) -const makeGathered = (): AsyncGeneratorQueue => - ({ - add: () => {}, - async *consume() {} - } as unknown as AsyncGeneratorQueue) -const makeDeferments = (): DefermentManager => - ({ - undefer: () => {} - } as unknown as DefermentManager) -describe('CachePump disposal', () => { - test('disposeAsync is idempotent and always resolves', async () => { - const pump = new CachePump(makeDatabase(), makeGathered(), makeDeferments(), { - maxCacheWriteSize: 2, - maxCacheBatchWriteWait: 100, - maxCacheBatchReadWait: 1, - maxWriteQueueSize: 2, - maxCacheReadSize: 2 - }) - await pump.disposeAsync() - await expect(pump.disposeAsync()).resolves.toBeUndefined() - }) - - test('should not throw on add after dispose if writeQueue was never created', async () => { - const pump = new CachePump(makeDatabase(), makeGathered(), makeDeferments(), { - maxCacheWriteSize: 2, - maxCacheBatchWriteWait: 100, - maxCacheBatchReadWait: 1, - maxWriteQueueSize: 2, - maxCacheReadSize: 2 - }) - await pump.disposeAsync() - // Should not throw, but will not add anything - expect(() => - pump.add({ baseId: 'a', base: { id: 'b', speckle_type: 'type' } }) - ).not.toThrow() - }) -}) diff --git a/packages/objectloader2/src/helpers/cachePump.spec.ts b/packages/objectloader2/src/helpers/cachePump.spec.ts deleted file mode 100644 index f374025ab..000000000 --- a/packages/objectloader2/src/helpers/cachePump.spec.ts +++ /dev/null @@ -1,104 +0,0 @@ -import { describe, expect, test } from 'vitest' -import { CachePump } from './cachePump.js' -import { Base, Item } from '../types/types.js' -import BufferQueue from './bufferQueue.js' -import AsyncGeneratorQueue from './asyncGeneratorQueue.js' -import { DefermentManager } from './defermentManager.js' -import { MemoryDatabase } from '../operations/databases/memoryDatabase.js' -import { Database } from '../operations/interfaces.js' - -describe('CachePump testing', () => { - test('write two items to queue use pumpItems that are NOT found', async () => { - const i1: Item = { baseId: 'id1', base: { id: 'id', speckle_type: 'type' } } - const i2: Item = { baseId: 'id2', base: { id: 'id', speckle_type: 'type' } } - - const gathered = new AsyncGeneratorQueue() - const deferments = new DefermentManager({ maxSizeInMb: 1, ttlms: 1 }) - const cachePump = new CachePump(new MemoryDatabase({}), gathered, deferments, { - maxCacheReadSize: 1, - maxCacheWriteSize: 1, - maxCacheBatchWriteWait: 1, - maxCacheBatchReadWait: 1, - maxWriteQueueSize: 1 - }) - - const foundItems = new BufferQueue() - const notFoundItems = new BufferQueue() - - await cachePump.pumpItems({ - ids: [i1.baseId, i2.baseId], - foundItems, - notFoundItems - }) - - expect(foundItems.values()).toMatchSnapshot() - expect(notFoundItems.values()).toMatchSnapshot() - await cachePump.disposeAsync() - }) - - test('write two items to queue use pumpItems that are found', async () => { - const i1: Item = { baseId: 'id1', base: { id: 'id', speckle_type: 'type' } } - const i2: Item = { baseId: 'id2', base: { id: 'id', speckle_type: 'type' } } - - const db = new Map() - db.set(i1.baseId, i1.base) - db.set(i2.baseId, i2.base) - - const gathered = new AsyncGeneratorQueue() - const deferments = new DefermentManager({ maxSizeInMb: 1, ttlms: 1 }) - const cachePump = new CachePump( - new MemoryDatabase({ items: db }), - gathered, - deferments, - { - maxCacheReadSize: 1, - maxCacheWriteSize: 1, - maxCacheBatchWriteWait: 1, - maxCacheBatchReadWait: 1, - maxWriteQueueSize: 1 - } - ) - - const foundItems = new BufferQueue() - const notFoundItems = new BufferQueue() - - await cachePump.pumpItems({ - ids: [i1.baseId, i2.baseId], - foundItems, - notFoundItems - }) - - expect(foundItems.values()).toMatchSnapshot() - expect(notFoundItems.values()).toMatchSnapshot() - await cachePump.disposeAsync() - }) - - test('can dispose while waiting and not wait', async () => { - const i1: Item = { baseId: 'id1', base: { id: 'id', speckle_type: 'type' } } - const i2: Item = { baseId: 'id2', base: { id: 'id', speckle_type: 'type' } } - - const db: Database = { - getAll: async () => Promise.resolve([]), - disposeAsync: async (): Promise => {} - } as unknown as Database - const gathered = new AsyncGeneratorQueue() - const deferments = new DefermentManager({ maxSizeInMb: 1, ttlms: 1 }) - const cachePump = new CachePump(db, gathered, deferments, { - maxCacheReadSize: 1, - maxCacheWriteSize: 1, - maxCacheBatchWriteWait: 1, - maxCacheBatchReadWait: 1, - maxWriteQueueSize: 1 - }) - - const foundItems = new BufferQueue() - const notFoundItems = new BufferQueue() - - await cachePump.disposeAsync() - await cachePump.pumpItems({ - ids: [i1.baseId, i2.baseId], - foundItems, - notFoundItems - }) - }) -}) diff --git a/packages/objectloader2/src/helpers/cachePump.ts b/packages/objectloader2/src/helpers/cachePump.ts deleted file mode 100644 index 5396a45f7..000000000 --- a/packages/objectloader2/src/helpers/cachePump.ts +++ /dev/null @@ -1,107 +0,0 @@ -import { TIME } from '@speckle/shared' -import { Database } from '../operations/interfaces.js' -import { CacheOptions } from '../operations/options.js' -import { CustomLogger, Item } from '../types/types.js' -import BatchingQueue from './batchingQueue.js' -import Queue from './queue.js' -import { Downloader } from '../operations/interfaces.js' -import { DefermentManager } from './defermentManager.js' -import AsyncGeneratorQueue from './asyncGeneratorQueue.js' -import { Pump } from './pump.js' - -export class CachePump implements Pump { - #writeQueue: BatchingQueue | undefined - #database: Database - #logger: CustomLogger - #deferments: DefermentManager - - #gathered: AsyncGeneratorQueue - - #options: CacheOptions - - #disposed = false - - constructor( - database: Database, - gathered: AsyncGeneratorQueue, - deferments: DefermentManager, - options: CacheOptions - ) { - this.#database = database - this.#gathered = gathered - this.#deferments = deferments - this.#options = options - this.#logger = options.logger || ((): void => {}) - } - - add(item: Item): void { - if (!this.#writeQueue) { - this.#writeQueue = new BatchingQueue({ - batchSize: this.#options.maxCacheWriteSize, - maxWaitTime: this.#options.maxCacheBatchWriteWait, - processFunction: (batch: Item[]): Promise => - this.#database.cacheSaveBatch({ batch }) - }) - } - this.#writeQueue.add(item.baseId, item) - } - - async disposeAsync(): Promise { - await this.#writeQueue?.disposeAsync() - await this.#database.disposeAsync() - this.#disposed = true - } - - get isDisposed(): boolean { - return this.#disposed - } - - async pumpItems(params: { - ids: string[] - foundItems: Queue - notFoundItems: Queue - }): Promise { - const { ids, foundItems, notFoundItems } = params - const maxCacheReadSize = this.#options.maxCacheReadSize - - for (let i = 0; i < ids.length; ) { - if (this.isDisposed) break - if ((this.#writeQueue?.count() ?? 0) > this.#options.maxWriteQueueSize) { - this.#logger( - 'pausing reads (# in write queue: ' + this.#writeQueue?.count() + ')' - ) - await new Promise((resolve) => setTimeout(resolve, TIME.second)) // Pause for 1 second, protects against out of memory - continue - } - const batch = ids.slice(i, i + maxCacheReadSize) - const cachedData = await this.#database.getAll(batch) - for (let i = 0; i < cachedData.length; i++) { - if (cachedData[i]) { - foundItems.add(cachedData[i]!) - } else { - notFoundItems.add(batch[i]) - } - } - i += maxCacheReadSize - } - } - - async *gather(ids: string[], downloader: Downloader): AsyncGenerator { - const total = ids.length - const pumpPromise = this.pumpItems({ - ids, - foundItems: this.#gathered, - notFoundItems: downloader - }) - let count = 0 - for await (const item of this.#gathered.consume()) { - this.#deferments.undefer(item) - yield item - count++ - if (count >= total) { - this.#gathered.dispose() - } - } - await pumpPromise - } -} diff --git a/packages/objectloader2/src/helpers/cacheReader.spec.ts b/packages/objectloader2/src/helpers/cacheReader.spec.ts index 6067d21a8..5604d9f84 100644 --- a/packages/objectloader2/src/helpers/cacheReader.spec.ts +++ b/packages/objectloader2/src/helpers/cacheReader.spec.ts @@ -11,7 +11,7 @@ describe('CacheReader testing', () => { const deferments = new DefermentManager({ maxSizeInMb: 1, ttlms: 1 }) const cacheReader = new CacheReader( new MemoryDatabase({ - items: new Map([[i1.baseId, i1.base]]) + items: new Map([[i1.baseId, i1.base!]]) }), deferments, { diff --git a/packages/objectloader2/src/helpers/cacheReader.ts b/packages/objectloader2/src/helpers/cacheReader.ts index 55c980354..ae853723c 100644 --- a/packages/objectloader2/src/helpers/cacheReader.ts +++ b/packages/objectloader2/src/helpers/cacheReader.ts @@ -3,6 +3,7 @@ import { CacheOptions } from '../operations/options.js' import { Base, CustomLogger, Item } from '../types/types.js' import BatchingQueue from './batchingQueue.js' import { DefermentManager } from './defermentManager.js' +import Queue from './queue.js' export class CacheReader { #database: Database @@ -10,6 +11,8 @@ export class CacheReader { #logger: CustomLogger #options: CacheOptions #readQueue: BatchingQueue | undefined + #foundQueue: Queue | undefined + #notFoundQueue: Queue | undefined constructor( database: Database, @@ -22,14 +25,19 @@ export class CacheReader { this.#logger = options.logger || ((): void => {}) } + initializeQueue(foundQueue: Queue, notFoundQueue: Queue): void { + this.#foundQueue = foundQueue + this.#notFoundQueue = notFoundQueue + } + async getObject(params: { id: string }): Promise { if (!this.#defermentManager.isDeferred(params.id)) { - this.#getItem(params.id) + this.#requestItem(params.id) } return await this.#defermentManager.defer({ id: params.id }) } - #getItem(id: string): void { + #createReadQueue(): void { if (!this.#readQueue) { this.#readQueue = new BatchingQueue({ batchSize: this.#options.maxCacheReadSize, @@ -37,23 +45,29 @@ export class CacheReader { processFunction: this.#processBatch }) } - if (!this.#readQueue.get(id)) { - this.#readQueue.add(id, id) + } + + #requestItem(id: string): void { + this.#createReadQueue() + if (!this.#readQueue?.get(id)) { + this.#readQueue?.add(id, id) } } - async getAll(keys: string[]): Promise<(Item | undefined)[]> { - return this.#database.getAll(keys) + requestAll(keys: string[]): void { + this.#createReadQueue() + this.#readQueue?.addAll(keys, keys) } #processBatch = async (batch: string[]): Promise => { const items = await this.#database.getAll(batch) for (let i = 0; i < items.length; i++) { - if (items[i]) { - this.#defermentManager.undefer(items[i]!) + const item = items[i] + if (item) { + this.#foundQueue?.add(item) + this.#defermentManager.undefer(item) } else { - //this is okay! - //this.#logger(`Item ${batch[i]} not found in cache`) + this.#notFoundQueue?.add(batch[i]) } } } diff --git a/packages/objectloader2/src/helpers/cacheWriter.ts b/packages/objectloader2/src/helpers/cacheWriter.ts new file mode 100644 index 000000000..d2e3c9c78 --- /dev/null +++ b/packages/objectloader2/src/helpers/cacheWriter.ts @@ -0,0 +1,48 @@ +import { Database } from '../operations/interfaces.js' +import { CacheOptions } from '../operations/options.js' +import { CustomLogger, Item } from '../types/types.js' +import BatchingQueue from './batchingQueue.js' +import { DefermentManager } from './defermentManager.js' +import Queue from './queue.js' + +export class CacheWriter implements Queue { + #writeQueue: BatchingQueue | undefined + #database: Database + #defermentManager: DefermentManager + #logger: CustomLogger + #options: CacheOptions + #disposed = false + + constructor( + database: Database, + defermentManager: DefermentManager, + options: CacheOptions + ) { + this.#database = database + this.#defermentManager = defermentManager + this.#options = options + this.#logger = options.logger || ((): void => {}) + } + + add(item: Item): void { + if (!this.#writeQueue) { + this.#writeQueue = new BatchingQueue({ + batchSize: this.#options.maxCacheWriteSize, + maxWaitTime: this.#options.maxCacheBatchWriteWait, + processFunction: (batch: Item[]): Promise => + this.#database.saveBatch({ batch }) + }) + } + this.#defermentManager.undefer(item) + this.#writeQueue.add(item.baseId, item) + } + + async disposeAsync(): Promise { + await this.#writeQueue?.disposeAsync() + this.#disposed = true + } + + get isDisposed(): boolean { + return this.#disposed + } +} diff --git a/packages/objectloader2/src/helpers/defermentManager.spec.ts b/packages/objectloader2/src/helpers/defermentManager.spec.ts index 99da71de2..930e4b0fd 100644 --- a/packages/objectloader2/src/helpers/defermentManager.spec.ts +++ b/packages/objectloader2/src/helpers/defermentManager.spec.ts @@ -24,13 +24,13 @@ describe('deferments', () => { expect(d?.getId()).toBe('id') expect((d as any).expiresAt).toBe(2) expect((d as any).ttl).toBe(1) - expect((d as any).item).toBeUndefined() + expect((d as any).base).toBeUndefined() expect(d?.isExpired(1)).toBe(false) deferments.undefer({ baseId: 'id', base: { id: 'id', speckle_type: 'type' } }) await x expect((d as any).expiresAt).toBe(2) expect((d as any).ttl).toBe(1) - expect((d as any).item).toBeDefined() + expect((d as any).base).toBeDefined() expect(d?.isExpired(1)).toBe(false) expect(d?.isExpired(3)).toBe(true) }) diff --git a/packages/objectloader2/src/helpers/defermentManager.ts b/packages/objectloader2/src/helpers/defermentManager.ts index 4d96282ff..3434304f1 100644 --- a/packages/objectloader2/src/helpers/defermentManager.ts +++ b/packages/objectloader2/src/helpers/defermentManager.ts @@ -59,16 +59,21 @@ export class DefermentManager { undefer(item: Item): void { if (this.disposed) throw new Error('DefermentManager is disposed') + const base = item.base + if (!base) { + this.logger('undefer called with no base', item) + return + } const now = this.now() this.currentSize += item.size || 0 //order matters here with found before undefer const deferredBase = this.deferments.get(item.baseId) if (deferredBase) { - deferredBase.found(item) + deferredBase.found(base) deferredBase.setAccess(now) } else { const existing = new DeferredBase(this.options.ttlms, item.baseId, now) - existing.found(item) + existing.found(base) this.deferments.set(item.baseId, existing) } } @@ -95,7 +100,7 @@ export class DefermentManager { let waiting = 0 for (const deferredBase of this.deferments.values()) { deferredBase.done(0) - if (deferredBase.getItem() === undefined) { + if (deferredBase.getBase() === undefined) { waiting++ } } @@ -119,7 +124,7 @@ export class DefermentManager { const start = performance.now() for (const deferredBase of Array.from(this.deferments.values()) .filter((x) => x.isExpired(now)) - .sort((a, b) => this.compareMaybeBasesBySize(a.getItem(), b.getItem()))) { + .sort((a, b) => this.compareMaybeBasesBySize(a.getSize(), b.getSize()))) { if (deferredBase.done(now)) { //if the deferment is done but has been requested multiple times, //we do not clean it up to allow the requests to resolve @@ -127,7 +132,7 @@ export class DefermentManager { if (requestCount && requestCount > 1) { return } - this.currentSize -= deferredBase.getItem()?.size || 0 + this.currentSize -= deferredBase.getSize() || 0 this.deferments.delete(deferredBase.getId()) cleaned++ if (this.currentSize < maxSizeBytes) { @@ -144,14 +149,7 @@ export class DefermentManager { return } - compareMaybeBasesBySize(a: Item | undefined, b: Item | undefined): number { - if (a === undefined && b === undefined) return 0 - if (a === undefined) return -1 - if (b === undefined) return 1 - return this.compareMaybe(a.size, b.size) - } - - compareMaybe(a: number | undefined, b: number | undefined): number { + compareMaybeBasesBySize(a: number | undefined, b: number | undefined): number { if (a === undefined && b === undefined) return 0 if (a === undefined) return -1 if (b === undefined) return 1 diff --git a/packages/objectloader2/src/helpers/deferredBase.ts b/packages/objectloader2/src/helpers/deferredBase.ts index c97b3ad43..41c77514f 100644 --- a/packages/objectloader2/src/helpers/deferredBase.ts +++ b/packages/objectloader2/src/helpers/deferredBase.ts @@ -1,10 +1,11 @@ -import { Base, Item } from '../types/types.js' +import { Base } from '../types/types.js' export class DeferredBase { private promise: Promise private resolve!: (value: Base) => void private reject!: (reason?: Error) => void - private item?: Item + private base?: Base + private size?: number private readonly id: string private expiresAt: number // Timestamp in ms @@ -24,8 +25,11 @@ export class DeferredBase { return this.id } - getItem(): Item | undefined { - return this.item + getBase(): Base | undefined { + return this.base + } + getSize(): number | undefined { + return this.size } getPromise(): Promise { @@ -33,19 +37,20 @@ export class DeferredBase { } isExpired(now: number): boolean { - return this.item !== undefined && now > this.expiresAt + return this.base !== undefined && now > this.expiresAt } setAccess(now: number): void { this.expiresAt = now + this.ttl } - found(value: Item): void { - this.item = value - this.resolve(value.base) + found(value: Base, size?: number): void { + this.base = value + this.size = size + this.resolve(value) } done(now: number): boolean { - if (this.item) { - this.resolve(this.item.base) + if (this.base) { + this.resolve(this.base) } if (this.isExpired(now)) { return true diff --git a/packages/objectloader2/src/helpers/keyedQueue.ts b/packages/objectloader2/src/helpers/keyedQueue.ts index 949caaa7c..7efffae2f 100644 --- a/packages/objectloader2/src/helpers/keyedQueue.ts +++ b/packages/objectloader2/src/helpers/keyedQueue.ts @@ -16,6 +16,18 @@ export default class KeyedQueue { return true } + enqueueAll(keys: K[], values: V[]): number { + let count = 0 + for (let i = 0; i < keys.length; i++) { + if (!this._map.has(keys[i])) { + this._map.set(keys[i], values[i]) + this._order.push(keys[i]) + count++ + } + } + return count + } + get(key: K): V | undefined { return this._map.get(key) } diff --git a/packages/objectloader2/src/helpers/memoryPump.ts b/packages/objectloader2/src/helpers/memoryPump.ts deleted file mode 100644 index eee22560a..000000000 --- a/packages/objectloader2/src/helpers/memoryPump.ts +++ /dev/null @@ -1,40 +0,0 @@ -import { Item } from '../types/types.js' -import { Pump } from './pump.js' -import Queue from './queue.js' - -export class MemoryPump implements Pump { - #items: Map = new Map() - - add(item: Item): void { - this.#items.set(item.baseId, item) - } - - async pumpItems(params: { - ids: string[] - foundItems: Queue - notFoundItems: Queue - }): Promise { - const { ids, foundItems, notFoundItems } = params - for (const id of ids) { - const item = this.#items.get(id) - if (item) { - foundItems.add(item) - } else { - notFoundItems.add(id) - } - } - return Promise.resolve() - } - - async *gather(ids: string[]): AsyncGenerator { - for (const id of ids) { - const item = this.#items.get(id) - if (item) { - yield item - } - } - return Promise.resolve() - } - - async disposeAsync(): Promise {} -} diff --git a/packages/objectloader2/src/helpers/pump.ts b/packages/objectloader2/src/helpers/pump.ts deleted file mode 100644 index 431b71de1..000000000 --- a/packages/objectloader2/src/helpers/pump.ts +++ /dev/null @@ -1,8 +0,0 @@ -import { Downloader } from '../operations/interfaces.js' -import { Item } from '../types/types.js' -import Queue from './queue.js' - -export interface Pump extends Queue { - gather(ids: string[], downloader: Downloader): AsyncGenerator - disposeAsync(): Promise -} diff --git a/packages/objectloader2/src/operations/databases/__snapshots__/indexedDatabase.spec.ts.snap b/packages/objectloader2/src/operations/databases/__snapshots__/indexedDatabase.spec.ts.snap index 69d532d0c..798c13720 100644 --- a/packages/objectloader2/src/operations/databases/__snapshots__/indexedDatabase.spec.ts.snap +++ b/packages/objectloader2/src/operations/databases/__snapshots__/indexedDatabase.spec.ts.snap @@ -3,16 +3,16 @@ exports[`IndexedDatabase > should add and get multiple items 1`] = ` [ { - "baseId": "id1", - "item": { + "base": { "foo": "bar", }, + "baseId": "id1", }, { - "baseId": "id2", - "item": { + "base": { "foo": "bar", }, + "baseId": "id2", }, ] `; diff --git a/packages/objectloader2/src/operations/databases/indexedDatabase.spec.ts b/packages/objectloader2/src/operations/databases/indexedDatabase.spec.ts index 80eed79e8..11fdfe751 100644 --- a/packages/objectloader2/src/operations/databases/indexedDatabase.spec.ts +++ b/packages/objectloader2/src/operations/databases/indexedDatabase.spec.ts @@ -1,10 +1,14 @@ import { describe, it, expect, beforeEach, afterEach } from 'vitest' import { IDBFactory, IDBKeyRange } from 'fake-indexeddb' import IndexedDatabase, { IndexedDatabaseOptions } from './indexedDatabase.js' -import { Item } from '../../types/types.js' + +import { Item, Base } from '../../types/types.js' // Mock Item -const defaultItem = (id: string): Item => ({ baseId: id, item: { foo: 'bar' } }) +const defaultItem = (id: string): Item => ({ + baseId: id, + base: { foo: 'bar' } as unknown as Base +}) describe('IndexedDatabase', () => { let db: IndexedDatabase @@ -21,7 +25,7 @@ describe('IndexedDatabase', () => { it('should add and get multiple items', async () => { const items = [defaultItem('id1'), defaultItem('id2')] - await db.cacheSaveBatch({ batch: items }) + await db.saveBatch({ batch: items }) const result = await db.getAll(['id1', 'id2']) expect(result).toMatchSnapshot() expect(result).toEqual(items) diff --git a/packages/objectloader2/src/operations/databases/indexedDatabase.ts b/packages/objectloader2/src/operations/databases/indexedDatabase.ts index 8d545659b..4d110b89b 100644 --- a/packages/objectloader2/src/operations/databases/indexedDatabase.ts +++ b/packages/objectloader2/src/operations/databases/indexedDatabase.ts @@ -83,7 +83,7 @@ export default class IndexedDatabase implements Database { this.#cacheDB = await this.#openDatabase() } - async cacheSaveBatch(params: { batch: Item[] }): Promise { + async saveBatch(params: { batch: Item[] }): Promise { await this.#setupCacheDb() const { batch } = params //const x = this.#count diff --git a/packages/objectloader2/src/operations/databases/memoryDatabase.spec.ts b/packages/objectloader2/src/operations/databases/memoryDatabase.spec.ts index cf224848c..9bf03f631 100644 --- a/packages/objectloader2/src/operations/databases/memoryDatabase.spec.ts +++ b/packages/objectloader2/src/operations/databases/memoryDatabase.spec.ts @@ -21,14 +21,14 @@ describe('MemoryDatabase', () => { it('should add and retrieve a single item', async () => { const item = makeItem('id1') - await db.cacheSaveBatch({ batch: [item] }) + await db.saveBatch({ batch: [item] }) const result = await db.getAll(['id1']) expect(result).toEqual([item]) }) it('should add and retrieve multiple items', async () => { const items = [makeItem('id1'), makeItem('id2', 'baz')] - await db.cacheSaveBatch({ batch: items }) + await db.saveBatch({ batch: items }) const result = await db.getAll(['id1', 'id2']) expect(result).toEqual(items) }) @@ -36,8 +36,8 @@ describe('MemoryDatabase', () => { it('should overwrite items with the same key', async () => { const item1 = makeItem('id1', 'foo') const item2 = makeItem('id1', 'bar') - await db.cacheSaveBatch({ batch: [item1] }) - await db.cacheSaveBatch({ batch: [item2] }) + await db.saveBatch({ batch: [item1] }) + await db.saveBatch({ batch: [item2] }) const result = await db.getAll(['id1']) expect(result).toEqual([item2]) }) diff --git a/packages/objectloader2/src/operations/databases/memoryDatabase.ts b/packages/objectloader2/src/operations/databases/memoryDatabase.ts index e657dcac4..d97075592 100644 --- a/packages/objectloader2/src/operations/databases/memoryDatabase.ts +++ b/packages/objectloader2/src/operations/databases/memoryDatabase.ts @@ -22,8 +22,11 @@ export class MemoryDatabase implements Database { return Promise.resolve(found) } - cacheSaveBatch({ batch }: { batch: Item[] }): Promise { + saveBatch({ batch }: { batch: Item[] }): Promise { for (const item of batch) { + if (!item.baseId || !item.base) { + throw new Error('Item must have a baseId and base') + } this.items.set(item.baseId, item.base) } return Promise.resolve() diff --git a/packages/objectloader2/src/operations/downloaders/serverDownloader.spec.ts b/packages/objectloader2/src/operations/downloaders/serverDownloader.spec.ts index 2fe337128..2bffce9f1 100644 --- a/packages/objectloader2/src/operations/downloaders/serverDownloader.spec.ts +++ b/packages/objectloader2/src/operations/downloaders/serverDownloader.spec.ts @@ -3,14 +3,14 @@ import createFetchMock from 'vitest-fetch-mock' import { vi } from 'vitest' import { Item } from '../../types/types.js' import ServerDownloader from './serverDownloader.js' -import { MemoryPump } from '../../helpers/memoryPump.js' +import AsyncGeneratorQueue from '../../helpers/asyncGeneratorQueue.js' describe('downloader', () => { test('download batch of one', async () => { const fetchMocker = createFetchMock(vi) const i: Item = { baseId: 'id', base: { id: 'id', speckle_type: 'type' } } fetchMocker.mockResponseOnce('id\t' + JSON.stringify(i.base) + '\n') - const pump = new MemoryPump() + const gathered = new AsyncGeneratorQueue() const downloader = new ServerDownloader({ serverUrl: 'http://speckle.test', streamId: 'streamId', @@ -18,12 +18,21 @@ describe('downloader', () => { token: 'token', fetch: fetchMocker }) - downloader.initializePool({ results: pump, total: 1, maxDownloadBatchWait: 200 }) + downloader.initializePool({ + results: gathered, + total: 1, + maxDownloadBatchWait: 200 + }) downloader.add('id') await downloader.disposeAsync() const r = [] - for await (const x of pump.gather([i.baseId])) { + let count = 0 + for await (const x of gathered.consume()) { r.push(x) + count++ + if (count >= 1) { + break + } } expect(r).toMatchSnapshot() @@ -38,7 +47,7 @@ describe('downloader', () => { 'id1\t' + JSON.stringify(i1.base) + '\nid2\t' + JSON.stringify(i2.base) + '\n' ) - const pump = new MemoryPump() + const gathered = new AsyncGeneratorQueue() const downloader = new ServerDownloader({ serverUrl: 'http://speckle.test', streamId: 'streamId', @@ -47,13 +56,22 @@ describe('downloader', () => { fetch: fetchMocker }) - downloader.initializePool({ results: pump, total: 2, maxDownloadBatchWait: 200 }) + downloader.initializePool({ + results: gathered, + total: 2, + maxDownloadBatchWait: 200 + }) downloader.add('id1') downloader.add('id2') await downloader.disposeAsync() const r = [] - for await (const x of pump.gather([i1.baseId, i2.baseId])) { + let count = 0 + for await (const x of gathered.consume()) { r.push(x) + count++ + if (count >= 2) { + break + } } expect(r).toMatchSnapshot() @@ -75,7 +93,7 @@ describe('downloader', () => { '\n' ) - const pump = new MemoryPump() + const gathered = new AsyncGeneratorQueue() const downloader = new ServerDownloader({ serverUrl: 'http://speckle.test', streamId: 'streamId', @@ -84,14 +102,23 @@ describe('downloader', () => { fetch: fetchMocker }) - downloader.initializePool({ results: pump, total: 3, maxDownloadBatchWait: 200 }) + downloader.initializePool({ + results: gathered, + total: 3, + maxDownloadBatchWait: 200 + }) downloader.add('id1') downloader.add('id2') downloader.add('id3') await downloader.disposeAsync() const r = [] - for await (const x of pump.gather([i1.baseId, i2.baseId, i3.baseId])) { + let count = 0 + for await (const x of gathered.consume()) { r.push(x) + count++ + if (count >= 3) { + break + } } expect(r).toMatchSnapshot() diff --git a/packages/objectloader2/src/operations/interfaces.ts b/packages/objectloader2/src/operations/interfaces.ts index 768662254..1ad33291e 100644 --- a/packages/objectloader2/src/operations/interfaces.ts +++ b/packages/objectloader2/src/operations/interfaces.ts @@ -13,6 +13,6 @@ export interface Downloader extends Queue { export interface Database { getAll(keys: string[]): Promise<(Item | undefined)[]> - cacheSaveBatch(params: { batch: Item[] }): Promise + saveBatch(params: { batch: Item[] }): Promise disposeAsync(): Promise } diff --git a/packages/objectloader2/src/operations/objectLoader2.spec.ts b/packages/objectloader2/src/operations/objectLoader2.spec.ts index 6d2f2c698..138205ddb 100644 --- a/packages/objectloader2/src/operations/objectLoader2.spec.ts +++ b/packages/objectloader2/src/operations/objectLoader2.spec.ts @@ -23,6 +23,7 @@ describe('objectloader2', () => { }) }) const x = await loader.getRootObject() + await loader.disposeAsync() expect(x).toMatchSnapshot() }) @@ -42,6 +43,7 @@ describe('objectloader2', () => { }) }) const x = await loader.getRootObject() + await loader.disposeAsync() expect(x).toMatchSnapshot() }) @@ -65,6 +67,7 @@ describe('objectloader2', () => { for await (const x of loader.getObjectIterator()) { r.push(x) } + await loader.disposeAsync() expect(r).toMatchSnapshot() }) @@ -99,6 +102,7 @@ describe('objectloader2', () => { for await (const x of loader.getObjectIterator()) { r.push(x) } + await loader.disposeAsync() expect(obj).toBeDefined() expect(r).toMatchSnapshot() @@ -129,16 +133,14 @@ describe('objectloader2', () => { const loader = new ObjectLoader2({ rootId: root.baseId, downloader: new MemoryDownloader(rootId, records), - database: new IndexedDatabase({ - indexedDB: new IDBFactory(), - keyRange: IDBKeyRange - }) + database: new MemoryDatabase({ items: records }) }) const r = [] const obj = loader.getObject({ id: child1.baseId }) for await (const x of loader.getObjectIterator()) { r.push(x) } + await loader.disposeAsync() expect(obj).toBeDefined() expect(r).toMatchSnapshot() @@ -172,6 +174,7 @@ describe('objectloader2', () => { }) }) const x = await loader.getRootObject() + await loader.disposeAsync() expect(x).toMatchSnapshot() }) @@ -224,6 +227,7 @@ describe('objectloader2', () => { for await (const x of loader.getObjectIterator()) { r.push(x) } + await loader.disposeAsync() expect(r).toMatchSnapshot() }) }) diff --git a/packages/objectloader2/src/operations/objectLoader2.ts b/packages/objectloader2/src/operations/objectLoader2.ts index 657677700..de6f129d3 100644 --- a/packages/objectloader2/src/operations/objectLoader2.ts +++ b/packages/objectloader2/src/operations/objectLoader2.ts @@ -4,9 +4,9 @@ import { CustomLogger, Base, Item } from '../types/types.js' import { CacheOptions, ObjectLoader2Options } from './options.js' import { DefermentManager } from '../helpers/defermentManager.js' import { CacheReader } from '../helpers/cacheReader.js' -import { CachePump } from '../helpers/cachePump.js' import AggregateQueue from '../helpers/aggregateQueue.js' import { ObjectLoader2Factory } from './objectLoader2Factory.js' +import { CacheWriter } from '../helpers/cacheWriter.js' export class ObjectLoader2 { #rootId: string @@ -15,8 +15,8 @@ export class ObjectLoader2 { #database: Database #downloader: Downloader - #pump: CachePump - #cache: CacheReader + #cacheReader: CacheReader + #cacheWriter: CacheWriter #deferments: DefermentManager @@ -38,27 +38,25 @@ export class ObjectLoader2 { } this.#gathered = new AsyncGeneratorQueue() + this.#database = options.database this.#deferments = new DefermentManager({ maxSizeInMb: 2_000, // 2 GBs ttlms: 15_000, // 15 seconds logger: this.#logger }) - this.#cache = new CacheReader(this.#database, this.#deferments, cacheOptions) - this.#pump = new CachePump( - this.#database, - this.#gathered, - this.#deferments, - cacheOptions - ) this.#downloader = options.downloader + this.#cacheReader = new CacheReader(this.#database, this.#deferments, cacheOptions) + this.#cacheReader.initializeQueue(this.#gathered, this.#downloader) + this.#cacheWriter = new CacheWriter(this.#database, this.#deferments, cacheOptions) } async disposeAsync(): Promise { + this.#gathered.dispose() await Promise.all([ this.#downloader.disposeAsync(), - this.#cache.disposeAsync(), - this.#pump.disposeAsync() + this.#cacheReader.disposeAsync(), + this.#cacheWriter.disposeAsync() ]) this.#deferments.dispose() } @@ -74,38 +72,46 @@ export class ObjectLoader2 { } async getObject(params: { id: string }): Promise { - return await this.#cache.getObject({ id: params.id }) + return await this.#cacheReader.getObject({ id: params.id }) } async getTotalObjectCount(): Promise { const rootObj = await this.getRootObject() - const totalChildrenCount = Object.keys(rootObj?.base.__closure || {}).length + const totalChildrenCount = Object.keys(rootObj?.base?.__closure || {}).length return totalChildrenCount + 1 //count the root } async *getObjectIterator(): AsyncGenerator { const rootItem = await this.getRootObject() - if (rootItem === undefined) { + if (rootItem?.base === undefined) { this.#logger('No root object found!') return } - //only for root - this.#pump.add(rootItem) - yield rootItem.base - if (!rootItem.base.__closure) return + if (!rootItem.base.__closure) { + yield rootItem.base + return + } //sort the closures by their values descending const sortedClosures = Object.entries(rootItem.base.__closure).sort( (a, b) => b[1] - a[1] ) const children = sortedClosures.map((x) => x[0]) - const total = children.length + const total = children.length + 1 // +1 for the root object this.#downloader.initializePool({ - results: new AggregateQueue(this.#gathered, this.#pump), + results: new AggregateQueue(this.#gathered, this.#cacheWriter), total }) - for await (const item of this.#pump.gather(children, this.#downloader)) { - yield item.base + //only for root + this.#gathered.add(rootItem) + this.#cacheReader.requestAll(children) + let count = 0 + for await (const item of this.#gathered.consume()) { + yield item.base! //always defined, as we add it to the queue + count++ + if (count >= total) { + break + } } } diff --git a/packages/objectloader2/src/types/types.ts b/packages/objectloader2/src/types/types.ts index 52a92f55e..f9c712634 100644 --- a/packages/objectloader2/src/types/types.ts +++ b/packages/objectloader2/src/types/types.ts @@ -7,7 +7,7 @@ export type Fetcher = ( export interface Item { baseId: string - base: Base + base?: Base size?: number }