Merge pull request #5608 from specklesystems/adam/fix-batchingqueue-error

OL2 (fix) Fixes tests and batchingqueue during exceptions and adds isErrored concept to drain
This commit is contained in:
Adam Hathcock
2025-10-01 08:48:48 +01:00
committed by GitHub
8 changed files with 286 additions and 27 deletions
@@ -5,6 +5,7 @@ import { IndexedDatabase } from './stages/indexedDatabase.js'
import { IDBFactory, IDBKeyRange } from 'fake-indexeddb'
import { MemoryDatabase } from './stages/memory/memoryDatabase.js'
import { MemoryDownloader } from './stages/memory/memoryDownloader.js'
import { DefermentManager } from '../deferment/defermentManager.js'
describe('objectloader2', () => {
test('can get a root object from cache', async () => {
@@ -17,6 +18,7 @@ describe('objectloader2', () => {
const loader = new ObjectLoader2({
rootId,
downloader,
deferments: new DefermentManager(() => {}),
database: new IndexedDatabase({
indexedDB: new IDBFactory(),
keyRange: IDBKeyRange
@@ -37,6 +39,7 @@ describe('objectloader2', () => {
const loader = new ObjectLoader2({
rootId,
downloader,
deferments: new DefermentManager(() => {}),
database: new IndexedDatabase({
indexedDB: new IDBFactory(),
keyRange: IDBKeyRange
@@ -58,6 +61,7 @@ describe('objectloader2', () => {
const loader = new ObjectLoader2({
rootId,
downloader,
deferments: new DefermentManager(() => {}),
database: new IndexedDatabase({
indexedDB: new IDBFactory(),
keyRange: IDBKeyRange
@@ -97,7 +101,8 @@ describe('objectloader2', () => {
const loader = new ObjectLoader2({
rootId: root.baseId,
downloader: new MemoryDownloader(rootId, records),
database: new MemoryDatabase({ items: records })
database: new MemoryDatabase({ items: records }),
deferments: new DefermentManager(() => {})
})
const r = []
@@ -139,7 +144,8 @@ describe('objectloader2', () => {
const loader = new ObjectLoader2({
rootId: root.baseId,
downloader: new MemoryDownloader(rootId, records),
database: new MemoryDatabase({ items: records })
database: new MemoryDatabase({ items: records }),
deferments: new DefermentManager(() => {})
})
const r = []
const obj = loader.getObject({ id: child1.baseId })
@@ -180,7 +186,8 @@ describe('objectloader2', () => {
database: new IndexedDatabase({
indexedDB: new IDBFactory(),
keyRange: IDBKeyRange
})
}),
deferments: new DefermentManager(() => {})
})
const x = await loader.getRootObject()
await loader.disposeAsync()
@@ -10,7 +10,7 @@ describe('CacheReader testing', () => {
const i1: Item = { baseId: 'id1', base: { id: 'id', speckle_type: 'type' } }
const cache = new MemoryCache({ maxSizeInMb: 1, ttlms: 1 }, () => {})
const deferments = new DefermentManager(cache, () => {})
const deferments = new DefermentManager(() => {}, cache)
const cacheReader = new CacheReader(
new MemoryDatabase({
items: new Map<string, Base>([[i1.baseId, i1.base!]])
@@ -42,7 +42,7 @@ describe('CacheWriter', () => {
ttlms: 60000
}
memoryCache = new MemoryCache(memoryCacheOptions, logger)
defermentManager = new DefermentManager(memoryCache, logger)
defermentManager = new DefermentManager(logger, memoryCache)
requestItemMock = vi.fn()
options = {
@@ -257,4 +257,126 @@ describe('downloader', () => {
})
await downloader.disposeAsync()
})
test('nothing is frozen when validateResponse returns 403', async () => {
const fetchMocker = createFetchMock(vi)
const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {})
// Mock a 403 Forbidden response
fetchMocker.mockResponseOnce('', { status: 403, statusText: 'Forbidden' })
const gathered = new AsyncGeneratorQueue<Item>()
const downloader = new ServerDownloader({
serverUrl: 'http://speckle.test',
streamId: 'streamId',
objectId: 'objectId',
token: 'invalid-token',
fetch: fetchMocker,
logger: (): void => {}
})
try {
downloader.initialize({
results: gathered,
total: 2,
maxDownloadBatchWait: 100
})
// Add items to trigger batch processing
downloader.add('id1')
downloader.add('id2')
// Wait for the batch to be processed and fail with 403
await new Promise((resolve) => setTimeout(resolve, 200))
// Verify that the error was logged (indicating the batch processing failed)
expect(consoleErrorSpy).toHaveBeenCalledWith(
'Batch processing failed:',
expect.any(Error)
)
// The key test: verify we can still dispose the downloader properly
// This ensures the system isn't frozen and can clean up resources
const disposePromise = downloader.disposeAsync()
// Add a timeout to ensure disposal doesn't hang indefinitely
const timeoutPromise = new Promise((_, reject) => {
setTimeout(() => reject(new Error('Disposal timed out')), 5000)
})
// This should complete without timing out or throwing
await Promise.race([disposePromise, timeoutPromise])
// Additional verification: the batching queue should be marked as disposed
// We can't directly access the private field, but we can verify disposal completed
expect(true).toBe(true) // If we reach here, disposal succeeded
} finally {
consoleErrorSpy.mockRestore()
}
})
test('system remains functional after 403 error and can be properly cleaned up', async () => {
const fetchMocker = createFetchMock(vi)
const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {})
// First call returns 403, subsequent calls should not be made due to queue disposal
fetchMocker.mockResponseOnce('', { status: 403, statusText: 'Forbidden' })
const gathered = new AsyncGeneratorQueue<Item>()
const downloader = new ServerDownloader({
serverUrl: 'http://speckle.test',
streamId: 'streamId',
objectId: 'objectId',
token: 'invalid-token',
fetch: fetchMocker,
logger: (): void => {}
})
try {
downloader.initialize({
results: gathered,
total: 5,
maxDownloadBatchWait: 50
})
// Add first batch that will trigger the 403 error
downloader.add('id1')
downloader.add('id2')
// Wait for first batch to fail
await new Promise((resolve) => setTimeout(resolve, 100))
// Verify error was logged
expect(consoleErrorSpy).toHaveBeenCalledWith(
'Batch processing failed:',
expect.any(Error)
)
// Try to add more items after the failure
// These should be ignored since the queue is now disposed
downloader.add('id3')
downloader.add('id4')
downloader.add('id5')
// Wait a bit more to ensure no additional processing attempts
await new Promise((resolve) => setTimeout(resolve, 100))
// Note: The batching queue might make multiple attempts before disposal
// The key is that disposal should still work regardless of how many calls were made
expect(fetchMocker).toHaveBeenCalled()
// Critical test: disposal should complete without hanging
const start = Date.now()
await downloader.disposeAsync()
const elapsed = Date.now() - start
// Disposal should be quick (under 1 second) and not hang
expect(elapsed).toBeLessThan(1000)
// Verify that the results queue can also be disposed properly
await gathered.disposeAsync()
} finally {
consoleErrorSpy.mockRestore()
}
})
})
@@ -9,7 +9,8 @@ describe('DefermentManager', () => {
const mockLogger: CustomLogger = vi.fn()
const mockCache = {
get: vi.fn(),
add: vi.fn()
add: vi.fn(),
dispose: vi.fn()
} as unknown as MemoryCache
const defermentManager = new DefermentManager(mockLogger, mockCache)
expect(defermentManager).toBeDefined()
@@ -81,7 +82,8 @@ describe('DefermentManager', () => {
const add = vi.fn()
const mockCache = {
get,
add
add,
dispose: vi.fn()
} as unknown as MemoryCache
const defermentManager = new DefermentManager(mockLogger, mockCache)
@@ -165,7 +167,8 @@ describe('DefermentManager', () => {
const add = vi.fn()
const mockCache = {
get,
add
add,
dispose: vi.fn()
} as unknown as MemoryCache
const defermentManager = new DefermentManager(mockLogger, mockCache)
const requestItem = vi.fn()
@@ -189,7 +192,8 @@ describe('DefermentManager', () => {
const add = vi.fn()
const mockCache = {
get,
add
add,
dispose: vi.fn()
} as unknown as MemoryCache
const defermentManager = new DefermentManager(mockLogger, mockCache)
@@ -205,7 +209,8 @@ describe('DefermentManager', () => {
const add = vi.fn()
const mockCache = {
get,
add
add,
dispose: vi.fn()
} as unknown as MemoryCache
const defermentManager = new DefermentManager(mockLogger, mockCache)
@@ -17,7 +17,7 @@ describe('BatchingQueue disposal', () => {
await queue.disposeAsync()
expect(processFunction).not.toHaveBeenCalled()
expect(processFunction).toHaveBeenCalled()
expect(queue.count()).toBe(0)
expect(queue.isDisposed()).toBe(true)
})
@@ -146,4 +146,125 @@ describe('BatchingQueue', () => {
await queue.disposeAsync()
}
})
test('should handle processFunction throwing an exception during flush and is disposed', async () => {
const errorMessage = 'Process function failed'
const processFunction = vi.fn().mockRejectedValue(new Error(errorMessage))
const queue = new BatchingQueue<{ id: string }>({
batchSize: 5,
maxWaitTime: 1000,
processFunction
})
const items = Array.from({ length: 3 }, (_, i) => ({ id: `item-${i}` }))
items.forEach((item) => queue.add(item.id, item))
expect(queue.count()).toBe(3)
// flush should not throw even if processFunction rejects
await expect(queue.flush()).resolves.not.toThrow()
expect(processFunction).toHaveBeenCalled()
expect(queue.count()).toBe(0)
expect(queue.isDisposed()).toBe(false)
expect(queue.isErrored()).toBe(true)
// Add more items after the exception
queue.add('key3', { id: `item-3` })
queue.add('key4', { id: `item-4` })
// Wait to see if second batch gets processed (it shouldn't due to errored state)
await new Promise((resolve) => setTimeout(resolve, 200))
expect(queue.count()).toBe(0) // Items were not added due to errored state
await queue.disposeAsync()
})
test('should drain remaining items when disposed', async () => {
const processSpy = vi.fn()
const queue = new BatchingQueue({
batchSize: 5, // Large batch size to prevent automatic processing
maxWaitTime: 10000, // Long timeout to prevent timeout-based processing
processFunction: async (batch: string[]): Promise<void> => {
await new Promise((resolve) => setTimeout(resolve, 10))
processSpy(batch)
}
})
// Add items that won't trigger automatic processing (less than batch size)
queue.add('key1', 'item1')
queue.add('key2', 'item2')
queue.add('key3', 'item3')
// Verify items are in queue but haven't been processed yet
expect(queue.count()).toBe(3)
expect(processSpy).not.toHaveBeenCalled()
// Dispose should drain the remaining items
await queue.disposeAsync()
// Verify all items were processed during disposal
expect(processSpy).toHaveBeenCalledTimes(1)
expect(processSpy).toHaveBeenCalledWith(['item1', 'item2', 'item3'])
expect(queue.count()).toBe(0)
expect(queue.isDisposed()).toBe(true)
})
test('should drain items even with ongoing processing during dispose', async () => {
const processSpy = vi.fn()
let firstBatchStarted = false
let allowFirstBatchToComplete: (() => void) | null = null
const queue = new BatchingQueue({
batchSize: 2,
maxWaitTime: 100,
processFunction: async (batch: string[]): Promise<void> => {
processSpy(batch)
// Make the first batch wait for our signal
if (!firstBatchStarted) {
firstBatchStarted = true
await new Promise<void>((resolve) => {
allowFirstBatchToComplete = resolve
})
} else {
// Other batches process normally
await new Promise((resolve) => setTimeout(resolve, 10))
}
}
})
// Add first batch that will trigger processing but will be blocked
queue.add('key1', 'item1')
queue.add('key2', 'item2')
// Wait for first batch to start processing and allowFirstBatchToComplete to be assigned
await new Promise((resolve) => setTimeout(resolve, 50))
expect(firstBatchStarted).toBe(true)
expect(processSpy).toHaveBeenCalledTimes(1)
expect(allowFirstBatchToComplete).not.toBeNull()
// Add more items while first batch is still processing
queue.add('key3', 'item3')
queue.add('key4', 'item4')
// Verify the additional items are queued
expect(queue.count()).toBe(2)
// Start disposal (this should wait for ongoing processing and then drain)
const disposePromise = queue.disposeAsync()
// Allow the first batch to complete
allowFirstBatchToComplete!()
// Wait for disposal to complete
await disposePromise
// Verify all batches were processed
expect(processSpy).toHaveBeenCalledTimes(2)
expect(processSpy).toHaveBeenCalledWith(['item1', 'item2'])
expect(processSpy).toHaveBeenCalledWith(['item3', 'item4'])
expect(queue.count()).toBe(0)
expect(queue.isDisposed()).toBe(true)
})
})
@@ -11,9 +11,10 @@ export default class BatchingQueue<T> {
#batchSize: number
#processFunction: (batch: T[]) => Promise<void>
#timeoutId: ReturnType<typeof setTimeout> | null = null
#isProcessing = false
#disposed = false
#isProcessing = false
#isDisposed = false
#isErrored = false
#batchTimeout: number
// Helper methods for cross-environment timeout handling
@@ -46,7 +47,8 @@ export default class BatchingQueue<T> {
}
async disposeAsync(): Promise<void> {
this.#disposed = true
if (this.#isDisposed) return
this.#isDisposed = true
if (this.#timeoutId) {
this.#getClearTimeoutFn()(this.#timeoutId)
this.#timeoutId = null
@@ -62,56 +64,54 @@ export default class BatchingQueue<T> {
// After any ongoing flush is completed, there might be items in the queue.
// We should flush them.
if (this.#queue.size > 0) {
await this.#flush()
await this.flush()
}
}
add(key: string, item: T): void {
if (this.#disposed) return
if (this.#isDisposed || this.#isErrored) return
this.#queue.enqueue(key, item)
this.#addCheck()
}
addAll(keys: string[], items: T[]): void {
if (this.#disposed) return
if (this.#isDisposed || this.#isErrored) return
this.#queue.enqueueAll(keys, items)
this.#addCheck()
}
#addCheck(): void {
if (this.#disposed) return
if (this.#isDisposed) return
if (this.#queue.size >= this.#batchSize) {
// Fire and forget, no need to await
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.#flush()
this.flush()
} else {
if (this.#timeoutId) {
this.#getClearTimeoutFn()(this.#timeoutId)
}
// eslint-disable-next-line @typescript-eslint/no-misused-promises
this.#timeoutId = this.#getSetTimeoutFn()(() => this.#flush(), this.#batchTimeout)
this.#timeoutId = this.#getSetTimeoutFn()(() => this.flush(), this.#batchTimeout)
}
}
async #flush(): Promise<void> {
async flush(): Promise<void> {
if (this.#timeoutId) {
this.#getClearTimeoutFn()(this.#timeoutId)
this.#timeoutId = null
}
if (this.#isProcessing || this.#queue.size === 0) {
if (this.#isErrored || this.#isProcessing || this.#queue.size === 0) {
return
}
this.#isProcessing = true
const batchToProcess = this.#getBatch(this.#batchSize)
if (this.#disposed) return
try {
const batchToProcess = this.#getBatch(this.#batchSize)
await this.#processFunction(batchToProcess)
} catch (error) {
console.error('Batch processing failed:', error)
this.#disposed = true
this.#isErrored = true
} finally {
this.#isProcessing = false
}
@@ -127,7 +127,11 @@ export default class BatchingQueue<T> {
}
isDisposed(): boolean {
return this.#disposed
return this.#isDisposed
}
isErrored(): boolean {
return this.#isErrored
}
#getBatch(batchSize: number): T[] {