From d7f87076cceb18ecaa5144dd6a66414e84407d3f Mon Sep 17 00:00:00 2001 From: Adam Hathcock Date: Thu, 25 Sep 2025 06:29:15 +0100 Subject: [PATCH 1/5] Fixes tests and batchingqueue during exceptions --- .../src/core/objectLoader2.spec.ts | 13 ++++++++--- .../src/core/stages/cacheReader.spec.ts | 2 +- .../src/core/stages/cacheWriter.spec.ts | 2 +- .../src/deferment/defermentManager.test.ts | 15 ++++++++---- .../src/queues/batchingQueue.dispose.test.ts | 2 +- .../src/queues/batchingQueue.test.ts | 23 +++++++++++++++++++ .../objectloader2/src/queues/batchingQueue.ts | 13 +++++------ 7 files changed, 52 insertions(+), 18 deletions(-) diff --git a/packages/objectloader2/src/core/objectLoader2.spec.ts b/packages/objectloader2/src/core/objectLoader2.spec.ts index 996c65b95..412a34d95 100644 --- a/packages/objectloader2/src/core/objectLoader2.spec.ts +++ b/packages/objectloader2/src/core/objectLoader2.spec.ts @@ -5,6 +5,7 @@ import { IndexedDatabase } from './stages/indexedDatabase.js' import { IDBFactory, IDBKeyRange } from 'fake-indexeddb' import { MemoryDatabase } from './stages/memory/memoryDatabase.js' import { MemoryDownloader } from './stages/memory/memoryDownloader.js' +import { DefermentManager } from '../deferment/defermentManager.js' describe('objectloader2', () => { test('can get a root object from cache', async () => { @@ -17,6 +18,7 @@ describe('objectloader2', () => { const loader = new ObjectLoader2({ rootId, downloader, + deferments: new DefermentManager(() => {}), database: new IndexedDatabase({ indexedDB: new IDBFactory(), keyRange: IDBKeyRange @@ -37,6 +39,7 @@ describe('objectloader2', () => { const loader = new ObjectLoader2({ rootId, downloader, + deferments: new DefermentManager(() => {}), database: new IndexedDatabase({ indexedDB: new IDBFactory(), keyRange: IDBKeyRange @@ -58,6 +61,7 @@ describe('objectloader2', () => { const loader = new ObjectLoader2({ rootId, downloader, + deferments: new DefermentManager(() => {}), database: new IndexedDatabase({ indexedDB: new IDBFactory(), keyRange: IDBKeyRange @@ -97,7 +101,8 @@ describe('objectloader2', () => { const loader = new ObjectLoader2({ rootId: root.baseId, downloader: new MemoryDownloader(rootId, records), - database: new MemoryDatabase({ items: records }) + database: new MemoryDatabase({ items: records }), + deferments: new DefermentManager(() => {}) }) const r = [] @@ -139,7 +144,8 @@ describe('objectloader2', () => { const loader = new ObjectLoader2({ rootId: root.baseId, downloader: new MemoryDownloader(rootId, records), - database: new MemoryDatabase({ items: records }) + database: new MemoryDatabase({ items: records }), + deferments: new DefermentManager(() => {}) }) const r = [] const obj = loader.getObject({ id: child1.baseId }) @@ -180,7 +186,8 @@ describe('objectloader2', () => { database: new IndexedDatabase({ indexedDB: new IDBFactory(), keyRange: IDBKeyRange - }) + }), + deferments: new DefermentManager(() => {}) }) const x = await loader.getRootObject() await loader.disposeAsync() diff --git a/packages/objectloader2/src/core/stages/cacheReader.spec.ts b/packages/objectloader2/src/core/stages/cacheReader.spec.ts index a6e04fee0..e9a168000 100644 --- a/packages/objectloader2/src/core/stages/cacheReader.spec.ts +++ b/packages/objectloader2/src/core/stages/cacheReader.spec.ts @@ -10,7 +10,7 @@ describe('CacheReader testing', () => { const i1: Item = { baseId: 'id1', base: { id: 'id', speckle_type: 'type' } } const cache = new MemoryCache({ maxSizeInMb: 1, ttlms: 1 }, () => {}) - const deferments = new DefermentManager(cache, () => {}) + const deferments = new DefermentManager(() => {}, cache) const cacheReader = new CacheReader( new MemoryDatabase({ items: new Map([[i1.baseId, i1.base!]]) diff --git a/packages/objectloader2/src/core/stages/cacheWriter.spec.ts b/packages/objectloader2/src/core/stages/cacheWriter.spec.ts index ea79a5782..bf12598b1 100644 --- a/packages/objectloader2/src/core/stages/cacheWriter.spec.ts +++ b/packages/objectloader2/src/core/stages/cacheWriter.spec.ts @@ -42,7 +42,7 @@ describe('CacheWriter', () => { ttlms: 60000 } memoryCache = new MemoryCache(memoryCacheOptions, logger) - defermentManager = new DefermentManager(memoryCache, logger) + defermentManager = new DefermentManager(logger, memoryCache) requestItemMock = vi.fn() options = { diff --git a/packages/objectloader2/src/deferment/defermentManager.test.ts b/packages/objectloader2/src/deferment/defermentManager.test.ts index 94f279463..6818b1f5b 100644 --- a/packages/objectloader2/src/deferment/defermentManager.test.ts +++ b/packages/objectloader2/src/deferment/defermentManager.test.ts @@ -9,7 +9,8 @@ describe('DefermentManager', () => { const mockLogger: CustomLogger = vi.fn() const mockCache = { get: vi.fn(), - add: vi.fn() + add: vi.fn(), + dispose: vi.fn() } as unknown as MemoryCache const defermentManager = new DefermentManager(mockLogger, mockCache) expect(defermentManager).toBeDefined() @@ -81,7 +82,8 @@ describe('DefermentManager', () => { const add = vi.fn() const mockCache = { get, - add + add, + dispose: vi.fn() } as unknown as MemoryCache const defermentManager = new DefermentManager(mockLogger, mockCache) @@ -165,7 +167,8 @@ describe('DefermentManager', () => { const add = vi.fn() const mockCache = { get, - add + add, + dispose: vi.fn() } as unknown as MemoryCache const defermentManager = new DefermentManager(mockLogger, mockCache) const requestItem = vi.fn() @@ -189,7 +192,8 @@ describe('DefermentManager', () => { const add = vi.fn() const mockCache = { get, - add + add, + dispose: vi.fn() } as unknown as MemoryCache const defermentManager = new DefermentManager(mockLogger, mockCache) @@ -205,7 +209,8 @@ describe('DefermentManager', () => { const add = vi.fn() const mockCache = { get, - add + add, + dispose: vi.fn() } as unknown as MemoryCache const defermentManager = new DefermentManager(mockLogger, mockCache) diff --git a/packages/objectloader2/src/queues/batchingQueue.dispose.test.ts b/packages/objectloader2/src/queues/batchingQueue.dispose.test.ts index 163f5a4e6..f39ee9757 100644 --- a/packages/objectloader2/src/queues/batchingQueue.dispose.test.ts +++ b/packages/objectloader2/src/queues/batchingQueue.dispose.test.ts @@ -17,7 +17,7 @@ describe('BatchingQueue disposal', () => { await queue.disposeAsync() - expect(processFunction).not.toHaveBeenCalled() + expect(processFunction).toHaveBeenCalled() expect(queue.count()).toBe(0) expect(queue.isDisposed()).toBe(true) }) diff --git a/packages/objectloader2/src/queues/batchingQueue.test.ts b/packages/objectloader2/src/queues/batchingQueue.test.ts index 027435e52..a016520a3 100644 --- a/packages/objectloader2/src/queues/batchingQueue.test.ts +++ b/packages/objectloader2/src/queues/batchingQueue.test.ts @@ -146,4 +146,27 @@ describe('BatchingQueue', () => { await queue.disposeAsync() } }) + + test('should handle processFunction throwing an exception during flush and is disposed', async () => { + const errorMessage = 'Process function failed' + const processFunction = vi.fn().mockRejectedValue(new Error(errorMessage)) + + const queue = new BatchingQueue<{ id: string }>({ + batchSize: 5, + maxWaitTime: 1000, + processFunction + }) + + const items = Array.from({ length: 3 }, (_, i) => ({ id: `item-${i}` })) + items.forEach((item) => queue.add(item.id, item)) + + expect(queue.count()).toBe(3) + + // flush should not throw even if processFunction rejects + await expect(queue.flush()).resolves.not.toThrow() + + expect(processFunction).toHaveBeenCalled() + expect(queue.count()).toBe(0) + expect(queue.isDisposed()).toBe(true) + }) }) diff --git a/packages/objectloader2/src/queues/batchingQueue.ts b/packages/objectloader2/src/queues/batchingQueue.ts index 9d1c3d943..57dfaf5a9 100644 --- a/packages/objectloader2/src/queues/batchingQueue.ts +++ b/packages/objectloader2/src/queues/batchingQueue.ts @@ -46,6 +46,7 @@ export default class BatchingQueue { } async disposeAsync(): Promise { + if (this.#disposed) return this.#disposed = true if (this.#timeoutId) { this.#getClearTimeoutFn()(this.#timeoutId) @@ -62,7 +63,7 @@ export default class BatchingQueue { // After any ongoing flush is completed, there might be items in the queue. // We should flush them. if (this.#queue.size > 0) { - await this.#flush() + await this.flush() } } @@ -83,17 +84,17 @@ export default class BatchingQueue { if (this.#queue.size >= this.#batchSize) { // Fire and forget, no need to await // eslint-disable-next-line @typescript-eslint/no-floating-promises - this.#flush() + this.flush() } else { if (this.#timeoutId) { this.#getClearTimeoutFn()(this.#timeoutId) } // eslint-disable-next-line @typescript-eslint/no-misused-promises - this.#timeoutId = this.#getSetTimeoutFn()(() => this.#flush(), this.#batchTimeout) + this.#timeoutId = this.#getSetTimeoutFn()(() => this.flush(), this.#batchTimeout) } } - async #flush(): Promise { + async flush(): Promise { if (this.#timeoutId) { this.#getClearTimeoutFn()(this.#timeoutId) this.#timeoutId = null @@ -104,10 +105,8 @@ export default class BatchingQueue { } this.#isProcessing = true - const batchToProcess = this.#getBatch(this.#batchSize) - if (this.#disposed) return - try { + const batchToProcess = this.#getBatch(this.#batchSize) await this.#processFunction(batchToProcess) } catch (error) { console.error('Batch processing failed:', error) From 3bd835f03a770856a2ff46c0384fe98fbbf39b28 Mon Sep 17 00:00:00 2001 From: Adam Hathcock Date: Tue, 30 Sep 2025 12:16:19 +0100 Subject: [PATCH 2/5] Erroring ends processing and adding, Processing just skips itself and Disposal does flush --- .../src/core/stages/serverDownloader.spec.ts | 122 ++++++++++++++++++ .../src/queues/batchingQueue.test.ts | 59 +++++++++ .../objectloader2/src/queues/batchingQueue.ts | 23 ++-- 3 files changed, 195 insertions(+), 9 deletions(-) diff --git a/packages/objectloader2/src/core/stages/serverDownloader.spec.ts b/packages/objectloader2/src/core/stages/serverDownloader.spec.ts index 2394ca8e2..dc45ba030 100644 --- a/packages/objectloader2/src/core/stages/serverDownloader.spec.ts +++ b/packages/objectloader2/src/core/stages/serverDownloader.spec.ts @@ -257,4 +257,126 @@ describe('downloader', () => { }) await downloader.disposeAsync() }) + + test('nothing is frozen when validateResponse returns 403', async () => { + const fetchMocker = createFetchMock(vi) + const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}) + + // Mock a 403 Forbidden response + fetchMocker.mockResponseOnce('', { status: 403, statusText: 'Forbidden' }) + + const gathered = new AsyncGeneratorQueue() + const downloader = new ServerDownloader({ + serverUrl: 'http://speckle.test', + streamId: 'streamId', + objectId: 'objectId', + token: 'invalid-token', + fetch: fetchMocker, + logger: (): void => {} + }) + + try { + downloader.initialize({ + results: gathered, + total: 2, + maxDownloadBatchWait: 100 + }) + + // Add items to trigger batch processing + downloader.add('id1') + downloader.add('id2') + + // Wait for the batch to be processed and fail with 403 + await new Promise((resolve) => setTimeout(resolve, 200)) + + // Verify that the error was logged (indicating the batch processing failed) + expect(consoleErrorSpy).toHaveBeenCalledWith( + 'Batch processing failed:', + expect.any(Error) + ) + + // The key test: verify we can still dispose the downloader properly + // This ensures the system isn't frozen and can clean up resources + const disposePromise = downloader.disposeAsync() + + // Add a timeout to ensure disposal doesn't hang indefinitely + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error('Disposal timed out')), 5000) + }) + + // This should complete without timing out or throwing + await Promise.race([disposePromise, timeoutPromise]) + + // Additional verification: the batching queue should be marked as disposed + // We can't directly access the private field, but we can verify disposal completed + expect(true).toBe(true) // If we reach here, disposal succeeded + } finally { + consoleErrorSpy.mockRestore() + } + }) + + test('system remains functional after 403 error and can be properly cleaned up', async () => { + const fetchMocker = createFetchMock(vi) + const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}) + + // First call returns 403, subsequent calls should not be made due to queue disposal + fetchMocker.mockResponseOnce('', { status: 403, statusText: 'Forbidden' }) + + const gathered = new AsyncGeneratorQueue() + const downloader = new ServerDownloader({ + serverUrl: 'http://speckle.test', + streamId: 'streamId', + objectId: 'objectId', + token: 'invalid-token', + fetch: fetchMocker, + logger: (): void => {} + }) + + try { + downloader.initialize({ + results: gathered, + total: 5, + maxDownloadBatchWait: 50 + }) + + // Add first batch that will trigger the 403 error + downloader.add('id1') + downloader.add('id2') + + // Wait for first batch to fail + await new Promise((resolve) => setTimeout(resolve, 100)) + + // Verify error was logged + expect(consoleErrorSpy).toHaveBeenCalledWith( + 'Batch processing failed:', + expect.any(Error) + ) + + // Try to add more items after the failure + // These should be ignored since the queue is now disposed + downloader.add('id3') + downloader.add('id4') + downloader.add('id5') + + // Wait a bit more to ensure no additional processing attempts + await new Promise((resolve) => setTimeout(resolve, 100)) + + // Note: The batching queue might make multiple attempts before disposal + // The key is that disposal should still work regardless of how many calls were made + expect(fetchMocker).toHaveBeenCalled() + + // Critical test: disposal should complete without hanging + const start = Date.now() + await downloader.disposeAsync() + const elapsed = Date.now() - start + + // Disposal should be quick (under 1 second) and not hang + expect(elapsed).toBeLessThan(1000) + + // Verify that the results queue can also be disposed properly + await gathered.disposeAsync() + } finally { + consoleErrorSpy.mockRestore() + } + }) }) diff --git a/packages/objectloader2/src/queues/batchingQueue.test.ts b/packages/objectloader2/src/queues/batchingQueue.test.ts index 027435e52..281931884 100644 --- a/packages/objectloader2/src/queues/batchingQueue.test.ts +++ b/packages/objectloader2/src/queues/batchingQueue.test.ts @@ -146,4 +146,63 @@ describe('BatchingQueue', () => { await queue.disposeAsync() } }) + + test('should stop processing when exception occurs in processFunction without disposal', async () => { + const processSpy = vi.fn() + const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}) + + let callCount = 0 + const queue = new BatchingQueue({ + batchSize: 2, + maxWaitTime: 100, + processFunction: async (batch: string[]): Promise => { + callCount++ + processSpy(batch) + + // Add small delay to simulate async processing + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Throw error on first batch to simulate exception + if (callCount === 1) { + throw new Error('Processing failed') + } + } + }) + + try { + // Add first batch that will cause exception + queue.add('key1', 'item1') + queue.add('key2', 'item2') + + // Wait for first batch to be processed and fail + await new Promise((resolve) => setTimeout(resolve, 200)) + + // Verify first batch was attempted and error was logged + expect(processSpy).toHaveBeenCalledTimes(1) + expect(processSpy).toHaveBeenCalledWith(['item1', 'item2']) + expect(consoleErrorSpy).toHaveBeenCalledWith( + 'Batch processing failed:', + expect.any(Error) + ) + + // Verify queue is marked as errored due to the exception + expect(queue.isDisposed()).toBe(false) + expect(queue.isErrored()).toBe(true) + + // Add more items after the exception + queue.add('key3', 'item3') + queue.add('key4', 'item4') + + // Wait to see if second batch gets processed (it shouldn't due to errored state) + await new Promise((resolve) => setTimeout(resolve, 200)) + + // Verify no further processing happened - the queue stopped due to exception + expect(processSpy).toHaveBeenCalledTimes(1) // Still only 1 call + expect(queue.count()).toBe(0) // Items were not added due to errored state + } finally { + consoleErrorSpy.mockRestore() + // Note: Not calling disposeAsync() here since the queue is already disposed + // This demonstrates the problematic behavior - the queue disposed itself due to exception + } + }) }) diff --git a/packages/objectloader2/src/queues/batchingQueue.ts b/packages/objectloader2/src/queues/batchingQueue.ts index 9d1c3d943..f0595f6d6 100644 --- a/packages/objectloader2/src/queues/batchingQueue.ts +++ b/packages/objectloader2/src/queues/batchingQueue.ts @@ -11,9 +11,10 @@ export default class BatchingQueue { #batchSize: number #processFunction: (batch: T[]) => Promise #timeoutId: ReturnType | null = null - #isProcessing = false - #disposed = false + #isProcessing = false + #isDisposed = false + #isErrored = false #batchTimeout: number // Helper methods for cross-environment timeout handling @@ -46,7 +47,7 @@ export default class BatchingQueue { } async disposeAsync(): Promise { - this.#disposed = true + this.#isDisposed = true if (this.#timeoutId) { this.#getClearTimeoutFn()(this.#timeoutId) this.#timeoutId = null @@ -67,19 +68,19 @@ export default class BatchingQueue { } add(key: string, item: T): void { - if (this.#disposed) return + if (this.#isDisposed || this.#isErrored) return this.#queue.enqueue(key, item) this.#addCheck() } addAll(keys: string[], items: T[]): void { - if (this.#disposed) return + if (this.#isDisposed || this.#isErrored) return this.#queue.enqueueAll(keys, items) this.#addCheck() } #addCheck(): void { - if (this.#disposed) return + if (this.#isDisposed) return if (this.#queue.size >= this.#batchSize) { // Fire and forget, no need to await // eslint-disable-next-line @typescript-eslint/no-floating-promises @@ -99,7 +100,7 @@ export default class BatchingQueue { this.#timeoutId = null } - if (this.#isProcessing || this.#queue.size === 0) { + if (this.#isErrored || this.#isProcessing || this.#queue.size === 0) { return } this.#isProcessing = true @@ -111,7 +112,7 @@ export default class BatchingQueue { await this.#processFunction(batchToProcess) } catch (error) { console.error('Batch processing failed:', error) - this.#disposed = true + this.#isErrored = true } finally { this.#isProcessing = false } @@ -127,7 +128,11 @@ export default class BatchingQueue { } isDisposed(): boolean { - return this.#disposed + return this.#isDisposed + } + + isErrored(): boolean { + return this.#isErrored } #getBatch(batchSize: number): T[] { From 02ae8c38e9d5aeb5bbf18402e8f67510009ae521 Mon Sep 17 00:00:00 2001 From: Adam Hathcock Date: Tue, 30 Sep 2025 13:05:13 +0100 Subject: [PATCH 3/5] fix tests --- .../objectloader2/src/queues/batchingQueue.test.ts | 12 +++++++++++- packages/objectloader2/src/queues/batchingQueue.ts | 1 + 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/packages/objectloader2/src/queues/batchingQueue.test.ts b/packages/objectloader2/src/queues/batchingQueue.test.ts index a016520a3..10e9b45d7 100644 --- a/packages/objectloader2/src/queues/batchingQueue.test.ts +++ b/packages/objectloader2/src/queues/batchingQueue.test.ts @@ -167,6 +167,16 @@ describe('BatchingQueue', () => { expect(processFunction).toHaveBeenCalled() expect(queue.count()).toBe(0) - expect(queue.isDisposed()).toBe(true) + expect(queue.isDisposed()).toBe(false) + expect(queue.isErrored()).toBe(true) + // Add more items after the exception + queue.add('key3', { id: `item-3` }) + queue.add('key4', { id: `item-4` }) + + // Wait to see if second batch gets processed (it shouldn't due to errored state) + await new Promise((resolve) => setTimeout(resolve, 200)) + + expect(queue.count()).toBe(0) // Items were not added due to errored state + await queue.disposeAsync() }) }) diff --git a/packages/objectloader2/src/queues/batchingQueue.ts b/packages/objectloader2/src/queues/batchingQueue.ts index 784bbbae3..0b8dd5d2e 100644 --- a/packages/objectloader2/src/queues/batchingQueue.ts +++ b/packages/objectloader2/src/queues/batchingQueue.ts @@ -47,6 +47,7 @@ export default class BatchingQueue { } async disposeAsync(): Promise { + if (this.#isDisposed) return this.#isDisposed = true if (this.#timeoutId) { this.#getClearTimeoutFn()(this.#timeoutId) From 63a819ad608fad3fd28395e34b49936f6e8b5526 Mon Sep 17 00:00:00 2001 From: Adam Hathcock Date: Tue, 30 Sep 2025 13:12:15 +0100 Subject: [PATCH 4/5] add more tests for draining --- .../src/queues/batchingQueue.test.ts | 87 +++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/packages/objectloader2/src/queues/batchingQueue.test.ts b/packages/objectloader2/src/queues/batchingQueue.test.ts index 10e9b45d7..c683b8a78 100644 --- a/packages/objectloader2/src/queues/batchingQueue.test.ts +++ b/packages/objectloader2/src/queues/batchingQueue.test.ts @@ -179,4 +179,91 @@ describe('BatchingQueue', () => { expect(queue.count()).toBe(0) // Items were not added due to errored state await queue.disposeAsync() }) + + test('should drain remaining items when disposed', async () => { + const processSpy = vi.fn() + const queue = new BatchingQueue({ + batchSize: 5, // Large batch size to prevent automatic processing + maxWaitTime: 10000, // Long timeout to prevent timeout-based processing + processFunction: async (batch: string[]): Promise => { + await new Promise((resolve) => setTimeout(resolve, 10)) + processSpy(batch) + } + }) + + // Add items that won't trigger automatic processing (less than batch size) + queue.add('key1', 'item1') + queue.add('key2', 'item2') + queue.add('key3', 'item3') + + // Verify items are in queue but haven't been processed yet + expect(queue.count()).toBe(3) + expect(processSpy).not.toHaveBeenCalled() + + // Dispose should drain the remaining items + await queue.disposeAsync() + + // Verify all items were processed during disposal + expect(processSpy).toHaveBeenCalledTimes(1) + expect(processSpy).toHaveBeenCalledWith(['item1', 'item2', 'item3']) + expect(queue.count()).toBe(0) + expect(queue.isDisposed()).toBe(true) + }) + + test('should drain items even with ongoing processing during dispose', async () => { + const processSpy = vi.fn() + let firstBatchStarted = false + let allowFirstBatchToComplete: (() => void) | undefined = undefined + + const queue = new BatchingQueue({ + batchSize: 2, + maxWaitTime: 100, + processFunction: async (batch: string[]): Promise => { + processSpy(batch) + + // Make the first batch wait for our signal + if (!firstBatchStarted) { + firstBatchStarted = true + await new Promise((resolve) => { + allowFirstBatchToComplete = resolve + }) + } else { + // Other batches process normally + await new Promise((resolve) => setTimeout(resolve, 10)) + } + } + }) + + // Add first batch that will trigger processing but will be blocked + queue.add('key1', 'item1') + queue.add('key2', 'item2') + + // Wait for first batch to start processing + await new Promise((resolve) => setTimeout(resolve, 50)) + expect(firstBatchStarted).toBe(true) + expect(processSpy).toHaveBeenCalledTimes(1) + + // Add more items while first batch is still processing + queue.add('key3', 'item3') + queue.add('key4', 'item4') + + // Verify the additional items are queued + expect(queue.count()).toBe(2) + + // Start disposal (this should wait for ongoing processing and then drain) + const disposePromise = queue.disposeAsync() + + // Allow the first batch to complete + allowFirstBatchToComplete?.() + + // Wait for disposal to complete + await disposePromise + + // Verify all batches were processed + expect(processSpy).toHaveBeenCalledTimes(2) + expect(processSpy).toHaveBeenCalledWith(['item1', 'item2']) + expect(processSpy).toHaveBeenCalledWith(['item3', 'item4']) + expect(queue.count()).toBe(0) + expect(queue.isDisposed()).toBe(true) + }) }) From 49a0faa05072430e518f6c6c8dacf0720f8a7b46 Mon Sep 17 00:00:00 2001 From: Adam Hathcock Date: Wed, 1 Oct 2025 08:30:27 +0100 Subject: [PATCH 5/5] fix test --- packages/objectloader2/src/queues/batchingQueue.test.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/objectloader2/src/queues/batchingQueue.test.ts b/packages/objectloader2/src/queues/batchingQueue.test.ts index c683b8a78..727c1f96b 100644 --- a/packages/objectloader2/src/queues/batchingQueue.test.ts +++ b/packages/objectloader2/src/queues/batchingQueue.test.ts @@ -213,7 +213,7 @@ describe('BatchingQueue', () => { test('should drain items even with ongoing processing during dispose', async () => { const processSpy = vi.fn() let firstBatchStarted = false - let allowFirstBatchToComplete: (() => void) | undefined = undefined + let allowFirstBatchToComplete: (() => void) | null = null const queue = new BatchingQueue({ batchSize: 2, @@ -238,10 +238,11 @@ describe('BatchingQueue', () => { queue.add('key1', 'item1') queue.add('key2', 'item2') - // Wait for first batch to start processing + // Wait for first batch to start processing and allowFirstBatchToComplete to be assigned await new Promise((resolve) => setTimeout(resolve, 50)) expect(firstBatchStarted).toBe(true) expect(processSpy).toHaveBeenCalledTimes(1) + expect(allowFirstBatchToComplete).not.toBeNull() // Add more items while first batch is still processing queue.add('key3', 'item3') @@ -254,7 +255,7 @@ describe('BatchingQueue', () => { const disposePromise = queue.disposeAsync() // Allow the first batch to complete - allowFirstBatchToComplete?.() + allowFirstBatchToComplete!() // Wait for disposal to complete await disposePromise