From 3bd835f03a770856a2ff46c0384fe98fbbf39b28 Mon Sep 17 00:00:00 2001 From: Adam Hathcock Date: Tue, 30 Sep 2025 12:16:19 +0100 Subject: [PATCH] Erroring ends processing and adding, Processing just skips itself and Disposal does flush --- .../src/core/stages/serverDownloader.spec.ts | 122 ++++++++++++++++++ .../src/queues/batchingQueue.test.ts | 59 +++++++++ .../objectloader2/src/queues/batchingQueue.ts | 23 ++-- 3 files changed, 195 insertions(+), 9 deletions(-) diff --git a/packages/objectloader2/src/core/stages/serverDownloader.spec.ts b/packages/objectloader2/src/core/stages/serverDownloader.spec.ts index 2394ca8e2..dc45ba030 100644 --- a/packages/objectloader2/src/core/stages/serverDownloader.spec.ts +++ b/packages/objectloader2/src/core/stages/serverDownloader.spec.ts @@ -257,4 +257,126 @@ describe('downloader', () => { }) await downloader.disposeAsync() }) + + test('nothing is frozen when validateResponse returns 403', async () => { + const fetchMocker = createFetchMock(vi) + const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}) + + // Mock a 403 Forbidden response + fetchMocker.mockResponseOnce('', { status: 403, statusText: 'Forbidden' }) + + const gathered = new AsyncGeneratorQueue() + const downloader = new ServerDownloader({ + serverUrl: 'http://speckle.test', + streamId: 'streamId', + objectId: 'objectId', + token: 'invalid-token', + fetch: fetchMocker, + logger: (): void => {} + }) + + try { + downloader.initialize({ + results: gathered, + total: 2, + maxDownloadBatchWait: 100 + }) + + // Add items to trigger batch processing + downloader.add('id1') + downloader.add('id2') + + // Wait for the batch to be processed and fail with 403 + await new Promise((resolve) => setTimeout(resolve, 200)) + + // Verify that the error was logged (indicating the batch processing failed) + expect(consoleErrorSpy).toHaveBeenCalledWith( + 'Batch processing failed:', + expect.any(Error) + ) + + // The key test: verify we can still dispose the downloader properly + // This ensures the system isn't frozen and can clean up resources + const disposePromise = downloader.disposeAsync() + + // Add a timeout to ensure disposal doesn't hang indefinitely + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error('Disposal timed out')), 5000) + }) + + // This should complete without timing out or throwing + await Promise.race([disposePromise, timeoutPromise]) + + // Additional verification: the batching queue should be marked as disposed + // We can't directly access the private field, but we can verify disposal completed + expect(true).toBe(true) // If we reach here, disposal succeeded + } finally { + consoleErrorSpy.mockRestore() + } + }) + + test('system remains functional after 403 error and can be properly cleaned up', async () => { + const fetchMocker = createFetchMock(vi) + const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}) + + // First call returns 403, subsequent calls should not be made due to queue disposal + fetchMocker.mockResponseOnce('', { status: 403, statusText: 'Forbidden' }) + + const gathered = new AsyncGeneratorQueue() + const downloader = new ServerDownloader({ + serverUrl: 'http://speckle.test', + streamId: 'streamId', + objectId: 'objectId', + token: 'invalid-token', + fetch: fetchMocker, + logger: (): void => {} + }) + + try { + downloader.initialize({ + results: gathered, + total: 5, + maxDownloadBatchWait: 50 + }) + + // Add first batch that will trigger the 403 error + downloader.add('id1') + downloader.add('id2') + + // Wait for first batch to fail + await new Promise((resolve) => setTimeout(resolve, 100)) + + // Verify error was logged + expect(consoleErrorSpy).toHaveBeenCalledWith( + 'Batch processing failed:', + expect.any(Error) + ) + + // Try to add more items after the failure + // These should be ignored since the queue is now disposed + downloader.add('id3') + downloader.add('id4') + downloader.add('id5') + + // Wait a bit more to ensure no additional processing attempts + await new Promise((resolve) => setTimeout(resolve, 100)) + + // Note: The batching queue might make multiple attempts before disposal + // The key is that disposal should still work regardless of how many calls were made + expect(fetchMocker).toHaveBeenCalled() + + // Critical test: disposal should complete without hanging + const start = Date.now() + await downloader.disposeAsync() + const elapsed = Date.now() - start + + // Disposal should be quick (under 1 second) and not hang + expect(elapsed).toBeLessThan(1000) + + // Verify that the results queue can also be disposed properly + await gathered.disposeAsync() + } finally { + consoleErrorSpy.mockRestore() + } + }) }) diff --git a/packages/objectloader2/src/queues/batchingQueue.test.ts b/packages/objectloader2/src/queues/batchingQueue.test.ts index 027435e52..281931884 100644 --- a/packages/objectloader2/src/queues/batchingQueue.test.ts +++ b/packages/objectloader2/src/queues/batchingQueue.test.ts @@ -146,4 +146,63 @@ describe('BatchingQueue', () => { await queue.disposeAsync() } }) + + test('should stop processing when exception occurs in processFunction without disposal', async () => { + const processSpy = vi.fn() + const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}) + + let callCount = 0 + const queue = new BatchingQueue({ + batchSize: 2, + maxWaitTime: 100, + processFunction: async (batch: string[]): Promise => { + callCount++ + processSpy(batch) + + // Add small delay to simulate async processing + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Throw error on first batch to simulate exception + if (callCount === 1) { + throw new Error('Processing failed') + } + } + }) + + try { + // Add first batch that will cause exception + queue.add('key1', 'item1') + queue.add('key2', 'item2') + + // Wait for first batch to be processed and fail + await new Promise((resolve) => setTimeout(resolve, 200)) + + // Verify first batch was attempted and error was logged + expect(processSpy).toHaveBeenCalledTimes(1) + expect(processSpy).toHaveBeenCalledWith(['item1', 'item2']) + expect(consoleErrorSpy).toHaveBeenCalledWith( + 'Batch processing failed:', + expect.any(Error) + ) + + // Verify queue is marked as errored due to the exception + expect(queue.isDisposed()).toBe(false) + expect(queue.isErrored()).toBe(true) + + // Add more items after the exception + queue.add('key3', 'item3') + queue.add('key4', 'item4') + + // Wait to see if second batch gets processed (it shouldn't due to errored state) + await new Promise((resolve) => setTimeout(resolve, 200)) + + // Verify no further processing happened - the queue stopped due to exception + expect(processSpy).toHaveBeenCalledTimes(1) // Still only 1 call + expect(queue.count()).toBe(0) // Items were not added due to errored state + } finally { + consoleErrorSpy.mockRestore() + // Note: Not calling disposeAsync() here since the queue is already disposed + // This demonstrates the problematic behavior - the queue disposed itself due to exception + } + }) }) diff --git a/packages/objectloader2/src/queues/batchingQueue.ts b/packages/objectloader2/src/queues/batchingQueue.ts index 9d1c3d943..f0595f6d6 100644 --- a/packages/objectloader2/src/queues/batchingQueue.ts +++ b/packages/objectloader2/src/queues/batchingQueue.ts @@ -11,9 +11,10 @@ export default class BatchingQueue { #batchSize: number #processFunction: (batch: T[]) => Promise #timeoutId: ReturnType | null = null - #isProcessing = false - #disposed = false + #isProcessing = false + #isDisposed = false + #isErrored = false #batchTimeout: number // Helper methods for cross-environment timeout handling @@ -46,7 +47,7 @@ export default class BatchingQueue { } async disposeAsync(): Promise { - this.#disposed = true + this.#isDisposed = true if (this.#timeoutId) { this.#getClearTimeoutFn()(this.#timeoutId) this.#timeoutId = null @@ -67,19 +68,19 @@ export default class BatchingQueue { } add(key: string, item: T): void { - if (this.#disposed) return + if (this.#isDisposed || this.#isErrored) return this.#queue.enqueue(key, item) this.#addCheck() } addAll(keys: string[], items: T[]): void { - if (this.#disposed) return + if (this.#isDisposed || this.#isErrored) return this.#queue.enqueueAll(keys, items) this.#addCheck() } #addCheck(): void { - if (this.#disposed) return + if (this.#isDisposed) return if (this.#queue.size >= this.#batchSize) { // Fire and forget, no need to await // eslint-disable-next-line @typescript-eslint/no-floating-promises @@ -99,7 +100,7 @@ export default class BatchingQueue { this.#timeoutId = null } - if (this.#isProcessing || this.#queue.size === 0) { + if (this.#isErrored || this.#isProcessing || this.#queue.size === 0) { return } this.#isProcessing = true @@ -111,7 +112,7 @@ export default class BatchingQueue { await this.#processFunction(batchToProcess) } catch (error) { console.error('Batch processing failed:', error) - this.#disposed = true + this.#isErrored = true } finally { this.#isProcessing = false } @@ -127,7 +128,11 @@ export default class BatchingQueue { } isDisposed(): boolean { - return this.#disposed + return this.#isDisposed + } + + isErrored(): boolean { + return this.#isErrored } #getBatch(batchSize: number): T[] {