diff --git a/packages/objectloader2/src/core/objectLoader2.ts b/packages/objectloader2/src/core/objectLoader2.ts index 934f9c9df..cf5a447a3 100644 --- a/packages/objectloader2/src/core/objectLoader2.ts +++ b/packages/objectloader2/src/core/objectLoader2.ts @@ -34,8 +34,8 @@ export class ObjectLoader2 { maxCacheReadSize: 10_000, maxCacheWriteSize: 10_000, maxWriteQueueSize: 40_000, - maxCacheBatchWriteWait: 3_000, - maxCacheBatchReadWait: 3_000 + maxCacheBatchWriteWait: 1_000, + maxCacheBatchReadWait: 1_000 } this.#gathered = new AsyncGeneratorQueue() @@ -56,10 +56,10 @@ export class ObjectLoader2 { await Promise.all([ this.#gathered.disposeAsync(), this.#downloader.disposeAsync(), - this.#cacheReader.disposeAsync(), this.#cacheWriter.disposeAsync() ]) this.#deferments.dispose() + this.#cacheReader.dispose() } async getRootObject(): Promise { diff --git a/packages/objectloader2/src/core/stages/cacheReader.spec.ts b/packages/objectloader2/src/core/stages/cacheReader.spec.ts index 2b1fbf9db..ec22a4351 100644 --- a/packages/objectloader2/src/core/stages/cacheReader.spec.ts +++ b/packages/objectloader2/src/core/stages/cacheReader.spec.ts @@ -30,6 +30,6 @@ describe('CacheReader testing', () => { const base = await objPromise expect(base).toMatchSnapshot() - await cacheReader.disposeAsync() + cacheReader.dispose() }) }) diff --git a/packages/objectloader2/src/core/stages/cacheReader.ts b/packages/objectloader2/src/core/stages/cacheReader.ts index 7645af00b..c3a5036c9 100644 --- a/packages/objectloader2/src/core/stages/cacheReader.ts +++ b/packages/objectloader2/src/core/stages/cacheReader.ts @@ -31,11 +31,12 @@ export class CacheReader { this.#notFoundQueue = notFoundQueue } - async getObject(params: { id: string }): Promise { - if (!this.#defermentManager.isDeferred(params.id)) { + getObject(params: { id: string }): Promise { + const [p, b] = this.#defermentManager.defer({ id: params.id }) + if (!b) { this.#requestItem(params.id) } - return await this.#defermentManager.defer({ id: params.id }) + return p } #createReadQueue(): void { @@ -57,10 +58,15 @@ export class CacheReader { requestAll(keys: string[]): void { this.#createReadQueue() + for (const key of keys) { + this.#defermentManager.trackDefermentRequest(key) + } + this.#readQueue?.addAll(keys, keys) } #processBatch = async (batch: string[]): Promise => { + const start = performance.now() const items = await this.#database.getAll(batch) for (let i = 0; i < items.length; i++) { const item = items[i] @@ -71,9 +77,10 @@ export class CacheReader { this.#notFoundQueue?.add(batch[i]) } } + this.#logger('readBatch: left, time', items.length, performance.now() - start) } - async disposeAsync(): Promise { - await this.#readQueue?.disposeAsync() + dispose(): void { + this.#readQueue?.dispose() } } diff --git a/packages/objectloader2/src/core/stages/cacheWriter.ts b/packages/objectloader2/src/core/stages/cacheWriter.ts index 9387193cd..739fbc750 100644 --- a/packages/objectloader2/src/core/stages/cacheWriter.ts +++ b/packages/objectloader2/src/core/stages/cacheWriter.ts @@ -30,17 +30,25 @@ export class CacheWriter implements Queue { this.#writeQueue = new BatchingQueue({ batchSize: this.#options.maxCacheWriteSize, maxWaitTime: this.#options.maxCacheBatchWriteWait, - processFunction: (batch: Item[]): Promise => - this.#database.saveBatch({ batch }) + processFunction: async (batch: Item[]): Promise => { + await this.writeAll(batch) + } }) } this.#defermentManager.undefer(item) this.#writeQueue.add(item.baseId, item) } + async writeAll(items: Item[]): Promise { + const start = performance.now() + await this.#database.saveBatch({ batch: items }) + this.#logger('writeBatch: left, time', items.length, performance.now() - start) + } + async disposeAsync(): Promise { - await this.#writeQueue?.disposeAsync() + this.#writeQueue?.dispose() this.#disposed = true + return Promise.resolve() } get isDisposed(): boolean { diff --git a/packages/objectloader2/src/core/stages/indexedDatabase.ts b/packages/objectloader2/src/core/stages/indexedDatabase.ts index c10e60447..ad437dd55 100644 --- a/packages/objectloader2/src/core/stages/indexedDatabase.ts +++ b/packages/objectloader2/src/core/stages/indexedDatabase.ts @@ -121,7 +121,8 @@ export default class IndexedDatabase implements Database { async disposeAsync(): Promise { this.#cacheDB?.close() this.#cacheDB = undefined - await this.#writeQueue?.disposeAsync() + this.#writeQueue?.dispose() this.#writeQueue = undefined + return Promise.resolve() } } diff --git a/packages/objectloader2/src/deferment/defermentManager.disposal.spec.ts b/packages/objectloader2/src/deferment/defermentManager.disposal.spec.ts index 30d9f1e6b..1326889b0 100644 --- a/packages/objectloader2/src/deferment/defermentManager.disposal.spec.ts +++ b/packages/objectloader2/src/deferment/defermentManager.disposal.spec.ts @@ -10,14 +10,12 @@ describe('DefermentManager disposal', () => { base: { id, speckle_type: 'test' } }) - it('should throw on get/defer/undefer after dispose', async () => { + it('should throw on get/defer/undefer after dispose', () => { const manager = new DefermentManager(options) manager.dispose() expect(() => manager.get('a')).toThrow('DefermentManager is disposed') expect(() => manager.undefer(makeItem('a'))).toThrow('DefermentManager is disposed') - await expect(manager.defer({ id: 'a' })).rejects.toThrow( - 'DefermentManager is disposed' - ) + expect(() => manager.defer({ id: 'a' })).toThrow('DefermentManager is disposed') }) it('dispose is idempotent', () => { diff --git a/packages/objectloader2/src/deferment/defermentManager.spec.ts b/packages/objectloader2/src/deferment/defermentManager.spec.ts index 930e4b0fd..ba08426b3 100644 --- a/packages/objectloader2/src/deferment/defermentManager.spec.ts +++ b/packages/objectloader2/src/deferment/defermentManager.spec.ts @@ -6,7 +6,8 @@ import { DefermentManager } from './defermentManager.js' describe('deferments', () => { test('defer one', async () => { const deferments = new DefermentManager({ maxSizeInMb: 1, ttlms: 1 }) - const x = deferments.defer({ id: 'id' }) + const [x, alreadyDeferred] = deferments.defer({ id: 'id' }) + expect(alreadyDeferred).toBeFalsy() expect(x).toBeInstanceOf(Promise) deferments.undefer({ baseId: 'id', base: { id: 'id', speckle_type: 'type' } }) const b = await x @@ -17,7 +18,8 @@ describe('deferments', () => { const now = 1 const deferments = new DefermentManager({ maxSizeInMb: 1, ttlms: 1 }) deferments['now'] = (): number => now - const x = deferments.defer({ id: 'id' }) + const [x, alreadyDeferred] = deferments.defer({ id: 'id' }) + expect(alreadyDeferred).toBeFalsy() expect(x).toBeInstanceOf(Promise) const d = deferments.get('id') expect(d).toBeDefined() diff --git a/packages/objectloader2/src/deferment/defermentManager.ts b/packages/objectloader2/src/deferment/defermentManager.ts index 3a9404202..4520917e8 100644 --- a/packages/objectloader2/src/deferment/defermentManager.ts +++ b/packages/objectloader2/src/deferment/defermentManager.ts @@ -22,23 +22,19 @@ export class DefermentManager { return Date.now() } - isDeferred(id: string): boolean { - return this.deferments.has(id) - } - get(id: string): DeferredBase | undefined { if (this.disposed) throw new Error('DefermentManager is disposed') return this.deferments.get(id) } - async defer(params: { id: string }): Promise { + defer(params: { id: string }): [Promise, boolean] { if (this.disposed) throw new Error('DefermentManager is disposed') this.trackDefermentRequest(params.id) const now = this.now() const deferredBase = this.deferments.get(params.id) if (deferredBase) { deferredBase.setAccess(now) - return deferredBase.getPromise() + return [deferredBase.getPromise(), true] } const notYetFound = new DeferredBase( this.options.ttlms, @@ -46,10 +42,10 @@ export class DefermentManager { now + this.options.ttlms ) this.deferments.set(params.id, notYetFound) - return notYetFound.getPromise() + return [notYetFound.getPromise(), false] } - private trackDefermentRequest(id: string): void { + trackDefermentRequest(id: string): void { const request = this.totalDefermentRequests.get(id) if (request) { this.totalDefermentRequests.set(id, request + 1) @@ -131,7 +127,7 @@ export class DefermentManager { //we do not clean it up to allow the requests to resolve const requestCount = this.totalDefermentRequests.get(deferredBase.getId()) if (requestCount && requestCount > 1) { - return + break } this.currentSize -= deferredBase.getSize() || 0 this.deferments.delete(deferredBase.getId()) diff --git a/packages/objectloader2/src/queues/batchingQueue.test.ts b/packages/objectloader2/src/queues/batchingQueue.test.ts new file mode 100644 index 000000000..a1b39fef9 --- /dev/null +++ b/packages/objectloader2/src/queues/batchingQueue.test.ts @@ -0,0 +1,146 @@ +import { describe, test, expect, beforeEach, afterEach, vi } from 'vitest' +import BatchingQueue from './batchingQueue.js' + +describe('BatchingQueue', () => { + let queue: BatchingQueue + + beforeEach(() => { + queue = new BatchingQueue({ + batchSize: 3, + maxWaitTime: 100, + processFunction: async (): Promise => { + await new Promise((resolve) => setTimeout(resolve, 0)) + } + }) + }) + + afterEach(() => { + queue.dispose() + }) + + test('should add items and process them in batches', async () => { + const processSpy = vi.fn() + queue = new BatchingQueue({ + batchSize: 2, + maxWaitTime: 100, + processFunction: async (batch: string[]): Promise => { + await new Promise((resolve) => setTimeout(resolve, 0)) + processSpy(batch) + } + }) + + queue.add('key1', 'item1') + queue.add('key2', 'item2') + + await new Promise((resolve) => setTimeout(resolve, 200)) + + expect(processSpy).toHaveBeenCalledTimes(1) + expect(processSpy).toHaveBeenCalledWith(['item1', 'item2']) + }) + + test('should process items after timeout if batch size is not reached', async () => { + const processSpy = vi.fn() + queue = new BatchingQueue({ + batchSize: 5, + maxWaitTime: 100, + processFunction: async (batch: string[]): Promise => { + await new Promise((resolve) => setTimeout(resolve, 0)) + processSpy(batch) + } + }) + + queue.add('key1', 'item1') + queue.add('key2', 'item2') + + await new Promise((resolve) => setTimeout(resolve, 200)) + + expect(processSpy).toHaveBeenCalledTimes(1) + expect(processSpy).toHaveBeenCalledWith(['item1', 'item2']) + }) + + test('should not process items if disposed', async () => { + const processSpy = vi.fn() + queue = new BatchingQueue({ + batchSize: 2, + maxWaitTime: 10000, + processFunction: async (batch: string[]): Promise => { + await new Promise((resolve) => setTimeout(resolve, 0)) + processSpy(batch) + } + }) + + queue.add('key1', 'item1') + queue.dispose() + + await new Promise((resolve) => setTimeout(resolve, 200)) + + expect(processSpy).not.toHaveBeenCalled() + }) + + test('should handle multiple batches correctly', async () => { + const processSpy = vi.fn() + queue = new BatchingQueue({ + batchSize: 2, + maxWaitTime: 100, + processFunction: async (batch: string[]): Promise => { + await new Promise((resolve) => setTimeout(resolve, 0)) + processSpy(batch) + } + }) + + queue.add('key1', 'item1') + queue.add('key2', 'item2') + queue.add('key3', 'item3') + queue.add('key4', 'item4') + + await new Promise((resolve) => setTimeout(resolve, 200)) + + expect(processSpy).toHaveBeenCalledTimes(2) + expect(processSpy).toHaveBeenCalledWith(['item1', 'item2']) + expect(processSpy).toHaveBeenCalledWith(['item3', 'item4']) + }) + + test('should retrieve items by key', () => { + queue.add('key1', 'item1') + queue.add('key2', 'item2') + + expect(queue.get('key1')).toBe('item1') + expect(queue.get('key2')).toBe('item2') + expect(queue.get('key3')).toBeUndefined() + }) + + test('should return correct count of items', () => { + expect(queue.count()).toBe(0) + + queue.add('key1', 'item1') + queue.add('key2', 'item2') + + expect(queue.count()).toBe(2) + }) + + test('should not process items if already processing', async () => { + const processSpy = vi.fn() + queue = new BatchingQueue({ + batchSize: 2, + maxWaitTime: 100, + processFunction: async (batch: string[]): Promise => { + processSpy(batch) + await new Promise((resolve) => setTimeout(resolve, 300)) + } + }) + + queue.add('key1', 'item1') + queue.add('key2', 'item2') + queue.add('key3', 'item3') + + await new Promise((resolve) => setTimeout(resolve, 200)) + + expect(processSpy).toHaveBeenCalledTimes(1) + expect(processSpy).toHaveBeenCalledWith(['item1', 'item2']) + + await new Promise((resolve) => setTimeout(resolve, 200)) + + expect(processSpy).toHaveBeenCalledTimes(2) + expect(processSpy).toHaveBeenCalledWith(['item3']) + }) +}) diff --git a/packages/objectloader2/src/queues/batchingQueue.ts b/packages/objectloader2/src/queues/batchingQueue.ts index dc9fa0f90..a28a96a45 100644 --- a/packages/objectloader2/src/queues/batchingQueue.ts +++ b/packages/objectloader2/src/queues/batchingQueue.ts @@ -1,41 +1,100 @@ +import { CustomLogger } from '../types/functions.js' import KeyedQueue from './keyedQueue.js' export default class BatchingQueue { #queue: KeyedQueue = new KeyedQueue() #batchSize: number #processFunction: (batch: T[]) => Promise + #timeoutId: ReturnType | null = null + #isProcessing = false + #logger: CustomLogger - #baseInterval: number - #minInterval: number - #maxInterval: number - - #processingLoop: Promise #disposed = false + #batchTimeout: number + + // Helper methods for cross-environment timeout handling + #getSetTimeoutFn(): typeof setTimeout { + // First check for window object (browser), then fallback to global (node), then just use setTimeout + return typeof window !== 'undefined' + ? window.setTimeout.bind(window) + : typeof global !== 'undefined' + ? global.setTimeout + : setTimeout + } + + #getClearTimeoutFn(): typeof clearTimeout { + // First check for window object (browser), then fallback to global (node), then just use clearTimeout + return typeof window !== 'undefined' + ? window.clearTimeout.bind(window) + : typeof global !== 'undefined' + ? global.clearTimeout + : clearTimeout + } constructor(params: { batchSize: number - maxWaitTime?: number + maxWaitTime: number processFunction: (batch: T[]) => Promise + logger?: CustomLogger }) { this.#batchSize = params.batchSize - this.#baseInterval = Math.min(params.maxWaitTime ?? 200, 200) // Initial batch time (ms) - this.#minInterval = Math.min(params.maxWaitTime ?? 100, 100) // Minimum batch time - this.#maxInterval = Math.min(params.maxWaitTime ?? 3000, 3000) // Maximum batch time this.#processFunction = params.processFunction - this.#processingLoop = this.#loop() + this.#batchTimeout = params.maxWaitTime + this.#logger = params.logger || ((): void => {}) } - async disposeAsync(): Promise { + dispose(): void { this.#disposed = true - await this.#processingLoop + if (this.#timeoutId) { + this.#getClearTimeoutFn()(this.#timeoutId) + } } add(key: string, item: T): void { this.#queue.enqueue(key, item) + this.#addCheck() } addAll(keys: string[], items: T[]): void { this.#queue.enqueueAll(keys, items) + this.#addCheck() + } + + #addCheck(): void { + if (this.#queue.size >= this.#batchSize) { + // Fire and forget, no need to await + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this.#flush() + } else { + if (this.#timeoutId) { + this.#getClearTimeoutFn()(this.#timeoutId) + } + // eslint-disable-next-line @typescript-eslint/no-misused-promises + this.#timeoutId = this.#getSetTimeoutFn()(() => this.#flush(), this.#batchTimeout) + } + } + + async #flush(): Promise { + if (this.#timeoutId) { + this.#getClearTimeoutFn()(this.#timeoutId) + this.#timeoutId = null + } + + if (this.#isProcessing || this.#queue.size === 0) { + return + } + this.#isProcessing = true + + const batchToProcess = this.#getBatch(this.#batchSize) + + try { + await this.#processFunction(batchToProcess) + } catch (error) { + this.#logger('Batch processing failed:', error) + } finally { + this.#isProcessing = false + } + this.#addCheck() } get(id: string): T | undefined { @@ -53,37 +112,4 @@ export default class BatchingQueue { #getBatch(batchSize: number): T[] { return this.#queue.spliceValues(0, Math.min(batchSize, this.#queue.size)) } - - async #loop(): Promise { - let interval = this.#baseInterval - while (!this.#disposed || this.#queue.size > 0) { - const startTime = performance.now() - if (this.#queue.size > 0) { - const batch = this.#getBatch(this.#batchSize) - //console.log('running with queue size of ' + this.#queue.length) - await this.#processFunction(batch) - } - if (this.#queue.size < this.#batchSize / 2) { - //refigure interval - const endTime = performance.now() - const duration = endTime - startTime - if (duration > interval) { - interval = Math.min(interval * 1.5, this.#maxInterval) // Increase if slow or empty - } else { - interval = Math.max(interval * 0.8, this.#minInterval) // Decrease if fast - } - /*console.log( - 'queue is waiting ' + - interval / 1000 + - ' with queue size of ' + - this.#queue.length - )*/ - await this.#delay(interval) - } - } - } - - #delay(ms: number): Promise { - return new Promise((resolve) => setTimeout(resolve, ms)) - } } diff --git a/packages/objectloader2/src/queues/cacheWriter.ts b/packages/objectloader2/src/queues/cacheWriter.ts deleted file mode 100644 index e104ed529..000000000 --- a/packages/objectloader2/src/queues/cacheWriter.ts +++ /dev/null @@ -1,46 +0,0 @@ -import { Database } from '../core/interfaces.js' -import { CacheOptions } from '../core/options.js' -import { DefermentManager } from '../deferment/defermentManager.js' -import { Item } from '../types/types.js' -import BatchingQueue from './batchingQueue.js' -import Queue from './queue.js' - -export class CacheWriter implements Queue { - #writeQueue: BatchingQueue | undefined - #database: Database - #defermentManager: DefermentManager - #options: CacheOptions - #disposed = false - - constructor( - database: Database, - defermentManager: DefermentManager, - options: CacheOptions - ) { - this.#database = database - this.#defermentManager = defermentManager - this.#options = options - } - - add(item: Item): void { - if (!this.#writeQueue) { - this.#writeQueue = new BatchingQueue({ - batchSize: this.#options.maxCacheWriteSize, - maxWaitTime: this.#options.maxCacheBatchWriteWait, - processFunction: (batch: Item[]): Promise => - this.#database.saveBatch({ batch }) - }) - } - this.#defermentManager.undefer(item) - this.#writeQueue.add(item.baseId, item) - } - - async disposeAsync(): Promise { - await this.#writeQueue?.disposeAsync() - this.#disposed = true - } - - get isDisposed(): boolean { - return this.#disposed - } -}