Erroring ends processing and adding, Processing just skips itself and Disposal does flush

This commit is contained in:
Adam Hathcock
2025-09-30 12:16:19 +01:00
parent 0b7f06348c
commit 3bd835f03a
3 changed files with 195 additions and 9 deletions
@@ -257,4 +257,126 @@ describe('downloader', () => {
})
await downloader.disposeAsync()
})
test('nothing is frozen when validateResponse returns 403', async () => {
const fetchMocker = createFetchMock(vi)
const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {})
// Mock a 403 Forbidden response
fetchMocker.mockResponseOnce('', { status: 403, statusText: 'Forbidden' })
const gathered = new AsyncGeneratorQueue<Item>()
const downloader = new ServerDownloader({
serverUrl: 'http://speckle.test',
streamId: 'streamId',
objectId: 'objectId',
token: 'invalid-token',
fetch: fetchMocker,
logger: (): void => {}
})
try {
downloader.initialize({
results: gathered,
total: 2,
maxDownloadBatchWait: 100
})
// Add items to trigger batch processing
downloader.add('id1')
downloader.add('id2')
// Wait for the batch to be processed and fail with 403
await new Promise((resolve) => setTimeout(resolve, 200))
// Verify that the error was logged (indicating the batch processing failed)
expect(consoleErrorSpy).toHaveBeenCalledWith(
'Batch processing failed:',
expect.any(Error)
)
// The key test: verify we can still dispose the downloader properly
// This ensures the system isn't frozen and can clean up resources
const disposePromise = downloader.disposeAsync()
// Add a timeout to ensure disposal doesn't hang indefinitely
const timeoutPromise = new Promise((_, reject) => {
setTimeout(() => reject(new Error('Disposal timed out')), 5000)
})
// This should complete without timing out or throwing
await Promise.race([disposePromise, timeoutPromise])
// Additional verification: the batching queue should be marked as disposed
// We can't directly access the private field, but we can verify disposal completed
expect(true).toBe(true) // If we reach here, disposal succeeded
} finally {
consoleErrorSpy.mockRestore()
}
})
test('system remains functional after 403 error and can be properly cleaned up', async () => {
const fetchMocker = createFetchMock(vi)
const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {})
// First call returns 403, subsequent calls should not be made due to queue disposal
fetchMocker.mockResponseOnce('', { status: 403, statusText: 'Forbidden' })
const gathered = new AsyncGeneratorQueue<Item>()
const downloader = new ServerDownloader({
serverUrl: 'http://speckle.test',
streamId: 'streamId',
objectId: 'objectId',
token: 'invalid-token',
fetch: fetchMocker,
logger: (): void => {}
})
try {
downloader.initialize({
results: gathered,
total: 5,
maxDownloadBatchWait: 50
})
// Add first batch that will trigger the 403 error
downloader.add('id1')
downloader.add('id2')
// Wait for first batch to fail
await new Promise((resolve) => setTimeout(resolve, 100))
// Verify error was logged
expect(consoleErrorSpy).toHaveBeenCalledWith(
'Batch processing failed:',
expect.any(Error)
)
// Try to add more items after the failure
// These should be ignored since the queue is now disposed
downloader.add('id3')
downloader.add('id4')
downloader.add('id5')
// Wait a bit more to ensure no additional processing attempts
await new Promise((resolve) => setTimeout(resolve, 100))
// Note: The batching queue might make multiple attempts before disposal
// The key is that disposal should still work regardless of how many calls were made
expect(fetchMocker).toHaveBeenCalled()
// Critical test: disposal should complete without hanging
const start = Date.now()
await downloader.disposeAsync()
const elapsed = Date.now() - start
// Disposal should be quick (under 1 second) and not hang
expect(elapsed).toBeLessThan(1000)
// Verify that the results queue can also be disposed properly
await gathered.disposeAsync()
} finally {
consoleErrorSpy.mockRestore()
}
})
})
@@ -146,4 +146,63 @@ describe('BatchingQueue', () => {
await queue.disposeAsync()
}
})
test('should stop processing when exception occurs in processFunction without disposal', async () => {
const processSpy = vi.fn()
const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {})
let callCount = 0
const queue = new BatchingQueue({
batchSize: 2,
maxWaitTime: 100,
processFunction: async (batch: string[]): Promise<void> => {
callCount++
processSpy(batch)
// Add small delay to simulate async processing
await new Promise((resolve) => setTimeout(resolve, 10))
// Throw error on first batch to simulate exception
if (callCount === 1) {
throw new Error('Processing failed')
}
}
})
try {
// Add first batch that will cause exception
queue.add('key1', 'item1')
queue.add('key2', 'item2')
// Wait for first batch to be processed and fail
await new Promise((resolve) => setTimeout(resolve, 200))
// Verify first batch was attempted and error was logged
expect(processSpy).toHaveBeenCalledTimes(1)
expect(processSpy).toHaveBeenCalledWith(['item1', 'item2'])
expect(consoleErrorSpy).toHaveBeenCalledWith(
'Batch processing failed:',
expect.any(Error)
)
// Verify queue is marked as errored due to the exception
expect(queue.isDisposed()).toBe(false)
expect(queue.isErrored()).toBe(true)
// Add more items after the exception
queue.add('key3', 'item3')
queue.add('key4', 'item4')
// Wait to see if second batch gets processed (it shouldn't due to errored state)
await new Promise((resolve) => setTimeout(resolve, 200))
// Verify no further processing happened - the queue stopped due to exception
expect(processSpy).toHaveBeenCalledTimes(1) // Still only 1 call
expect(queue.count()).toBe(0) // Items were not added due to errored state
} finally {
consoleErrorSpy.mockRestore()
// Note: Not calling disposeAsync() here since the queue is already disposed
// This demonstrates the problematic behavior - the queue disposed itself due to exception
}
})
})
@@ -11,9 +11,10 @@ export default class BatchingQueue<T> {
#batchSize: number
#processFunction: (batch: T[]) => Promise<void>
#timeoutId: ReturnType<typeof setTimeout> | null = null
#isProcessing = false
#disposed = false
#isProcessing = false
#isDisposed = false
#isErrored = false
#batchTimeout: number
// Helper methods for cross-environment timeout handling
@@ -46,7 +47,7 @@ export default class BatchingQueue<T> {
}
async disposeAsync(): Promise<void> {
this.#disposed = true
this.#isDisposed = true
if (this.#timeoutId) {
this.#getClearTimeoutFn()(this.#timeoutId)
this.#timeoutId = null
@@ -67,19 +68,19 @@ export default class BatchingQueue<T> {
}
add(key: string, item: T): void {
if (this.#disposed) return
if (this.#isDisposed || this.#isErrored) return
this.#queue.enqueue(key, item)
this.#addCheck()
}
addAll(keys: string[], items: T[]): void {
if (this.#disposed) return
if (this.#isDisposed || this.#isErrored) return
this.#queue.enqueueAll(keys, items)
this.#addCheck()
}
#addCheck(): void {
if (this.#disposed) return
if (this.#isDisposed) return
if (this.#queue.size >= this.#batchSize) {
// Fire and forget, no need to await
// eslint-disable-next-line @typescript-eslint/no-floating-promises
@@ -99,7 +100,7 @@ export default class BatchingQueue<T> {
this.#timeoutId = null
}
if (this.#isProcessing || this.#queue.size === 0) {
if (this.#isErrored || this.#isProcessing || this.#queue.size === 0) {
return
}
this.#isProcessing = true
@@ -111,7 +112,7 @@ export default class BatchingQueue<T> {
await this.#processFunction(batchToProcess)
} catch (error) {
console.error('Batch processing failed:', error)
this.#disposed = true
this.#isErrored = true
} finally {
this.#isProcessing = false
}
@@ -127,7 +128,11 @@ export default class BatchingQueue<T> {
}
isDisposed(): boolean {
return this.#disposed
return this.#isDisposed
}
isErrored(): boolean {
return this.#isErrored
}
#getBatch(batchSize: number): T[] {