(OL2) refactor read queue (#4948)

* Rename to saveBatch

* forgot a file

* first pass of cacheReader

* OL2 tests have infinite timeout

* OL2 refactor works

* fix for tests

* get rid of pumps and fix test

* lint fix

* redo mermaid diagrams

* add readme section on deferment
This commit is contained in:
Adam Hathcock
2025-06-17 08:24:05 +01:00
committed by GitHub
parent 75fc72b190
commit 2b828a5eeb
26 changed files with 268 additions and 445 deletions
+52 -15
View File
@@ -11,26 +11,51 @@ The main aim for the objectloader is:
## Architecture
To achieve increased concurrency, the different phases of the objectloader are divided into pools of workers with queues to feed them.
To achieve increased concurrency, the different phases of the objectloader are divided into pools of workers with queues to feed them. Below is a sequence diagram of the worker stages
```mermaid
sequenceDiagram
ObjectLoader2->>CacheReader: Root+Children
CacheReader-->>Database: Item exists?
CacheReader->>ObjectLoader2: Item exists in Cache
CacheReader->>Downloader: Item does not exist in Cache
Downloader->>CacheWriter: Save Item to Cache
CacheWriter->>Database: Write Item
Downloader->>ObjectLoader2: Item exists in Cache
```
The queues between stages are illustrated below with the concurrency
```mermaid
flowchart TD
start(Root Commit)
getIds(Parse Root to get all IDs)
cached{Cached?}
download(Download IDs)
save(Write to Cache)
load(Load from Cache)
generate(Generate to Viewer!)
start(ObjectLoader2)
cachedQueue(BatchingQueue)
cachedExists{Exists?}
downloadQueue(BatchingQueue)
download{Download Batch}
saveQueue(BatchingQueue)
save{Save to Database}
asyncGeneratorQueue(Aggregated Async Generator Queue)
loop(Generate to Viewer!)
start --> getIds
getIds --> cached
cached -->|Yes| load
cached -->|No| download
load --> generate
download --> generate
download --> save
start -- Add IDs --> cachedQueue
subgraph CacheReader
cachedQueue -- Checks by Batch --> cachedExists
end
cachedExists -->|Yes| asyncGeneratorQueue
subgraph Downloader
cachedExists -->|No| downloadQueue
downloadQueue --> download
end
subgraph CacheWriter
download -- add to queue --> saveQueue
saveQueue --> save
end
subgraph Viewer
download -- add to queue --> asyncGeneratorQueue
asyncGeneratorQueue -- Generator Loop --> loop
end
```
From the list of IDs, they are moved to a queue to be begin checking the cache from a pool of readers.
@@ -40,3 +65,15 @@ Results are then sent to the viewer, if found, else they're send to the download
The download queue is a batching mechanism that gets what is available, up to a limit or a timeout. The results are parsed and given to the generator and written to another queue.
The write cache queue is processed with a single writer to the indexeddb.
## Deferment
Deferment is what happens with the viewer does a random access to OL2. It returns a promise but it will be fulfilled later if the item isn't in memory.
The `DefermentManager` only holds a subset of the model in memory. If the requested item isn't in memory, then it enqueues the request into the general process laid out above.
When items are returned to the generator loop, `undefer` is called which caches the item in the manager as well as fulfills any outstanding promises.
A cleanup process is ran to be a singleton process. This process sorts by the total number of requests and the size. If anything falls outside the size window, then it is removed from the manager's memory cache.
The aim is to speed up random access while still getting items from the cache in batches. Items that are accessed randomly tend to be references in the model.
@@ -1,31 +0,0 @@
// Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html
exports[`CachePump testing > write two items to queue use pumpItems that are NOT found 1`] = `[]`;
exports[`CachePump testing > write two items to queue use pumpItems that are NOT found 2`] = `
[
"id1",
"id2",
]
`;
exports[`CachePump testing > write two items to queue use pumpItems that are found 1`] = `
[
{
"base": {
"id": "id",
"speckle_type": "type",
},
"baseId": "id1",
},
{
"base": {
"id": "id",
"speckle_type": "type",
},
"baseId": "id2",
},
]
`;
exports[`CachePump testing > write two items to queue use pumpItems that are found 2`] = `[]`;
@@ -1,4 +1,6 @@
export default class BatchedPool<T> {
import Queue from './queue.js'
export default class BatchedPool<T> implements Queue<T> {
#queue: T[] = []
#concurrencyAndSizes: number[]
#processFunction: (batch: T[]) => Promise<void>
@@ -34,6 +34,10 @@ export default class BatchingQueue<T> {
this.#queue.enqueue(key, item)
}
addAll(keys: string[], items: T[]): void {
this.#queue.enqueueAll(keys, items)
}
get(id: string): T | undefined {
return this.#queue.get(id)
}
@@ -1,51 +0,0 @@
import { describe, expect, test } from 'vitest'
import { CachePump } from './cachePump.js'
import { Database } from '../operations/interfaces.js'
import AsyncGeneratorQueue from './asyncGeneratorQueue.js'
import { Item } from '../types/types.js'
import { DefermentManager } from './defermentManager.js'
const makeDatabase = (): Database =>
({
cacheSaveBatch: async (): Promise<void> => {},
getAll: async (): Promise<(Item | undefined)[]> => Promise.resolve([]),
getItem: async (): Promise<Item | undefined> => Promise.resolve(undefined),
disposeAsync: async (): Promise<void> => {}
} as unknown as Database)
const makeGathered = (): AsyncGeneratorQueue<Item> =>
({
add: () => {},
async *consume() {}
} as unknown as AsyncGeneratorQueue<Item>)
const makeDeferments = (): DefermentManager =>
({
undefer: () => {}
} as unknown as DefermentManager)
describe('CachePump disposal', () => {
test('disposeAsync is idempotent and always resolves', async () => {
const pump = new CachePump(makeDatabase(), makeGathered(), makeDeferments(), {
maxCacheWriteSize: 2,
maxCacheBatchWriteWait: 100,
maxCacheBatchReadWait: 1,
maxWriteQueueSize: 2,
maxCacheReadSize: 2
})
await pump.disposeAsync()
await expect(pump.disposeAsync()).resolves.toBeUndefined()
})
test('should not throw on add after dispose if writeQueue was never created', async () => {
const pump = new CachePump(makeDatabase(), makeGathered(), makeDeferments(), {
maxCacheWriteSize: 2,
maxCacheBatchWriteWait: 100,
maxCacheBatchReadWait: 1,
maxWriteQueueSize: 2,
maxCacheReadSize: 2
})
await pump.disposeAsync()
// Should not throw, but will not add anything
expect(() =>
pump.add({ baseId: 'a', base: { id: 'b', speckle_type: 'type' } })
).not.toThrow()
})
})
@@ -1,104 +0,0 @@
import { describe, expect, test } from 'vitest'
import { CachePump } from './cachePump.js'
import { Base, Item } from '../types/types.js'
import BufferQueue from './bufferQueue.js'
import AsyncGeneratorQueue from './asyncGeneratorQueue.js'
import { DefermentManager } from './defermentManager.js'
import { MemoryDatabase } from '../operations/databases/memoryDatabase.js'
import { Database } from '../operations/interfaces.js'
describe('CachePump testing', () => {
test('write two items to queue use pumpItems that are NOT found', async () => {
const i1: Item = { baseId: 'id1', base: { id: 'id', speckle_type: 'type' } }
const i2: Item = { baseId: 'id2', base: { id: 'id', speckle_type: 'type' } }
const gathered = new AsyncGeneratorQueue<Item>()
const deferments = new DefermentManager({ maxSizeInMb: 1, ttlms: 1 })
const cachePump = new CachePump(new MemoryDatabase({}), gathered, deferments, {
maxCacheReadSize: 1,
maxCacheWriteSize: 1,
maxCacheBatchWriteWait: 1,
maxCacheBatchReadWait: 1,
maxWriteQueueSize: 1
})
const foundItems = new BufferQueue<Item>()
const notFoundItems = new BufferQueue<string>()
await cachePump.pumpItems({
ids: [i1.baseId, i2.baseId],
foundItems,
notFoundItems
})
expect(foundItems.values()).toMatchSnapshot()
expect(notFoundItems.values()).toMatchSnapshot()
await cachePump.disposeAsync()
})
test('write two items to queue use pumpItems that are found', async () => {
const i1: Item = { baseId: 'id1', base: { id: 'id', speckle_type: 'type' } }
const i2: Item = { baseId: 'id2', base: { id: 'id', speckle_type: 'type' } }
const db = new Map<string, Base>()
db.set(i1.baseId, i1.base)
db.set(i2.baseId, i2.base)
const gathered = new AsyncGeneratorQueue<Item>()
const deferments = new DefermentManager({ maxSizeInMb: 1, ttlms: 1 })
const cachePump = new CachePump(
new MemoryDatabase({ items: db }),
gathered,
deferments,
{
maxCacheReadSize: 1,
maxCacheWriteSize: 1,
maxCacheBatchWriteWait: 1,
maxCacheBatchReadWait: 1,
maxWriteQueueSize: 1
}
)
const foundItems = new BufferQueue<Item>()
const notFoundItems = new BufferQueue<string>()
await cachePump.pumpItems({
ids: [i1.baseId, i2.baseId],
foundItems,
notFoundItems
})
expect(foundItems.values()).toMatchSnapshot()
expect(notFoundItems.values()).toMatchSnapshot()
await cachePump.disposeAsync()
})
test('can dispose while waiting and not wait', async () => {
const i1: Item = { baseId: 'id1', base: { id: 'id', speckle_type: 'type' } }
const i2: Item = { baseId: 'id2', base: { id: 'id', speckle_type: 'type' } }
const db: Database = {
getAll: async () => Promise.resolve([]),
disposeAsync: async (): Promise<void> => {}
} as unknown as Database
const gathered = new AsyncGeneratorQueue<Item>()
const deferments = new DefermentManager({ maxSizeInMb: 1, ttlms: 1 })
const cachePump = new CachePump(db, gathered, deferments, {
maxCacheReadSize: 1,
maxCacheWriteSize: 1,
maxCacheBatchWriteWait: 1,
maxCacheBatchReadWait: 1,
maxWriteQueueSize: 1
})
const foundItems = new BufferQueue<Item>()
const notFoundItems = new BufferQueue<string>()
await cachePump.disposeAsync()
await cachePump.pumpItems({
ids: [i1.baseId, i2.baseId],
foundItems,
notFoundItems
})
})
})
@@ -1,107 +0,0 @@
import { TIME } from '@speckle/shared'
import { Database } from '../operations/interfaces.js'
import { CacheOptions } from '../operations/options.js'
import { CustomLogger, Item } from '../types/types.js'
import BatchingQueue from './batchingQueue.js'
import Queue from './queue.js'
import { Downloader } from '../operations/interfaces.js'
import { DefermentManager } from './defermentManager.js'
import AsyncGeneratorQueue from './asyncGeneratorQueue.js'
import { Pump } from './pump.js'
export class CachePump implements Pump {
#writeQueue: BatchingQueue<Item> | undefined
#database: Database
#logger: CustomLogger
#deferments: DefermentManager
#gathered: AsyncGeneratorQueue<Item>
#options: CacheOptions
#disposed = false
constructor(
database: Database,
gathered: AsyncGeneratorQueue<Item>,
deferments: DefermentManager,
options: CacheOptions
) {
this.#database = database
this.#gathered = gathered
this.#deferments = deferments
this.#options = options
this.#logger = options.logger || ((): void => {})
}
add(item: Item): void {
if (!this.#writeQueue) {
this.#writeQueue = new BatchingQueue({
batchSize: this.#options.maxCacheWriteSize,
maxWaitTime: this.#options.maxCacheBatchWriteWait,
processFunction: (batch: Item[]): Promise<void> =>
this.#database.cacheSaveBatch({ batch })
})
}
this.#writeQueue.add(item.baseId, item)
}
async disposeAsync(): Promise<void> {
await this.#writeQueue?.disposeAsync()
await this.#database.disposeAsync()
this.#disposed = true
}
get isDisposed(): boolean {
return this.#disposed
}
async pumpItems(params: {
ids: string[]
foundItems: Queue<Item>
notFoundItems: Queue<string>
}): Promise<void> {
const { ids, foundItems, notFoundItems } = params
const maxCacheReadSize = this.#options.maxCacheReadSize
for (let i = 0; i < ids.length; ) {
if (this.isDisposed) break
if ((this.#writeQueue?.count() ?? 0) > this.#options.maxWriteQueueSize) {
this.#logger(
'pausing reads (# in write queue: ' + this.#writeQueue?.count() + ')'
)
await new Promise((resolve) => setTimeout(resolve, TIME.second)) // Pause for 1 second, protects against out of memory
continue
}
const batch = ids.slice(i, i + maxCacheReadSize)
const cachedData = await this.#database.getAll(batch)
for (let i = 0; i < cachedData.length; i++) {
if (cachedData[i]) {
foundItems.add(cachedData[i]!)
} else {
notFoundItems.add(batch[i])
}
}
i += maxCacheReadSize
}
}
async *gather(ids: string[], downloader: Downloader): AsyncGenerator<Item> {
const total = ids.length
const pumpPromise = this.pumpItems({
ids,
foundItems: this.#gathered,
notFoundItems: downloader
})
let count = 0
for await (const item of this.#gathered.consume()) {
this.#deferments.undefer(item)
yield item
count++
if (count >= total) {
this.#gathered.dispose()
}
}
await pumpPromise
}
}
@@ -11,7 +11,7 @@ describe('CacheReader testing', () => {
const deferments = new DefermentManager({ maxSizeInMb: 1, ttlms: 1 })
const cacheReader = new CacheReader(
new MemoryDatabase({
items: new Map<string, Base>([[i1.baseId, i1.base]])
items: new Map<string, Base>([[i1.baseId, i1.base!]])
}),
deferments,
{
@@ -3,6 +3,7 @@ import { CacheOptions } from '../operations/options.js'
import { Base, CustomLogger, Item } from '../types/types.js'
import BatchingQueue from './batchingQueue.js'
import { DefermentManager } from './defermentManager.js'
import Queue from './queue.js'
export class CacheReader {
#database: Database
@@ -10,6 +11,8 @@ export class CacheReader {
#logger: CustomLogger
#options: CacheOptions
#readQueue: BatchingQueue<string> | undefined
#foundQueue: Queue<Item> | undefined
#notFoundQueue: Queue<string> | undefined
constructor(
database: Database,
@@ -22,14 +25,19 @@ export class CacheReader {
this.#logger = options.logger || ((): void => {})
}
initializeQueue(foundQueue: Queue<Item>, notFoundQueue: Queue<string>): void {
this.#foundQueue = foundQueue
this.#notFoundQueue = notFoundQueue
}
async getObject(params: { id: string }): Promise<Base> {
if (!this.#defermentManager.isDeferred(params.id)) {
this.#getItem(params.id)
this.#requestItem(params.id)
}
return await this.#defermentManager.defer({ id: params.id })
}
#getItem(id: string): void {
#createReadQueue(): void {
if (!this.#readQueue) {
this.#readQueue = new BatchingQueue({
batchSize: this.#options.maxCacheReadSize,
@@ -37,23 +45,29 @@ export class CacheReader {
processFunction: this.#processBatch
})
}
if (!this.#readQueue.get(id)) {
this.#readQueue.add(id, id)
}
#requestItem(id: string): void {
this.#createReadQueue()
if (!this.#readQueue?.get(id)) {
this.#readQueue?.add(id, id)
}
}
async getAll(keys: string[]): Promise<(Item | undefined)[]> {
return this.#database.getAll(keys)
requestAll(keys: string[]): void {
this.#createReadQueue()
this.#readQueue?.addAll(keys, keys)
}
#processBatch = async (batch: string[]): Promise<void> => {
const items = await this.#database.getAll(batch)
for (let i = 0; i < items.length; i++) {
if (items[i]) {
this.#defermentManager.undefer(items[i]!)
const item = items[i]
if (item) {
this.#foundQueue?.add(item)
this.#defermentManager.undefer(item)
} else {
//this is okay!
//this.#logger(`Item ${batch[i]} not found in cache`)
this.#notFoundQueue?.add(batch[i])
}
}
}
@@ -0,0 +1,48 @@
import { Database } from '../operations/interfaces.js'
import { CacheOptions } from '../operations/options.js'
import { CustomLogger, Item } from '../types/types.js'
import BatchingQueue from './batchingQueue.js'
import { DefermentManager } from './defermentManager.js'
import Queue from './queue.js'
export class CacheWriter implements Queue<Item> {
#writeQueue: BatchingQueue<Item> | undefined
#database: Database
#defermentManager: DefermentManager
#logger: CustomLogger
#options: CacheOptions
#disposed = false
constructor(
database: Database,
defermentManager: DefermentManager,
options: CacheOptions
) {
this.#database = database
this.#defermentManager = defermentManager
this.#options = options
this.#logger = options.logger || ((): void => {})
}
add(item: Item): void {
if (!this.#writeQueue) {
this.#writeQueue = new BatchingQueue({
batchSize: this.#options.maxCacheWriteSize,
maxWaitTime: this.#options.maxCacheBatchWriteWait,
processFunction: (batch: Item[]): Promise<void> =>
this.#database.saveBatch({ batch })
})
}
this.#defermentManager.undefer(item)
this.#writeQueue.add(item.baseId, item)
}
async disposeAsync(): Promise<void> {
await this.#writeQueue?.disposeAsync()
this.#disposed = true
}
get isDisposed(): boolean {
return this.#disposed
}
}
@@ -24,13 +24,13 @@ describe('deferments', () => {
expect(d?.getId()).toBe('id')
expect((d as any).expiresAt).toBe(2)
expect((d as any).ttl).toBe(1)
expect((d as any).item).toBeUndefined()
expect((d as any).base).toBeUndefined()
expect(d?.isExpired(1)).toBe(false)
deferments.undefer({ baseId: 'id', base: { id: 'id', speckle_type: 'type' } })
await x
expect((d as any).expiresAt).toBe(2)
expect((d as any).ttl).toBe(1)
expect((d as any).item).toBeDefined()
expect((d as any).base).toBeDefined()
expect(d?.isExpired(1)).toBe(false)
expect(d?.isExpired(3)).toBe(true)
})
@@ -59,16 +59,21 @@ export class DefermentManager {
undefer(item: Item): void {
if (this.disposed) throw new Error('DefermentManager is disposed')
const base = item.base
if (!base) {
this.logger('undefer called with no base', item)
return
}
const now = this.now()
this.currentSize += item.size || 0
//order matters here with found before undefer
const deferredBase = this.deferments.get(item.baseId)
if (deferredBase) {
deferredBase.found(item)
deferredBase.found(base)
deferredBase.setAccess(now)
} else {
const existing = new DeferredBase(this.options.ttlms, item.baseId, now)
existing.found(item)
existing.found(base)
this.deferments.set(item.baseId, existing)
}
}
@@ -95,7 +100,7 @@ export class DefermentManager {
let waiting = 0
for (const deferredBase of this.deferments.values()) {
deferredBase.done(0)
if (deferredBase.getItem() === undefined) {
if (deferredBase.getBase() === undefined) {
waiting++
}
}
@@ -119,7 +124,7 @@ export class DefermentManager {
const start = performance.now()
for (const deferredBase of Array.from(this.deferments.values())
.filter((x) => x.isExpired(now))
.sort((a, b) => this.compareMaybeBasesBySize(a.getItem(), b.getItem()))) {
.sort((a, b) => this.compareMaybeBasesBySize(a.getSize(), b.getSize()))) {
if (deferredBase.done(now)) {
//if the deferment is done but has been requested multiple times,
//we do not clean it up to allow the requests to resolve
@@ -127,7 +132,7 @@ export class DefermentManager {
if (requestCount && requestCount > 1) {
return
}
this.currentSize -= deferredBase.getItem()?.size || 0
this.currentSize -= deferredBase.getSize() || 0
this.deferments.delete(deferredBase.getId())
cleaned++
if (this.currentSize < maxSizeBytes) {
@@ -144,14 +149,7 @@ export class DefermentManager {
return
}
compareMaybeBasesBySize(a: Item | undefined, b: Item | undefined): number {
if (a === undefined && b === undefined) return 0
if (a === undefined) return -1
if (b === undefined) return 1
return this.compareMaybe(a.size, b.size)
}
compareMaybe(a: number | undefined, b: number | undefined): number {
compareMaybeBasesBySize(a: number | undefined, b: number | undefined): number {
if (a === undefined && b === undefined) return 0
if (a === undefined) return -1
if (b === undefined) return 1
@@ -1,10 +1,11 @@
import { Base, Item } from '../types/types.js'
import { Base } from '../types/types.js'
export class DeferredBase {
private promise: Promise<Base>
private resolve!: (value: Base) => void
private reject!: (reason?: Error) => void
private item?: Item
private base?: Base
private size?: number
private readonly id: string
private expiresAt: number // Timestamp in ms
@@ -24,8 +25,11 @@ export class DeferredBase {
return this.id
}
getItem(): Item | undefined {
return this.item
getBase(): Base | undefined {
return this.base
}
getSize(): number | undefined {
return this.size
}
getPromise(): Promise<Base> {
@@ -33,19 +37,20 @@ export class DeferredBase {
}
isExpired(now: number): boolean {
return this.item !== undefined && now > this.expiresAt
return this.base !== undefined && now > this.expiresAt
}
setAccess(now: number): void {
this.expiresAt = now + this.ttl
}
found(value: Item): void {
this.item = value
this.resolve(value.base)
found(value: Base, size?: number): void {
this.base = value
this.size = size
this.resolve(value)
}
done(now: number): boolean {
if (this.item) {
this.resolve(this.item.base)
if (this.base) {
this.resolve(this.base)
}
if (this.isExpired(now)) {
return true
@@ -16,6 +16,18 @@ export default class KeyedQueue<K, V> {
return true
}
enqueueAll(keys: K[], values: V[]): number {
let count = 0
for (let i = 0; i < keys.length; i++) {
if (!this._map.has(keys[i])) {
this._map.set(keys[i], values[i])
this._order.push(keys[i])
count++
}
}
return count
}
get(key: K): V | undefined {
return this._map.get(key)
}
@@ -1,40 +0,0 @@
import { Item } from '../types/types.js'
import { Pump } from './pump.js'
import Queue from './queue.js'
export class MemoryPump implements Pump {
#items: Map<string, Item> = new Map()
add(item: Item): void {
this.#items.set(item.baseId, item)
}
async pumpItems(params: {
ids: string[]
foundItems: Queue<Item>
notFoundItems: Queue<string>
}): Promise<void> {
const { ids, foundItems, notFoundItems } = params
for (const id of ids) {
const item = this.#items.get(id)
if (item) {
foundItems.add(item)
} else {
notFoundItems.add(id)
}
}
return Promise.resolve()
}
async *gather(ids: string[]): AsyncGenerator<Item> {
for (const id of ids) {
const item = this.#items.get(id)
if (item) {
yield item
}
}
return Promise.resolve()
}
async disposeAsync(): Promise<void> {}
}
@@ -1,8 +0,0 @@
import { Downloader } from '../operations/interfaces.js'
import { Item } from '../types/types.js'
import Queue from './queue.js'
export interface Pump extends Queue<Item> {
gather(ids: string[], downloader: Downloader): AsyncGenerator<Item>
disposeAsync(): Promise<void>
}
@@ -3,16 +3,16 @@
exports[`IndexedDatabase > should add and get multiple items 1`] = `
[
{
"baseId": "id1",
"item": {
"base": {
"foo": "bar",
},
"baseId": "id1",
},
{
"baseId": "id2",
"item": {
"base": {
"foo": "bar",
},
"baseId": "id2",
},
]
`;
@@ -1,10 +1,14 @@
import { describe, it, expect, beforeEach, afterEach } from 'vitest'
import { IDBFactory, IDBKeyRange } from 'fake-indexeddb'
import IndexedDatabase, { IndexedDatabaseOptions } from './indexedDatabase.js'
import { Item } from '../../types/types.js'
import { Item, Base } from '../../types/types.js'
// Mock Item
const defaultItem = (id: string): Item => ({ baseId: id, item: { foo: 'bar' } })
const defaultItem = (id: string): Item => ({
baseId: id,
base: { foo: 'bar' } as unknown as Base
})
describe('IndexedDatabase', () => {
let db: IndexedDatabase
@@ -21,7 +25,7 @@ describe('IndexedDatabase', () => {
it('should add and get multiple items', async () => {
const items = [defaultItem('id1'), defaultItem('id2')]
await db.cacheSaveBatch({ batch: items })
await db.saveBatch({ batch: items })
const result = await db.getAll(['id1', 'id2'])
expect(result).toMatchSnapshot()
expect(result).toEqual(items)
@@ -83,7 +83,7 @@ export default class IndexedDatabase implements Database {
this.#cacheDB = await this.#openDatabase()
}
async cacheSaveBatch(params: { batch: Item[] }): Promise<void> {
async saveBatch(params: { batch: Item[] }): Promise<void> {
await this.#setupCacheDb()
const { batch } = params
//const x = this.#count
@@ -21,14 +21,14 @@ describe('MemoryDatabase', () => {
it('should add and retrieve a single item', async () => {
const item = makeItem('id1')
await db.cacheSaveBatch({ batch: [item] })
await db.saveBatch({ batch: [item] })
const result = await db.getAll(['id1'])
expect(result).toEqual([item])
})
it('should add and retrieve multiple items', async () => {
const items = [makeItem('id1'), makeItem('id2', 'baz')]
await db.cacheSaveBatch({ batch: items })
await db.saveBatch({ batch: items })
const result = await db.getAll(['id1', 'id2'])
expect(result).toEqual(items)
})
@@ -36,8 +36,8 @@ describe('MemoryDatabase', () => {
it('should overwrite items with the same key', async () => {
const item1 = makeItem('id1', 'foo')
const item2 = makeItem('id1', 'bar')
await db.cacheSaveBatch({ batch: [item1] })
await db.cacheSaveBatch({ batch: [item2] })
await db.saveBatch({ batch: [item1] })
await db.saveBatch({ batch: [item2] })
const result = await db.getAll(['id1'])
expect(result).toEqual([item2])
})
@@ -22,8 +22,11 @@ export class MemoryDatabase implements Database {
return Promise.resolve(found)
}
cacheSaveBatch({ batch }: { batch: Item[] }): Promise<void> {
saveBatch({ batch }: { batch: Item[] }): Promise<void> {
for (const item of batch) {
if (!item.baseId || !item.base) {
throw new Error('Item must have a baseId and base')
}
this.items.set(item.baseId, item.base)
}
return Promise.resolve()
@@ -3,14 +3,14 @@ import createFetchMock from 'vitest-fetch-mock'
import { vi } from 'vitest'
import { Item } from '../../types/types.js'
import ServerDownloader from './serverDownloader.js'
import { MemoryPump } from '../../helpers/memoryPump.js'
import AsyncGeneratorQueue from '../../helpers/asyncGeneratorQueue.js'
describe('downloader', () => {
test('download batch of one', async () => {
const fetchMocker = createFetchMock(vi)
const i: Item = { baseId: 'id', base: { id: 'id', speckle_type: 'type' } }
fetchMocker.mockResponseOnce('id\t' + JSON.stringify(i.base) + '\n')
const pump = new MemoryPump()
const gathered = new AsyncGeneratorQueue<Item>()
const downloader = new ServerDownloader({
serverUrl: 'http://speckle.test',
streamId: 'streamId',
@@ -18,12 +18,21 @@ describe('downloader', () => {
token: 'token',
fetch: fetchMocker
})
downloader.initializePool({ results: pump, total: 1, maxDownloadBatchWait: 200 })
downloader.initializePool({
results: gathered,
total: 1,
maxDownloadBatchWait: 200
})
downloader.add('id')
await downloader.disposeAsync()
const r = []
for await (const x of pump.gather([i.baseId])) {
let count = 0
for await (const x of gathered.consume()) {
r.push(x)
count++
if (count >= 1) {
break
}
}
expect(r).toMatchSnapshot()
@@ -38,7 +47,7 @@ describe('downloader', () => {
'id1\t' + JSON.stringify(i1.base) + '\nid2\t' + JSON.stringify(i2.base) + '\n'
)
const pump = new MemoryPump()
const gathered = new AsyncGeneratorQueue<Item>()
const downloader = new ServerDownloader({
serverUrl: 'http://speckle.test',
streamId: 'streamId',
@@ -47,13 +56,22 @@ describe('downloader', () => {
fetch: fetchMocker
})
downloader.initializePool({ results: pump, total: 2, maxDownloadBatchWait: 200 })
downloader.initializePool({
results: gathered,
total: 2,
maxDownloadBatchWait: 200
})
downloader.add('id1')
downloader.add('id2')
await downloader.disposeAsync()
const r = []
for await (const x of pump.gather([i1.baseId, i2.baseId])) {
let count = 0
for await (const x of gathered.consume()) {
r.push(x)
count++
if (count >= 2) {
break
}
}
expect(r).toMatchSnapshot()
@@ -75,7 +93,7 @@ describe('downloader', () => {
'\n'
)
const pump = new MemoryPump()
const gathered = new AsyncGeneratorQueue<Item>()
const downloader = new ServerDownloader({
serverUrl: 'http://speckle.test',
streamId: 'streamId',
@@ -84,14 +102,23 @@ describe('downloader', () => {
fetch: fetchMocker
})
downloader.initializePool({ results: pump, total: 3, maxDownloadBatchWait: 200 })
downloader.initializePool({
results: gathered,
total: 3,
maxDownloadBatchWait: 200
})
downloader.add('id1')
downloader.add('id2')
downloader.add('id3')
await downloader.disposeAsync()
const r = []
for await (const x of pump.gather([i1.baseId, i2.baseId, i3.baseId])) {
let count = 0
for await (const x of gathered.consume()) {
r.push(x)
count++
if (count >= 3) {
break
}
}
expect(r).toMatchSnapshot()
@@ -13,6 +13,6 @@ export interface Downloader extends Queue<string> {
export interface Database {
getAll(keys: string[]): Promise<(Item | undefined)[]>
cacheSaveBatch(params: { batch: Item[] }): Promise<void>
saveBatch(params: { batch: Item[] }): Promise<void>
disposeAsync(): Promise<void>
}
@@ -23,6 +23,7 @@ describe('objectloader2', () => {
})
})
const x = await loader.getRootObject()
await loader.disposeAsync()
expect(x).toMatchSnapshot()
})
@@ -42,6 +43,7 @@ describe('objectloader2', () => {
})
})
const x = await loader.getRootObject()
await loader.disposeAsync()
expect(x).toMatchSnapshot()
})
@@ -65,6 +67,7 @@ describe('objectloader2', () => {
for await (const x of loader.getObjectIterator()) {
r.push(x)
}
await loader.disposeAsync()
expect(r).toMatchSnapshot()
})
@@ -99,6 +102,7 @@ describe('objectloader2', () => {
for await (const x of loader.getObjectIterator()) {
r.push(x)
}
await loader.disposeAsync()
expect(obj).toBeDefined()
expect(r).toMatchSnapshot()
@@ -129,16 +133,14 @@ describe('objectloader2', () => {
const loader = new ObjectLoader2({
rootId: root.baseId,
downloader: new MemoryDownloader(rootId, records),
database: new IndexedDatabase({
indexedDB: new IDBFactory(),
keyRange: IDBKeyRange
})
database: new MemoryDatabase({ items: records })
})
const r = []
const obj = loader.getObject({ id: child1.baseId })
for await (const x of loader.getObjectIterator()) {
r.push(x)
}
await loader.disposeAsync()
expect(obj).toBeDefined()
expect(r).toMatchSnapshot()
@@ -172,6 +174,7 @@ describe('objectloader2', () => {
})
})
const x = await loader.getRootObject()
await loader.disposeAsync()
expect(x).toMatchSnapshot()
})
@@ -224,6 +227,7 @@ describe('objectloader2', () => {
for await (const x of loader.getObjectIterator()) {
r.push(x)
}
await loader.disposeAsync()
expect(r).toMatchSnapshot()
})
})
@@ -4,9 +4,9 @@ import { CustomLogger, Base, Item } from '../types/types.js'
import { CacheOptions, ObjectLoader2Options } from './options.js'
import { DefermentManager } from '../helpers/defermentManager.js'
import { CacheReader } from '../helpers/cacheReader.js'
import { CachePump } from '../helpers/cachePump.js'
import AggregateQueue from '../helpers/aggregateQueue.js'
import { ObjectLoader2Factory } from './objectLoader2Factory.js'
import { CacheWriter } from '../helpers/cacheWriter.js'
export class ObjectLoader2 {
#rootId: string
@@ -15,8 +15,8 @@ export class ObjectLoader2 {
#database: Database
#downloader: Downloader
#pump: CachePump
#cache: CacheReader
#cacheReader: CacheReader
#cacheWriter: CacheWriter
#deferments: DefermentManager
@@ -38,27 +38,25 @@ export class ObjectLoader2 {
}
this.#gathered = new AsyncGeneratorQueue()
this.#database = options.database
this.#deferments = new DefermentManager({
maxSizeInMb: 2_000, // 2 GBs
ttlms: 15_000, // 15 seconds
logger: this.#logger
})
this.#cache = new CacheReader(this.#database, this.#deferments, cacheOptions)
this.#pump = new CachePump(
this.#database,
this.#gathered,
this.#deferments,
cacheOptions
)
this.#downloader = options.downloader
this.#cacheReader = new CacheReader(this.#database, this.#deferments, cacheOptions)
this.#cacheReader.initializeQueue(this.#gathered, this.#downloader)
this.#cacheWriter = new CacheWriter(this.#database, this.#deferments, cacheOptions)
}
async disposeAsync(): Promise<void> {
this.#gathered.dispose()
await Promise.all([
this.#downloader.disposeAsync(),
this.#cache.disposeAsync(),
this.#pump.disposeAsync()
this.#cacheReader.disposeAsync(),
this.#cacheWriter.disposeAsync()
])
this.#deferments.dispose()
}
@@ -74,38 +72,46 @@ export class ObjectLoader2 {
}
async getObject(params: { id: string }): Promise<Base> {
return await this.#cache.getObject({ id: params.id })
return await this.#cacheReader.getObject({ id: params.id })
}
async getTotalObjectCount(): Promise<number> {
const rootObj = await this.getRootObject()
const totalChildrenCount = Object.keys(rootObj?.base.__closure || {}).length
const totalChildrenCount = Object.keys(rootObj?.base?.__closure || {}).length
return totalChildrenCount + 1 //count the root
}
async *getObjectIterator(): AsyncGenerator<Base> {
const rootItem = await this.getRootObject()
if (rootItem === undefined) {
if (rootItem?.base === undefined) {
this.#logger('No root object found!')
return
}
//only for root
this.#pump.add(rootItem)
yield rootItem.base
if (!rootItem.base.__closure) return
if (!rootItem.base.__closure) {
yield rootItem.base
return
}
//sort the closures by their values descending
const sortedClosures = Object.entries(rootItem.base.__closure).sort(
(a, b) => b[1] - a[1]
)
const children = sortedClosures.map((x) => x[0])
const total = children.length
const total = children.length + 1 // +1 for the root object
this.#downloader.initializePool({
results: new AggregateQueue(this.#gathered, this.#pump),
results: new AggregateQueue(this.#gathered, this.#cacheWriter),
total
})
for await (const item of this.#pump.gather(children, this.#downloader)) {
yield item.base
//only for root
this.#gathered.add(rootItem)
this.#cacheReader.requestAll(children)
let count = 0
for await (const item of this.#gathered.consume()) {
yield item.base! //always defined, as we add it to the queue
count++
if (count >= total) {
break
}
}
}
+1 -1
View File
@@ -7,7 +7,7 @@ export type Fetcher = (
export interface Item {
baseId: string
base: Base
base?: Base
size?: number
}