diff --git a/packages/objectloader2/src/core/objectLoader2.spec.ts b/packages/objectloader2/src/core/objectLoader2.spec.ts index 996c65b95..412a34d95 100644 --- a/packages/objectloader2/src/core/objectLoader2.spec.ts +++ b/packages/objectloader2/src/core/objectLoader2.spec.ts @@ -5,6 +5,7 @@ import { IndexedDatabase } from './stages/indexedDatabase.js' import { IDBFactory, IDBKeyRange } from 'fake-indexeddb' import { MemoryDatabase } from './stages/memory/memoryDatabase.js' import { MemoryDownloader } from './stages/memory/memoryDownloader.js' +import { DefermentManager } from '../deferment/defermentManager.js' describe('objectloader2', () => { test('can get a root object from cache', async () => { @@ -17,6 +18,7 @@ describe('objectloader2', () => { const loader = new ObjectLoader2({ rootId, downloader, + deferments: new DefermentManager(() => {}), database: new IndexedDatabase({ indexedDB: new IDBFactory(), keyRange: IDBKeyRange @@ -37,6 +39,7 @@ describe('objectloader2', () => { const loader = new ObjectLoader2({ rootId, downloader, + deferments: new DefermentManager(() => {}), database: new IndexedDatabase({ indexedDB: new IDBFactory(), keyRange: IDBKeyRange @@ -58,6 +61,7 @@ describe('objectloader2', () => { const loader = new ObjectLoader2({ rootId, downloader, + deferments: new DefermentManager(() => {}), database: new IndexedDatabase({ indexedDB: new IDBFactory(), keyRange: IDBKeyRange @@ -97,7 +101,8 @@ describe('objectloader2', () => { const loader = new ObjectLoader2({ rootId: root.baseId, downloader: new MemoryDownloader(rootId, records), - database: new MemoryDatabase({ items: records }) + database: new MemoryDatabase({ items: records }), + deferments: new DefermentManager(() => {}) }) const r = [] @@ -139,7 +144,8 @@ describe('objectloader2', () => { const loader = new ObjectLoader2({ rootId: root.baseId, downloader: new MemoryDownloader(rootId, records), - database: new MemoryDatabase({ items: records }) + database: new MemoryDatabase({ items: records }), + deferments: new DefermentManager(() => {}) }) const r = [] const obj = loader.getObject({ id: child1.baseId }) @@ -180,7 +186,8 @@ describe('objectloader2', () => { database: new IndexedDatabase({ indexedDB: new IDBFactory(), keyRange: IDBKeyRange - }) + }), + deferments: new DefermentManager(() => {}) }) const x = await loader.getRootObject() await loader.disposeAsync() diff --git a/packages/objectloader2/src/core/stages/cacheReader.spec.ts b/packages/objectloader2/src/core/stages/cacheReader.spec.ts index a6e04fee0..e9a168000 100644 --- a/packages/objectloader2/src/core/stages/cacheReader.spec.ts +++ b/packages/objectloader2/src/core/stages/cacheReader.spec.ts @@ -10,7 +10,7 @@ describe('CacheReader testing', () => { const i1: Item = { baseId: 'id1', base: { id: 'id', speckle_type: 'type' } } const cache = new MemoryCache({ maxSizeInMb: 1, ttlms: 1 }, () => {}) - const deferments = new DefermentManager(cache, () => {}) + const deferments = new DefermentManager(() => {}, cache) const cacheReader = new CacheReader( new MemoryDatabase({ items: new Map([[i1.baseId, i1.base!]]) diff --git a/packages/objectloader2/src/core/stages/cacheWriter.spec.ts b/packages/objectloader2/src/core/stages/cacheWriter.spec.ts index ea79a5782..bf12598b1 100644 --- a/packages/objectloader2/src/core/stages/cacheWriter.spec.ts +++ b/packages/objectloader2/src/core/stages/cacheWriter.spec.ts @@ -42,7 +42,7 @@ describe('CacheWriter', () => { ttlms: 60000 } memoryCache = new MemoryCache(memoryCacheOptions, logger) - defermentManager = new DefermentManager(memoryCache, logger) + defermentManager = new DefermentManager(logger, memoryCache) requestItemMock = vi.fn() options = { diff --git a/packages/objectloader2/src/core/stages/serverDownloader.spec.ts b/packages/objectloader2/src/core/stages/serverDownloader.spec.ts index 2394ca8e2..dc45ba030 100644 --- a/packages/objectloader2/src/core/stages/serverDownloader.spec.ts +++ b/packages/objectloader2/src/core/stages/serverDownloader.spec.ts @@ -257,4 +257,126 @@ describe('downloader', () => { }) await downloader.disposeAsync() }) + + test('nothing is frozen when validateResponse returns 403', async () => { + const fetchMocker = createFetchMock(vi) + const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}) + + // Mock a 403 Forbidden response + fetchMocker.mockResponseOnce('', { status: 403, statusText: 'Forbidden' }) + + const gathered = new AsyncGeneratorQueue() + const downloader = new ServerDownloader({ + serverUrl: 'http://speckle.test', + streamId: 'streamId', + objectId: 'objectId', + token: 'invalid-token', + fetch: fetchMocker, + logger: (): void => {} + }) + + try { + downloader.initialize({ + results: gathered, + total: 2, + maxDownloadBatchWait: 100 + }) + + // Add items to trigger batch processing + downloader.add('id1') + downloader.add('id2') + + // Wait for the batch to be processed and fail with 403 + await new Promise((resolve) => setTimeout(resolve, 200)) + + // Verify that the error was logged (indicating the batch processing failed) + expect(consoleErrorSpy).toHaveBeenCalledWith( + 'Batch processing failed:', + expect.any(Error) + ) + + // The key test: verify we can still dispose the downloader properly + // This ensures the system isn't frozen and can clean up resources + const disposePromise = downloader.disposeAsync() + + // Add a timeout to ensure disposal doesn't hang indefinitely + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error('Disposal timed out')), 5000) + }) + + // This should complete without timing out or throwing + await Promise.race([disposePromise, timeoutPromise]) + + // Additional verification: the batching queue should be marked as disposed + // We can't directly access the private field, but we can verify disposal completed + expect(true).toBe(true) // If we reach here, disposal succeeded + } finally { + consoleErrorSpy.mockRestore() + } + }) + + test('system remains functional after 403 error and can be properly cleaned up', async () => { + const fetchMocker = createFetchMock(vi) + const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}) + + // First call returns 403, subsequent calls should not be made due to queue disposal + fetchMocker.mockResponseOnce('', { status: 403, statusText: 'Forbidden' }) + + const gathered = new AsyncGeneratorQueue() + const downloader = new ServerDownloader({ + serverUrl: 'http://speckle.test', + streamId: 'streamId', + objectId: 'objectId', + token: 'invalid-token', + fetch: fetchMocker, + logger: (): void => {} + }) + + try { + downloader.initialize({ + results: gathered, + total: 5, + maxDownloadBatchWait: 50 + }) + + // Add first batch that will trigger the 403 error + downloader.add('id1') + downloader.add('id2') + + // Wait for first batch to fail + await new Promise((resolve) => setTimeout(resolve, 100)) + + // Verify error was logged + expect(consoleErrorSpy).toHaveBeenCalledWith( + 'Batch processing failed:', + expect.any(Error) + ) + + // Try to add more items after the failure + // These should be ignored since the queue is now disposed + downloader.add('id3') + downloader.add('id4') + downloader.add('id5') + + // Wait a bit more to ensure no additional processing attempts + await new Promise((resolve) => setTimeout(resolve, 100)) + + // Note: The batching queue might make multiple attempts before disposal + // The key is that disposal should still work regardless of how many calls were made + expect(fetchMocker).toHaveBeenCalled() + + // Critical test: disposal should complete without hanging + const start = Date.now() + await downloader.disposeAsync() + const elapsed = Date.now() - start + + // Disposal should be quick (under 1 second) and not hang + expect(elapsed).toBeLessThan(1000) + + // Verify that the results queue can also be disposed properly + await gathered.disposeAsync() + } finally { + consoleErrorSpy.mockRestore() + } + }) }) diff --git a/packages/objectloader2/src/deferment/defermentManager.test.ts b/packages/objectloader2/src/deferment/defermentManager.test.ts index 94f279463..6818b1f5b 100644 --- a/packages/objectloader2/src/deferment/defermentManager.test.ts +++ b/packages/objectloader2/src/deferment/defermentManager.test.ts @@ -9,7 +9,8 @@ describe('DefermentManager', () => { const mockLogger: CustomLogger = vi.fn() const mockCache = { get: vi.fn(), - add: vi.fn() + add: vi.fn(), + dispose: vi.fn() } as unknown as MemoryCache const defermentManager = new DefermentManager(mockLogger, mockCache) expect(defermentManager).toBeDefined() @@ -81,7 +82,8 @@ describe('DefermentManager', () => { const add = vi.fn() const mockCache = { get, - add + add, + dispose: vi.fn() } as unknown as MemoryCache const defermentManager = new DefermentManager(mockLogger, mockCache) @@ -165,7 +167,8 @@ describe('DefermentManager', () => { const add = vi.fn() const mockCache = { get, - add + add, + dispose: vi.fn() } as unknown as MemoryCache const defermentManager = new DefermentManager(mockLogger, mockCache) const requestItem = vi.fn() @@ -189,7 +192,8 @@ describe('DefermentManager', () => { const add = vi.fn() const mockCache = { get, - add + add, + dispose: vi.fn() } as unknown as MemoryCache const defermentManager = new DefermentManager(mockLogger, mockCache) @@ -205,7 +209,8 @@ describe('DefermentManager', () => { const add = vi.fn() const mockCache = { get, - add + add, + dispose: vi.fn() } as unknown as MemoryCache const defermentManager = new DefermentManager(mockLogger, mockCache) diff --git a/packages/objectloader2/src/queues/batchingQueue.dispose.test.ts b/packages/objectloader2/src/queues/batchingQueue.dispose.test.ts index 163f5a4e6..f39ee9757 100644 --- a/packages/objectloader2/src/queues/batchingQueue.dispose.test.ts +++ b/packages/objectloader2/src/queues/batchingQueue.dispose.test.ts @@ -17,7 +17,7 @@ describe('BatchingQueue disposal', () => { await queue.disposeAsync() - expect(processFunction).not.toHaveBeenCalled() + expect(processFunction).toHaveBeenCalled() expect(queue.count()).toBe(0) expect(queue.isDisposed()).toBe(true) }) diff --git a/packages/objectloader2/src/queues/batchingQueue.test.ts b/packages/objectloader2/src/queues/batchingQueue.test.ts index 027435e52..727c1f96b 100644 --- a/packages/objectloader2/src/queues/batchingQueue.test.ts +++ b/packages/objectloader2/src/queues/batchingQueue.test.ts @@ -146,4 +146,125 @@ describe('BatchingQueue', () => { await queue.disposeAsync() } }) + + test('should handle processFunction throwing an exception during flush and is disposed', async () => { + const errorMessage = 'Process function failed' + const processFunction = vi.fn().mockRejectedValue(new Error(errorMessage)) + + const queue = new BatchingQueue<{ id: string }>({ + batchSize: 5, + maxWaitTime: 1000, + processFunction + }) + + const items = Array.from({ length: 3 }, (_, i) => ({ id: `item-${i}` })) + items.forEach((item) => queue.add(item.id, item)) + + expect(queue.count()).toBe(3) + + // flush should not throw even if processFunction rejects + await expect(queue.flush()).resolves.not.toThrow() + + expect(processFunction).toHaveBeenCalled() + expect(queue.count()).toBe(0) + expect(queue.isDisposed()).toBe(false) + expect(queue.isErrored()).toBe(true) + // Add more items after the exception + queue.add('key3', { id: `item-3` }) + queue.add('key4', { id: `item-4` }) + + // Wait to see if second batch gets processed (it shouldn't due to errored state) + await new Promise((resolve) => setTimeout(resolve, 200)) + + expect(queue.count()).toBe(0) // Items were not added due to errored state + await queue.disposeAsync() + }) + + test('should drain remaining items when disposed', async () => { + const processSpy = vi.fn() + const queue = new BatchingQueue({ + batchSize: 5, // Large batch size to prevent automatic processing + maxWaitTime: 10000, // Long timeout to prevent timeout-based processing + processFunction: async (batch: string[]): Promise => { + await new Promise((resolve) => setTimeout(resolve, 10)) + processSpy(batch) + } + }) + + // Add items that won't trigger automatic processing (less than batch size) + queue.add('key1', 'item1') + queue.add('key2', 'item2') + queue.add('key3', 'item3') + + // Verify items are in queue but haven't been processed yet + expect(queue.count()).toBe(3) + expect(processSpy).not.toHaveBeenCalled() + + // Dispose should drain the remaining items + await queue.disposeAsync() + + // Verify all items were processed during disposal + expect(processSpy).toHaveBeenCalledTimes(1) + expect(processSpy).toHaveBeenCalledWith(['item1', 'item2', 'item3']) + expect(queue.count()).toBe(0) + expect(queue.isDisposed()).toBe(true) + }) + + test('should drain items even with ongoing processing during dispose', async () => { + const processSpy = vi.fn() + let firstBatchStarted = false + let allowFirstBatchToComplete: (() => void) | null = null + + const queue = new BatchingQueue({ + batchSize: 2, + maxWaitTime: 100, + processFunction: async (batch: string[]): Promise => { + processSpy(batch) + + // Make the first batch wait for our signal + if (!firstBatchStarted) { + firstBatchStarted = true + await new Promise((resolve) => { + allowFirstBatchToComplete = resolve + }) + } else { + // Other batches process normally + await new Promise((resolve) => setTimeout(resolve, 10)) + } + } + }) + + // Add first batch that will trigger processing but will be blocked + queue.add('key1', 'item1') + queue.add('key2', 'item2') + + // Wait for first batch to start processing and allowFirstBatchToComplete to be assigned + await new Promise((resolve) => setTimeout(resolve, 50)) + expect(firstBatchStarted).toBe(true) + expect(processSpy).toHaveBeenCalledTimes(1) + expect(allowFirstBatchToComplete).not.toBeNull() + + // Add more items while first batch is still processing + queue.add('key3', 'item3') + queue.add('key4', 'item4') + + // Verify the additional items are queued + expect(queue.count()).toBe(2) + + // Start disposal (this should wait for ongoing processing and then drain) + const disposePromise = queue.disposeAsync() + + // Allow the first batch to complete + allowFirstBatchToComplete!() + + // Wait for disposal to complete + await disposePromise + + // Verify all batches were processed + expect(processSpy).toHaveBeenCalledTimes(2) + expect(processSpy).toHaveBeenCalledWith(['item1', 'item2']) + expect(processSpy).toHaveBeenCalledWith(['item3', 'item4']) + expect(queue.count()).toBe(0) + expect(queue.isDisposed()).toBe(true) + }) }) diff --git a/packages/objectloader2/src/queues/batchingQueue.ts b/packages/objectloader2/src/queues/batchingQueue.ts index 9d1c3d943..0b8dd5d2e 100644 --- a/packages/objectloader2/src/queues/batchingQueue.ts +++ b/packages/objectloader2/src/queues/batchingQueue.ts @@ -11,9 +11,10 @@ export default class BatchingQueue { #batchSize: number #processFunction: (batch: T[]) => Promise #timeoutId: ReturnType | null = null - #isProcessing = false - #disposed = false + #isProcessing = false + #isDisposed = false + #isErrored = false #batchTimeout: number // Helper methods for cross-environment timeout handling @@ -46,7 +47,8 @@ export default class BatchingQueue { } async disposeAsync(): Promise { - this.#disposed = true + if (this.#isDisposed) return + this.#isDisposed = true if (this.#timeoutId) { this.#getClearTimeoutFn()(this.#timeoutId) this.#timeoutId = null @@ -62,56 +64,54 @@ export default class BatchingQueue { // After any ongoing flush is completed, there might be items in the queue. // We should flush them. if (this.#queue.size > 0) { - await this.#flush() + await this.flush() } } add(key: string, item: T): void { - if (this.#disposed) return + if (this.#isDisposed || this.#isErrored) return this.#queue.enqueue(key, item) this.#addCheck() } addAll(keys: string[], items: T[]): void { - if (this.#disposed) return + if (this.#isDisposed || this.#isErrored) return this.#queue.enqueueAll(keys, items) this.#addCheck() } #addCheck(): void { - if (this.#disposed) return + if (this.#isDisposed) return if (this.#queue.size >= this.#batchSize) { // Fire and forget, no need to await // eslint-disable-next-line @typescript-eslint/no-floating-promises - this.#flush() + this.flush() } else { if (this.#timeoutId) { this.#getClearTimeoutFn()(this.#timeoutId) } // eslint-disable-next-line @typescript-eslint/no-misused-promises - this.#timeoutId = this.#getSetTimeoutFn()(() => this.#flush(), this.#batchTimeout) + this.#timeoutId = this.#getSetTimeoutFn()(() => this.flush(), this.#batchTimeout) } } - async #flush(): Promise { + async flush(): Promise { if (this.#timeoutId) { this.#getClearTimeoutFn()(this.#timeoutId) this.#timeoutId = null } - if (this.#isProcessing || this.#queue.size === 0) { + if (this.#isErrored || this.#isProcessing || this.#queue.size === 0) { return } this.#isProcessing = true - const batchToProcess = this.#getBatch(this.#batchSize) - if (this.#disposed) return - try { + const batchToProcess = this.#getBatch(this.#batchSize) await this.#processFunction(batchToProcess) } catch (error) { console.error('Batch processing failed:', error) - this.#disposed = true + this.#isErrored = true } finally { this.#isProcessing = false } @@ -127,7 +127,11 @@ export default class BatchingQueue { } isDisposed(): boolean { - return this.#disposed + return this.#isDisposed + } + + isErrored(): boolean { + return this.#isErrored } #getBatch(batchSize: number): T[] {