Change and fix BatchingQueue implementation (#5044)

* BatchingQueues allowed more than one batch to be processed at a time. Simplify and don't allow this.

* Use proper logging

* clean up and fix tests

* add more batchingqueue tests

* fixed according to AI

* fix: linting issues

* make deferment more intelligent and fix logging

* add to deferment tracking when bulk adding reads

* format

* fix tests
This commit is contained in:
Adam Hathcock
2025-07-10 10:13:58 +01:00
committed by GitHub
parent 1babe5859d
commit f86893935f
11 changed files with 257 additions and 119 deletions
@@ -34,8 +34,8 @@ export class ObjectLoader2 {
maxCacheReadSize: 10_000,
maxCacheWriteSize: 10_000,
maxWriteQueueSize: 40_000,
maxCacheBatchWriteWait: 3_000,
maxCacheBatchReadWait: 3_000
maxCacheBatchWriteWait: 1_000,
maxCacheBatchReadWait: 1_000
}
this.#gathered = new AsyncGeneratorQueue()
@@ -56,10 +56,10 @@ export class ObjectLoader2 {
await Promise.all([
this.#gathered.disposeAsync(),
this.#downloader.disposeAsync(),
this.#cacheReader.disposeAsync(),
this.#cacheWriter.disposeAsync()
])
this.#deferments.dispose()
this.#cacheReader.dispose()
}
async getRootObject(): Promise<Item | undefined> {
@@ -30,6 +30,6 @@ describe('CacheReader testing', () => {
const base = await objPromise
expect(base).toMatchSnapshot()
await cacheReader.disposeAsync()
cacheReader.dispose()
})
})
@@ -31,11 +31,12 @@ export class CacheReader {
this.#notFoundQueue = notFoundQueue
}
async getObject(params: { id: string }): Promise<Base> {
if (!this.#defermentManager.isDeferred(params.id)) {
getObject(params: { id: string }): Promise<Base> {
const [p, b] = this.#defermentManager.defer({ id: params.id })
if (!b) {
this.#requestItem(params.id)
}
return await this.#defermentManager.defer({ id: params.id })
return p
}
#createReadQueue(): void {
@@ -57,10 +58,15 @@ export class CacheReader {
/**
 * Requests a whole set of ids in one go.
 *
 * Calls `#createReadQueue()` first (presumably to make sure `#readQueue` exists —
 * its body is not visible here), records one deferment request per key on the
 * deferment manager, and then hands all keys to the read queue. Each key is used
 * both as the queue key and as the queued value.
 *
 * @param keys ids to request
 */
requestAll(keys: string[]): void {
  this.#createReadQueue()
  keys.forEach((id) => {
    this.#defermentManager.trackDefermentRequest(id)
  })
  this.#readQueue?.addAll(keys, keys)
}
#processBatch = async (batch: string[]): Promise<void> => {
const start = performance.now()
const items = await this.#database.getAll(batch)
for (let i = 0; i < items.length; i++) {
const item = items[i]
@@ -71,9 +77,10 @@ export class CacheReader {
this.#notFoundQueue?.add(batch[i])
}
}
this.#logger('readBatch: left, time', items.length, performance.now() - start)
}
async disposeAsync(): Promise<void> {
await this.#readQueue?.disposeAsync()
dispose(): void {
this.#readQueue?.dispose()
}
}
@@ -30,17 +30,25 @@ export class CacheWriter implements Queue<Item> {
this.#writeQueue = new BatchingQueue({
batchSize: this.#options.maxCacheWriteSize,
maxWaitTime: this.#options.maxCacheBatchWriteWait,
processFunction: (batch: Item[]): Promise<void> =>
this.#database.saveBatch({ batch })
processFunction: async (batch: Item[]): Promise<void> => {
await this.writeAll(batch)
}
})
}
this.#defermentManager.undefer(item)
this.#writeQueue.add(item.baseId, item)
}
/**
 * Saves a batch of items through the database and logs how many items were
 * written and how long the save took (in milliseconds, via `performance.now()`).
 *
 * @param items the batch passed on to `saveBatch`
 */
async writeAll(items: Item[]): Promise<void> {
  const startedAt = performance.now()
  await this.#database.saveBatch({ batch: items })
  const elapsedMs = performance.now() - startedAt
  this.#logger('writeBatch: left, time', items.length, elapsedMs)
}
async disposeAsync(): Promise<void> {
await this.#writeQueue?.disposeAsync()
this.#writeQueue?.dispose()
this.#disposed = true
return Promise.resolve()
}
get isDisposed(): boolean {
@@ -121,7 +121,8 @@ export default class IndexedDatabase implements Database {
async disposeAsync(): Promise<void> {
this.#cacheDB?.close()
this.#cacheDB = undefined
await this.#writeQueue?.disposeAsync()
this.#writeQueue?.dispose()
this.#writeQueue = undefined
return Promise.resolve()
}
}
@@ -10,14 +10,12 @@ describe('DefermentManager disposal', () => {
base: { id, speckle_type: 'test' }
})
it('should throw on get/defer/undefer after dispose', async () => {
it('should throw on get/defer/undefer after dispose', () => {
const manager = new DefermentManager(options)
manager.dispose()
expect(() => manager.get('a')).toThrow('DefermentManager is disposed')
expect(() => manager.undefer(makeItem('a'))).toThrow('DefermentManager is disposed')
await expect(manager.defer({ id: 'a' })).rejects.toThrow(
'DefermentManager is disposed'
)
expect(() => manager.defer({ id: 'a' })).toThrow('DefermentManager is disposed')
})
it('dispose is idempotent', () => {
@@ -6,7 +6,8 @@ import { DefermentManager } from './defermentManager.js'
describe('deferments', () => {
test('defer one', async () => {
const deferments = new DefermentManager({ maxSizeInMb: 1, ttlms: 1 })
const x = deferments.defer({ id: 'id' })
const [x, alreadyDeferred] = deferments.defer({ id: 'id' })
expect(alreadyDeferred).toBeFalsy()
expect(x).toBeInstanceOf(Promise)
deferments.undefer({ baseId: 'id', base: { id: 'id', speckle_type: 'type' } })
const b = await x
@@ -17,7 +18,8 @@ describe('deferments', () => {
const now = 1
const deferments = new DefermentManager({ maxSizeInMb: 1, ttlms: 1 })
deferments['now'] = (): number => now
const x = deferments.defer({ id: 'id' })
const [x, alreadyDeferred] = deferments.defer({ id: 'id' })
expect(alreadyDeferred).toBeFalsy()
expect(x).toBeInstanceOf(Promise)
const d = deferments.get('id')
expect(d).toBeDefined()
@@ -22,23 +22,19 @@ export class DefermentManager {
return Date.now()
}
/**
 * Reports whether a deferment is currently stored for the given id.
 * Unlike `get`, this does not check whether the manager has been disposed.
 */
isDeferred(id: string): boolean {
return this.deferments.has(id)
}
/**
 * Looks up the stored deferment for an id.
 *
 * @returns the stored `DeferredBase`, or `undefined` when none is stored for `id`
 * @throws Error('DefermentManager is disposed') when called after disposal
 */
get(id: string): DeferredBase | undefined {
if (this.disposed) throw new Error('DefermentManager is disposed')
return this.deferments.get(id)
}
async defer(params: { id: string }): Promise<Base> {
defer(params: { id: string }): [Promise<Base>, boolean] {
if (this.disposed) throw new Error('DefermentManager is disposed')
this.trackDefermentRequest(params.id)
const now = this.now()
const deferredBase = this.deferments.get(params.id)
if (deferredBase) {
deferredBase.setAccess(now)
return deferredBase.getPromise()
return [deferredBase.getPromise(), true]
}
const notYetFound = new DeferredBase(
this.options.ttlms,
@@ -46,10 +42,10 @@ export class DefermentManager {
now + this.options.ttlms
)
this.deferments.set(params.id, notYetFound)
return notYetFound.getPromise()
return [notYetFound.getPromise(), false]
}
private trackDefermentRequest(id: string): void {
trackDefermentRequest(id: string): void {
const request = this.totalDefermentRequests.get(id)
if (request) {
this.totalDefermentRequests.set(id, request + 1)
@@ -131,7 +127,7 @@ export class DefermentManager {
//we do not clean it up to allow the requests to resolve
const requestCount = this.totalDefermentRequests.get(deferredBase.getId())
if (requestCount && requestCount > 1) {
return
break
}
this.currentSize -= deferredBase.getSize() || 0
this.deferments.delete(deferredBase.getId())
@@ -0,0 +1,146 @@
import { describe, test, expect, beforeEach, afterEach, vi } from 'vitest'
import BatchingQueue from './batchingQueue.js'
// Behavioural tests for BatchingQueue: size-triggered flushes, timer-triggered flushes,
// disposal, keyed lookup, counting, and the "one batch at a time" guarantee.
// These tests use real timers, so every wait must leave a clear margin around the
// moment the queue is expected to act.
describe('BatchingQueue', () => {
  let queue: BatchingQueue<string>

  beforeEach(() => {
    // Default queue for the tests that do not need to observe processing.
    queue = new BatchingQueue({
      batchSize: 3,
      maxWaitTime: 100,
      processFunction: async (): Promise<void> => {
        await new Promise((resolve) => setTimeout(resolve, 0))
      }
    })
  })

  afterEach(() => {
    queue.dispose()
  })

  test('should add items and process them in batches', async () => {
    const processSpy = vi.fn()
    queue = new BatchingQueue({
      batchSize: 2,
      maxWaitTime: 100,
      processFunction: async (batch: string[]): Promise<void> => {
        await new Promise((resolve) => setTimeout(resolve, 0))
        processSpy(batch)
      }
    })

    // Reaching batchSize triggers a flush without waiting for the timer.
    queue.add('key1', 'item1')
    queue.add('key2', 'item2')

    await new Promise((resolve) => setTimeout(resolve, 200))

    expect(processSpy).toHaveBeenCalledTimes(1)
    expect(processSpy).toHaveBeenCalledWith(['item1', 'item2'])
  })

  test('should process items after timeout if batch size is not reached', async () => {
    const processSpy = vi.fn()
    queue = new BatchingQueue({
      batchSize: 5,
      maxWaitTime: 100,
      processFunction: async (batch: string[]): Promise<void> => {
        await new Promise((resolve) => setTimeout(resolve, 0))
        processSpy(batch)
      }
    })

    // Two items are below batchSize, so only the 100ms timer can flush them.
    queue.add('key1', 'item1')
    queue.add('key2', 'item2')

    await new Promise((resolve) => setTimeout(resolve, 200))

    expect(processSpy).toHaveBeenCalledTimes(1)
    expect(processSpy).toHaveBeenCalledWith(['item1', 'item2'])
  })

  test('should not process items if disposed', async () => {
    const processSpy = vi.fn()
    queue = new BatchingQueue({
      batchSize: 2,
      maxWaitTime: 10000,
      processFunction: async (batch: string[]): Promise<void> => {
        await new Promise((resolve) => setTimeout(resolve, 0))
        processSpy(batch)
      }
    })

    // dispose() clears the pending flush timer, so the single item is never processed.
    queue.add('key1', 'item1')
    queue.dispose()

    await new Promise((resolve) => setTimeout(resolve, 200))

    expect(processSpy).not.toHaveBeenCalled()
  })

  test('should handle multiple batches correctly', async () => {
    const processSpy = vi.fn()
    queue = new BatchingQueue({
      batchSize: 2,
      maxWaitTime: 100,
      processFunction: async (batch: string[]): Promise<void> => {
        await new Promise((resolve) => setTimeout(resolve, 0))
        processSpy(batch)
      }
    })

    queue.add('key1', 'item1')
    queue.add('key2', 'item2')
    queue.add('key3', 'item3')
    queue.add('key4', 'item4')

    await new Promise((resolve) => setTimeout(resolve, 200))

    expect(processSpy).toHaveBeenCalledTimes(2)
    expect(processSpy).toHaveBeenCalledWith(['item1', 'item2'])
    expect(processSpy).toHaveBeenCalledWith(['item3', 'item4'])
  })

  test('should retrieve items by key', () => {
    queue.add('key1', 'item1')
    queue.add('key2', 'item2')

    expect(queue.get('key1')).toBe('item1')
    expect(queue.get('key2')).toBe('item2')
    expect(queue.get('key3')).toBeUndefined()
  })

  test('should return correct count of items', () => {
    expect(queue.count()).toBe(0)

    queue.add('key1', 'item1')
    queue.add('key2', 'item2')

    expect(queue.count()).toBe(2)
  })

  test('should not process items if already processing', async () => {
    const processSpy = vi.fn()
    queue = new BatchingQueue({
      batchSize: 2,
      maxWaitTime: 100,
      processFunction: async (batch: string[]): Promise<void> => {
        processSpy(batch)
        await new Promise((resolve) => setTimeout(resolve, 300))
      }
    })

    queue.add('key1', 'item1')
    queue.add('key2', 'item2')
    queue.add('key3', 'item3')

    // At 200ms the first batch is still in flight, so item3 must not have been started.
    await new Promise((resolve) => setTimeout(resolve, 200))

    expect(processSpy).toHaveBeenCalledTimes(1)
    expect(processSpy).toHaveBeenCalledWith(['item1', 'item2'])

    // The first batch finishes at ~300ms and only then is the 100ms flush timer for
    // item3 re-armed, so item3 is processed at ~400ms. Asserting at exactly 400ms
    // (200ms + 200ms) races that timer; wait until ~500ms instead.
    await new Promise((resolve) => setTimeout(resolve, 300))

    expect(processSpy).toHaveBeenCalledTimes(2)
    expect(processSpy).toHaveBeenCalledWith(['item3'])
  })
})
@@ -1,41 +1,100 @@
import { CustomLogger } from '../types/functions.js'
import KeyedQueue from './keyedQueue.js'
export default class BatchingQueue<T> {
#queue: KeyedQueue<string, T> = new KeyedQueue<string, T>()
#batchSize: number
#processFunction: (batch: T[]) => Promise<void>
#timeoutId: ReturnType<typeof setTimeout> | null = null
#isProcessing = false
#logger: CustomLogger
#baseInterval: number
#minInterval: number
#maxInterval: number
#processingLoop: Promise<void>
#disposed = false
#batchTimeout: number
// Helper methods for cross-environment timeout handling
/**
 * Resolves the `setTimeout` implementation to use in the current environment:
 * `window.setTimeout` (bound to `window`) when a `window` object exists,
 * otherwise `global.setTimeout` when `global` exists, otherwise the bare
 * `setTimeout` identifier.
 */
#getSetTimeoutFn(): typeof setTimeout {
  if (typeof window !== 'undefined') {
    return window.setTimeout.bind(window)
  }
  if (typeof global !== 'undefined') {
    return global.setTimeout
  }
  return setTimeout
}
/**
 * Resolves the `clearTimeout` implementation to use in the current environment:
 * `window.clearTimeout` (bound to `window`) when a `window` object exists,
 * otherwise `global.clearTimeout` when `global` exists, otherwise the bare
 * `clearTimeout` identifier.
 */
#getClearTimeoutFn(): typeof clearTimeout {
  if (typeof window !== 'undefined') {
    return window.clearTimeout.bind(window)
  }
  if (typeof global !== 'undefined') {
    return global.clearTimeout
  }
  return clearTimeout
}
constructor(params: {
batchSize: number
maxWaitTime?: number
maxWaitTime: number
processFunction: (batch: T[]) => Promise<void>
logger?: CustomLogger
}) {
this.#batchSize = params.batchSize
this.#baseInterval = Math.min(params.maxWaitTime ?? 200, 200) // Initial batch time (ms)
this.#minInterval = Math.min(params.maxWaitTime ?? 100, 100) // Minimum batch time
this.#maxInterval = Math.min(params.maxWaitTime ?? 3000, 3000) // Maximum batch time
this.#processFunction = params.processFunction
this.#processingLoop = this.#loop()
this.#batchTimeout = params.maxWaitTime
this.#logger = params.logger || ((): void => {})
}
async disposeAsync(): Promise<void> {
dispose(): void {
this.#disposed = true
await this.#processingLoop
if (this.#timeoutId) {
this.#getClearTimeoutFn()(this.#timeoutId)
}
}
/**
 * Enqueues a single item under `key`, then runs the flush/timer check so the
 * item is either processed right away (batch size reached) or after the wait timer.
 */
add(key: string, item: T): void {
this.#queue.enqueue(key, item)
this.#addCheck()
}
/**
 * Enqueues several items at once, then runs the flush/timer check a single time
 * for the whole group.
 * NOTE(review): `keys` and `items` are presumably parallel arrays of equal length —
 * confirm against KeyedQueue.enqueueAll.
 */
addAll(keys: string[], items: T[]): void {
this.#queue.enqueueAll(keys, items)
this.#addCheck()
}
/**
 * Decides what to do after the queue contents may have changed:
 * - at least `batchSize` items queued: start a flush immediately;
 * - otherwise: restart the wait timer so the queued items are flushed
 *   `#batchTimeout` ms after the most recent change.
 *
 * Does nothing once the queue has been disposed. `dispose()` can only clear the
 * timer that exists at that moment, so without this guard a later `add()` or a
 * batch that finishes after disposal would schedule new work.
 */
#addCheck(): void {
  if (this.#disposed) {
    return
  }

  if (this.#queue.size >= this.#batchSize) {
    // Fire and forget, no need to await
    // eslint-disable-next-line @typescript-eslint/no-floating-promises
    this.#flush()
    return
  }

  // Restart the wait window: drop any timer that is still pending.
  if (this.#timeoutId) {
    this.#getClearTimeoutFn()(this.#timeoutId)
    this.#timeoutId = null
  }

  // Nothing is waiting (e.g. the last batch emptied the queue): do not keep a
  // timer alive just to run a flush that would return immediately.
  if (this.#queue.size === 0) {
    return
  }

  // eslint-disable-next-line @typescript-eslint/no-misused-promises
  this.#timeoutId = this.#getSetTimeoutFn()(() => this.#flush(), this.#batchTimeout)
}
/**
 * Processes at most one batch.
 *
 * Clears the pending wait timer, then returns without doing anything when the
 * queue is disposed, a batch is already being processed, or the queue is empty.
 * Otherwise it takes up to `batchSize` items, passes them to the process function,
 * and afterwards re-runs the flush/timer check for whatever is still queued.
 *
 * Errors from taking or processing the batch are logged, not rethrown, so one
 * failed batch does not stop later batches. The items of the failed batch are not
 * re-queued.
 */
async #flush(): Promise<void> {
  if (this.#timeoutId) {
    this.#getClearTimeoutFn()(this.#timeoutId)
    this.#timeoutId = null
  }

  // #isProcessing guarantees a single batch in flight; #disposed stops a timer
  // callback or a late caller from starting new work after dispose().
  if (this.#disposed || this.#isProcessing || this.#queue.size === 0) {
    return
  }

  this.#isProcessing = true
  try {
    // Taken inside the try so that a failure here also resets #isProcessing;
    // otherwise the queue would be stuck in the "processing" state.
    const batchToProcess = this.#getBatch(this.#batchSize)
    await this.#processFunction(batchToProcess)
  } catch (error) {
    this.#logger('Batch processing failed:', error)
  } finally {
    this.#isProcessing = false
  }

  // Items may have arrived while the batch was in flight; schedule them, unless
  // the queue was disposed in the meantime.
  if (!this.#disposed) {
    this.#addCheck()
  }
}
get(id: string): T | undefined {
@@ -53,37 +112,4 @@ export default class BatchingQueue<T> {
/**
 * Takes up to `batchSize` values from the start of the queue, never asking for
 * more than are currently queued.
 * NOTE(review): `spliceValues` presumably removes the returned values from the
 * queue (splice semantics) — confirm against KeyedQueue.
 */
#getBatch(batchSize: number): T[] {
  const takeCount = Math.min(batchSize, this.#queue.size)
  return this.#queue.spliceValues(0, takeCount)
}
// Polling loop of the previous implementation: repeatedly takes one batch from the
// queue and awaits the process function, sleeping between rounds when the backlog
// is small. The sleep interval adapts between #minInterval and #maxInterval.
async #loop(): Promise<void> {
let interval = this.#baseInterval
// Keeps running after disposal until every queued item has been taken for processing.
while (!this.#disposed || this.#queue.size > 0) {
const startTime = performance.now()
if (this.#queue.size > 0) {
const batch = this.#getBatch(this.#batchSize)
//console.log('running with queue size of ' + this.#queue.length)
// Not wrapped in try/catch: a rejection here rejects the promise returned by #loop
// and ends the loop.
await this.#processFunction(batch)
}
// Only pause when less than half a batch is waiting; otherwise continue straight
// with the next batch.
if (this.#queue.size < this.#batchSize / 2) {
//refigure interval
const endTime = performance.now()
// Time spent in this round, including the awaited process function (if any).
const duration = endTime - startTime
if (duration > interval) {
interval = Math.min(interval * 1.5, this.#maxInterval) // Increase if slow or empty
} else {
interval = Math.max(interval * 0.8, this.#minInterval) // Decrease if fast
}
/*console.log(
'queue is waiting ' +
interval / 1000 +
' with queue size of ' +
this.#queue.length
)*/
await this.#delay(interval)
}
}
}
/**
 * Returns a promise that resolves once a `setTimeout` of `ms` milliseconds fires.
 */
#delay(ms: number): Promise<void> {
  return new Promise<void>((wake) => {
    setTimeout(wake, ms)
  })
}
}
@@ -1,46 +0,0 @@
import { Database } from '../core/interfaces.js'
import { CacheOptions } from '../core/options.js'
import { DefermentManager } from '../deferment/defermentManager.js'
import { Item } from '../types/types.js'
import BatchingQueue from './batchingQueue.js'
import Queue from './queue.js'
/**
 * Queue front-end for writing items to the cache database.
 *
 * Every added item is first handed to the deferment manager (`undefer`) and then
 * placed on a batching queue whose batches are saved with `database.saveBatch`.
 * The batching queue is created lazily on the first `add`.
 */
export class CacheWriter implements Queue<Item> {
  #writeQueue: BatchingQueue<Item> | undefined
  #database: Database
  #defermentManager: DefermentManager
  #options: CacheOptions
  #disposed = false

  /**
   * @param database target of the batched `saveBatch` calls
   * @param defermentManager receives every added item through `undefer`
   * @param options supplies `maxCacheWriteSize` (batch size) and
   *   `maxCacheBatchWriteWait` (max wait time) for the batching queue
   */
  constructor(
    database: Database,
    defermentManager: DefermentManager,
    options: CacheOptions
  ) {
    this.#database = database
    this.#defermentManager = defermentManager
    this.#options = options
  }

  /**
   * Hands the item to the deferment manager and queues it for a batched write.
   * The write queue is created before `undefer` runs, matching the order callers
   * have always observed.
   */
  add(item: Item): void {
    const writeQueue = this.#ensureWriteQueue()
    this.#defermentManager.undefer(item)
    writeQueue.add(item.baseId, item)
  }

  /**
   * Waits for the write queue (if one was ever created) to finish disposing and
   * then marks this writer as disposed.
   */
  async disposeAsync(): Promise<void> {
    const writeQueue = this.#writeQueue
    if (writeQueue) {
      await writeQueue.disposeAsync()
    }
    this.#disposed = true
  }

  /** True once `disposeAsync` has completed. */
  get isDisposed(): boolean {
    return this.#disposed
  }

  /** Returns the write queue, creating it on first use. */
  #ensureWriteQueue(): BatchingQueue<Item> {
    if (!this.#writeQueue) {
      this.#writeQueue = new BatchingQueue({
        batchSize: this.#options.maxCacheWriteSize,
        maxWaitTime: this.#options.maxCacheBatchWriteWait,
        processFunction: (batch: Item[]): Promise<void> => {
          return this.#database.saveBatch({ batch })
        }
      })
    }
    return this.#writeQueue
  }
}