feat(viewer): objectloader2 integration (#4267)
* feat(viewer-sandbox): Made a sandbox function that will only invoke the object-loader loading objects * first pass of creating an objectloader2 * updated build + added vitest * try to get viewer sandbox to use new code * sandbox type fix * refactor a bit * can download root * intermediate commit for downloader/caching queue * can download stuff! * refactor files * intro isBase and fix isString * move single download to downloader * fix download * PR feedback * some intermediate commit * do clean up and download better * clean up promises and linting * can generate values while downloading and caching * add a finish method * remove unused functions * remove asBase * add temporary docs * add more docs with mermaid * add more test models * add response validation * add tests and redo options * add test for download batch * fix downloader tests and change Item to have clearer Base items * add tests and refactor a little * use fetch in downloader as an option * use optional in-memory indexdb instead of monkey patching the global one * more refactors for options for objectloader2 * add tests for objectloader2 * adjust single download * benchmark loading and adjust ol2 batches * download more! * adjust to use hash privates * refactored again with renaming * cleanup * make setupCacheDb throw instead * use BatchedPool for downloads! * fix tests * adjust timings and add adaptive waiting * Only wait if queue wasn't empty and queue size was full * fix tests * fix file names and some private usage * fix interval and private usage * rename vars * use params for methods * fix params for constructors and tests * fix params for constructors and tests again * using dexie * faster settings but doesn't end well * fixed end, optimized and removed logs * fix tests * fix types? * update lock with WSL * add e2e small model test * fix/update yarn.lock * Remove unused eslint ignore to fix pre-commit * prettier fixes * fix real DB usage * rename methods to better match OL1 * rename methods to better match OL1 again * add extra header collection * add headers correctly * test getTotalObjectCount * feat(viewer-lib): Replaced old object loader with Adam's objectloder2 * fix(viewer-lib): Removed the old object loader. Removed unneeded pause time in speckle loader * Testing * only deferred if not downloaded....don't save everything * Lockfile * pool isn't adjustable, adjust download buckets, dexie read is faster * chore * fix(viewer-lib): Fixed compiler errors * fix getObject access with real indexeddb...adjust buffer for deferred access * Fix disposal and pausing * don't index item! * fix dockerfiles to use OL2 * fix Dockerfile * Fix dockerfile * defer correctly and use record to add/lookup/remove to * delete stuff correctly * chore(sandbox): Enabled viewer loading * use objects instead of arrays to avoid findIndex * remove extra count * add a found cache to avoid some db hits * order matters for deferment * move found map to deferment * change option numbers * 2 level cache with expiry * defer everything, use loader to track what is requested....expire only found items * add deferment disposal * oops mismerge * chore(sandbox): Default stream * Beta version of CachePump and CacheReader * Clean up initialization * More clean up * chore(objectloader2): Fixed CI compiler error * chore(objectloader2): Fixed prettier * add cachePump tests * add cacheReader tests * fixed more tests * fixed final tests * moving stuff around and lock return value * try to move stuff out of objectloader2 * use a factory * rename factory * formatting * eslist fixes * try allocating no strings * add comments * small refactor and add another test * fix deferment expiration and have test * use byte size for max memory cache size * fix deferment manager tests * saved comment * fix(viewer-sandbox): Fixed compiler error * ignore tshy * chore(frontend): Attempt to make viewer loading sequential --------- Co-authored-by: Adam Hathcock <adam@hathcock.uk> Co-authored-by: Kristaps Fabians Geikins <fabis94@live.com> Co-authored-by: Iain Sproat <68657+iainsproat@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
aaa4e1ab71
commit
a385823b2d
+2
-1
@@ -82,4 +82,5 @@ bin/
|
||||
|
||||
# Server
|
||||
multiregion.json
|
||||
multiregion.test.json
|
||||
multiregion.test.json
|
||||
packages/*/.tshy/
|
||||
|
||||
@@ -13,7 +13,7 @@ COPY package.json yarn.lock ./
|
||||
COPY utils/ensure-tailwind-deps.mjs ./utils/
|
||||
|
||||
COPY packages/viewer/package.json ./packages/viewer/
|
||||
COPY packages/objectloader/package.json ./packages/objectloader/
|
||||
COPY packages/objectloader2/package.json ./packages/objectloader2/
|
||||
COPY packages/shared/package.json ./packages/shared/
|
||||
COPY packages/ui-components/package.json ./packages/ui-components/
|
||||
COPY packages/ui-components-nuxt/package.json ./packages/ui-components-nuxt/
|
||||
@@ -21,7 +21,7 @@ COPY packages/tailwind-theme/package.json ./packages/tailwind-theme/
|
||||
COPY packages/frontend-2/package.json ./packages/frontend-2/
|
||||
COPY packages/frontend-2/type-augmentations ./packages/frontend-2/
|
||||
|
||||
COPY packages/objectloader ./packages/objectloader/
|
||||
COPY packages/objectloader2 ./packages/objectloader2/
|
||||
COPY packages/viewer ./packages/viewer/
|
||||
COPY packages/shared ./packages/shared/
|
||||
COPY packages/ui-components ./packages/ui-components/
|
||||
|
||||
@@ -111,7 +111,7 @@ function useViewerObjectAutoLoading() {
|
||||
|
||||
const consolidateProgressThorttled = useThrottleFn(consolidateProgressInternal, 250)
|
||||
|
||||
const loadObject = (
|
||||
const loadObject = async (
|
||||
objectId: string,
|
||||
unload?: boolean,
|
||||
options?: Partial<{ zoomToObject: boolean }>
|
||||
@@ -119,7 +119,7 @@ function useViewerObjectAutoLoading() {
|
||||
const objectUrl = getObjectUrl(projectId.value, objectId)
|
||||
|
||||
if (unload) {
|
||||
viewer.unloadObject(objectUrl)
|
||||
return viewer.unloadObject(objectUrl)
|
||||
} else {
|
||||
const loader = new SpeckleLoader(
|
||||
viewer.getWorldTree(),
|
||||
@@ -135,7 +135,7 @@ function useViewerObjectAutoLoading() {
|
||||
consolidateProgressInternal({ id, progress: 1 })
|
||||
})
|
||||
|
||||
viewer.loadObject(loader, options?.zoomToObject)
|
||||
return viewer.loadObject(loader, options?.zoomToObject)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -155,9 +155,15 @@ function useViewerObjectAutoLoading() {
|
||||
if (!newHasDoneInitialLoad) {
|
||||
const allObjectIds = getUniqueObjectIds(newResources)
|
||||
|
||||
const res = await Promise.all(
|
||||
allObjectIds.map((i) => loadObject(i, false, { zoomToObject }))
|
||||
)
|
||||
/** Load sequentially */
|
||||
const res = []
|
||||
for (const i of allObjectIds) {
|
||||
res.push(await loadObject(i, false, { zoomToObject }))
|
||||
}
|
||||
/** Load in parallel */
|
||||
// const res = await Promise.all(
|
||||
// allObjectIds.map((i) => loadObject(i, false, { zoomToObject }))
|
||||
// )
|
||||
if (res.length) {
|
||||
hasDoneInitialLoad.value = true
|
||||
}
|
||||
|
||||
@@ -1,8 +0,0 @@
|
||||
{
|
||||
"extends": "../tsconfig.json",
|
||||
"compilerOptions": {
|
||||
"rootDir": "../src",
|
||||
"module": "nodenext",
|
||||
"moduleResolution": "nodenext"
|
||||
}
|
||||
}
|
||||
@@ -1,17 +0,0 @@
|
||||
{
|
||||
"extends": "./build.json",
|
||||
"include": [
|
||||
"../src/**/*.ts",
|
||||
"../src/**/*.cts",
|
||||
"../src/**/*.tsx",
|
||||
"../src/**/*.json"
|
||||
],
|
||||
"exclude": [
|
||||
"../**/*.spec.ts",
|
||||
"../src/**/*.mts",
|
||||
"../src/package.json"
|
||||
],
|
||||
"compilerOptions": {
|
||||
"outDir": "../.tshy-build/commonjs"
|
||||
}
|
||||
}
|
||||
@@ -1,16 +0,0 @@
|
||||
{
|
||||
"extends": "./build.json",
|
||||
"include": [
|
||||
"../src/**/*.ts",
|
||||
"../src/**/*.mts",
|
||||
"../src/**/*.tsx",
|
||||
"../src/**/*.json"
|
||||
],
|
||||
"exclude": [
|
||||
"../**/*.spec.ts",
|
||||
"../src/package.json"
|
||||
],
|
||||
"compilerOptions": {
|
||||
"outDir": "../.tshy-build/esm"
|
||||
}
|
||||
}
|
||||
@@ -29,7 +29,9 @@ const configs = [
|
||||
}
|
||||
},
|
||||
rules: {
|
||||
'@typescript-eslint/restrict-template-expressions': 'off'
|
||||
'@typescript-eslint/restrict-template-expressions': 'off',
|
||||
'@typescript-eslint/await-thenable': 'error',
|
||||
'@typescript-eslint/explicit-function-return-type': 'error'
|
||||
}
|
||||
},
|
||||
{
|
||||
|
||||
@@ -0,0 +1,31 @@
|
||||
// Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html
|
||||
|
||||
exports[`CachePump testing > write two items to queue use pumpItems that are NOT found 1`] = `[]`;
|
||||
|
||||
exports[`CachePump testing > write two items to queue use pumpItems that are NOT found 2`] = `
|
||||
[
|
||||
"id1",
|
||||
"id2",
|
||||
]
|
||||
`;
|
||||
|
||||
exports[`CachePump testing > write two items to queue use pumpItems that are found 1`] = `
|
||||
[
|
||||
{
|
||||
"base": {
|
||||
"id": "id",
|
||||
"speckle_type": "type",
|
||||
},
|
||||
"baseId": "id1",
|
||||
},
|
||||
{
|
||||
"base": {
|
||||
"id": "id",
|
||||
"speckle_type": "type",
|
||||
},
|
||||
"baseId": "id2",
|
||||
},
|
||||
]
|
||||
`;
|
||||
|
||||
exports[`CachePump testing > write two items to queue use pumpItems that are found 2`] = `[]`;
|
||||
@@ -0,0 +1,8 @@
|
||||
// Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html
|
||||
|
||||
exports[`CacheReader testing > deferred getObject 1`] = `
|
||||
{
|
||||
"id": "id",
|
||||
"speckle_type": "type",
|
||||
}
|
||||
`;
|
||||
@@ -0,0 +1,20 @@
|
||||
import Queue from './queue.js'
|
||||
|
||||
export default class AggregateQueue<T> implements Queue<T> {
|
||||
#queue1: Queue<T>
|
||||
#queue2: Queue<T>
|
||||
|
||||
constructor(queue1: Queue<T>, queue2: Queue<T>) {
|
||||
this.#queue1 = queue1
|
||||
this.#queue2 = queue2
|
||||
}
|
||||
|
||||
add(value: T): void {
|
||||
this.#queue1.add(value)
|
||||
this.#queue2.add(value)
|
||||
}
|
||||
|
||||
values(): T[] {
|
||||
throw new Error('Not implemented')
|
||||
}
|
||||
}
|
||||
@@ -27,7 +27,7 @@ export default class BatchedPool<T> {
|
||||
return this.#queue.splice(0, Math.min(batchSize, this.#queue.length))
|
||||
}
|
||||
|
||||
async #runWorker(batchSize: number) {
|
||||
async #runWorker(batchSize: number): Promise<void> {
|
||||
while (!this.#finished || this.#queue.length > 0) {
|
||||
if (this.#queue.length > 0) {
|
||||
const batch = this.getBatch(batchSize)
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import Queue from './queue.js'
|
||||
import KeyedQueue from './keyedQueue.js'
|
||||
|
||||
export default class BatchingQueue<T> implements Queue<T> {
|
||||
#queue: T[] = []
|
||||
export default class BatchingQueue<T> {
|
||||
#queue: KeyedQueue<string, T> = new KeyedQueue<string, T>()
|
||||
#batchSize: number
|
||||
#processFunction: (batch: T[]) => Promise<void>
|
||||
|
||||
@@ -30,28 +30,32 @@ export default class BatchingQueue<T> implements Queue<T> {
|
||||
await this.#processingLoop
|
||||
}
|
||||
|
||||
add(item: T): void {
|
||||
this.#queue.push(item)
|
||||
add(key: string, item: T): void {
|
||||
this.#queue.enqueue(key, item)
|
||||
}
|
||||
|
||||
get(id: string): T | undefined {
|
||||
return this.#queue.get(id)
|
||||
}
|
||||
|
||||
count(): number {
|
||||
return this.#queue.length
|
||||
return this.#queue.size
|
||||
}
|
||||
|
||||
#getBatch(batchSize: number): T[] {
|
||||
return this.#queue.splice(0, Math.min(batchSize, this.#queue.length))
|
||||
return this.#queue.spliceValues(0, Math.min(batchSize, this.#queue.size))
|
||||
}
|
||||
|
||||
async #loop(): Promise<void> {
|
||||
let interval = this.#baseInterval
|
||||
while (!this.#finished || this.#queue.length > 0) {
|
||||
while (!this.#finished || this.#queue.size > 0) {
|
||||
const startTime = performance.now()
|
||||
if (this.#queue.length > 0) {
|
||||
if (this.#queue.size > 0) {
|
||||
const batch = this.#getBatch(this.#batchSize)
|
||||
//console.log('running with queue size of ' + this.#queue.length)
|
||||
await this.#processFunction(batch)
|
||||
}
|
||||
if (this.#queue.length < this.#batchSize / 2) {
|
||||
if (this.#queue.size < this.#batchSize / 2) {
|
||||
//refigure interval
|
||||
const endTime = performance.now()
|
||||
const duration = endTime - startTime
|
||||
@@ -62,7 +66,7 @@ export default class BatchingQueue<T> implements Queue<T> {
|
||||
}
|
||||
/*console.log(
|
||||
'queue is waiting ' +
|
||||
interval / TIME_MS.second +
|
||||
interval / 1000 +
|
||||
' with queue size of ' +
|
||||
this.#queue.length
|
||||
)*/
|
||||
|
||||
@@ -0,0 +1,74 @@
|
||||
import { describe, expect, test } from 'vitest'
|
||||
import { CachePump } from './cachePump.js'
|
||||
import { Base, Item } from '../types/types.js'
|
||||
import BufferQueue from './bufferQueue.js'
|
||||
import AsyncGeneratorQueue from './asyncGeneratorQueue.js'
|
||||
import { DefermentManager } from './defermentManager.js'
|
||||
import { MemoryDatabase } from '../operations/databases/memoryDatabase.js'
|
||||
|
||||
describe('CachePump testing', () => {
|
||||
test('write two items to queue use pumpItems that are NOT found', async () => {
|
||||
const i1: Item = { baseId: 'id1', base: { id: 'id', speckle_type: 'type' } }
|
||||
const i2: Item = { baseId: 'id2', base: { id: 'id', speckle_type: 'type' } }
|
||||
|
||||
const gathered = new AsyncGeneratorQueue<Item>()
|
||||
const deferments = new DefermentManager({ maxSize: 1, ttl: 1 })
|
||||
const cachePump = new CachePump(new MemoryDatabase({}), gathered, deferments, {
|
||||
maxCacheReadSize: 1,
|
||||
maxCacheWriteSize: 1,
|
||||
maxCacheBatchWriteWait: 1,
|
||||
maxCacheBatchReadWait: 1,
|
||||
maxWriteQueueSize: 1
|
||||
})
|
||||
|
||||
const foundItems = new BufferQueue<Item>()
|
||||
const notFoundItems = new BufferQueue<string>()
|
||||
|
||||
await cachePump.pumpItems({
|
||||
ids: [i1.baseId, i2.baseId],
|
||||
foundItems,
|
||||
notFoundItems
|
||||
})
|
||||
|
||||
expect(foundItems.values()).toMatchSnapshot()
|
||||
expect(notFoundItems.values()).toMatchSnapshot()
|
||||
await cachePump.disposeAsync()
|
||||
})
|
||||
|
||||
test('write two items to queue use pumpItems that are found', async () => {
|
||||
const i1: Item = { baseId: 'id1', base: { id: 'id', speckle_type: 'type' } }
|
||||
const i2: Item = { baseId: 'id2', base: { id: 'id', speckle_type: 'type' } }
|
||||
|
||||
const db = new Map<string, Base>()
|
||||
db.set(i1.baseId, i1.base)
|
||||
db.set(i2.baseId, i2.base)
|
||||
|
||||
const gathered = new AsyncGeneratorQueue<Item>()
|
||||
const deferments = new DefermentManager({ maxSize: 1, ttl: 1 })
|
||||
const cachePump = new CachePump(
|
||||
new MemoryDatabase({ items: db }),
|
||||
gathered,
|
||||
deferments,
|
||||
{
|
||||
maxCacheReadSize: 1,
|
||||
maxCacheWriteSize: 1,
|
||||
maxCacheBatchWriteWait: 1,
|
||||
maxCacheBatchReadWait: 1,
|
||||
maxWriteQueueSize: 1
|
||||
}
|
||||
)
|
||||
|
||||
const foundItems = new BufferQueue<Item>()
|
||||
const notFoundItems = new BufferQueue<string>()
|
||||
|
||||
await cachePump.pumpItems({
|
||||
ids: [i1.baseId, i2.baseId],
|
||||
foundItems,
|
||||
notFoundItems
|
||||
})
|
||||
|
||||
expect(foundItems.values()).toMatchSnapshot()
|
||||
expect(notFoundItems.values()).toMatchSnapshot()
|
||||
await cachePump.disposeAsync()
|
||||
})
|
||||
})
|
||||
@@ -0,0 +1,98 @@
|
||||
import { TIME } from '@speckle/shared'
|
||||
import { Database } from '../operations/interfaces.js'
|
||||
import { CacheOptions } from '../operations/options.js'
|
||||
import { CustomLogger, Item } from '../types/types.js'
|
||||
import BatchingQueue from './batchingQueue.js'
|
||||
import Queue from './queue.js'
|
||||
import { Downloader } from '../operations/interfaces.js'
|
||||
import { DefermentManager } from './defermentManager.js'
|
||||
import AsyncGeneratorQueue from './asyncGeneratorQueue.js'
|
||||
import { Pump } from './pump.js'
|
||||
|
||||
export class CachePump implements Pump {
|
||||
#writeQueue: BatchingQueue<Item> | undefined
|
||||
#database: Database
|
||||
#logger: CustomLogger
|
||||
#deferments: DefermentManager
|
||||
|
||||
#gathered: AsyncGeneratorQueue<Item>
|
||||
|
||||
#options: CacheOptions
|
||||
|
||||
constructor(
|
||||
database: Database,
|
||||
gathered: AsyncGeneratorQueue<Item>,
|
||||
deferments: DefermentManager,
|
||||
options: CacheOptions
|
||||
) {
|
||||
this.#database = database
|
||||
this.#gathered = gathered
|
||||
this.#deferments = deferments
|
||||
this.#options = options
|
||||
this.#logger = options.logger || ((): void => {})
|
||||
}
|
||||
|
||||
add(item: Item): void {
|
||||
if (!this.#writeQueue) {
|
||||
this.#writeQueue = new BatchingQueue({
|
||||
batchSize: this.#options.maxCacheWriteSize,
|
||||
maxWaitTime: this.#options.maxCacheBatchWriteWait,
|
||||
processFunction: (batch: Item[]): Promise<void> =>
|
||||
this.#database.cacheSaveBatch({ batch })
|
||||
})
|
||||
}
|
||||
this.#writeQueue.add(item.baseId, item)
|
||||
}
|
||||
|
||||
async disposeAsync(): Promise<void> {
|
||||
await this.#writeQueue?.disposeAsync()
|
||||
}
|
||||
|
||||
async pumpItems(params: {
|
||||
ids: string[]
|
||||
foundItems: Queue<Item>
|
||||
notFoundItems: Queue<string>
|
||||
}): Promise<void> {
|
||||
const { ids, foundItems, notFoundItems } = params
|
||||
const maxCacheReadSize = this.#options.maxCacheReadSize
|
||||
|
||||
for (let i = 0; i < ids.length; ) {
|
||||
if ((this.#writeQueue?.count() ?? 0) > this.#options.maxWriteQueueSize) {
|
||||
this.#logger(
|
||||
'pausing reads (# in write queue: ' + this.#writeQueue?.count() + ')'
|
||||
)
|
||||
await new Promise((resolve) => setTimeout(resolve, TIME.second)) // Pause for 1 second, protects against out of memory
|
||||
continue
|
||||
}
|
||||
const batch = ids.slice(i, i + maxCacheReadSize)
|
||||
const cachedData = await this.#database.getAll(batch)
|
||||
for (let i = 0; i < cachedData.length; i++) {
|
||||
if (cachedData[i]) {
|
||||
foundItems.add(cachedData[i]!)
|
||||
} else {
|
||||
notFoundItems.add(batch[i])
|
||||
}
|
||||
}
|
||||
i += maxCacheReadSize
|
||||
}
|
||||
}
|
||||
|
||||
async *gather(ids: string[], downloader: Downloader): AsyncGenerator<Item> {
|
||||
const total = ids.length
|
||||
const pumpPromise = this.pumpItems({
|
||||
ids,
|
||||
foundItems: this.#gathered,
|
||||
notFoundItems: downloader
|
||||
})
|
||||
let count = 0
|
||||
for await (const item of this.#gathered.consume()) {
|
||||
this.#deferments.undefer(item)
|
||||
yield item
|
||||
count++
|
||||
if (count >= total) {
|
||||
this.#gathered.dispose()
|
||||
}
|
||||
}
|
||||
await pumpPromise
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
import { describe, expect, test } from 'vitest'
|
||||
import { Base, Item } from '../types/types.js'
|
||||
import { DefermentManager } from './defermentManager.js'
|
||||
import { CacheReader } from './cacheReader.js'
|
||||
import { MemoryDatabase } from '../operations/databases/memoryDatabase.js'
|
||||
|
||||
describe('CacheReader testing', () => {
|
||||
test('deferred getObject', async () => {
|
||||
const i1: Item = { baseId: 'id1', base: { id: 'id', speckle_type: 'type' } }
|
||||
|
||||
const deferments = new DefermentManager({ maxSize: 1, ttl: 1 })
|
||||
const cacheReader = new CacheReader(
|
||||
new MemoryDatabase({
|
||||
items: new Map<string, Base>([[i1.baseId, i1.base]])
|
||||
}),
|
||||
deferments,
|
||||
{
|
||||
maxCacheReadSize: 1,
|
||||
maxCacheWriteSize: 1,
|
||||
maxCacheBatchWriteWait: 1,
|
||||
maxCacheBatchReadWait: 1,
|
||||
maxWriteQueueSize: 1
|
||||
}
|
||||
)
|
||||
|
||||
const objPromise = cacheReader.getObject({
|
||||
id: i1.baseId
|
||||
})
|
||||
deferments.undefer(i1)
|
||||
const base = await objPromise
|
||||
|
||||
expect(base).toMatchSnapshot()
|
||||
await cacheReader.disposeAsync()
|
||||
})
|
||||
})
|
||||
@@ -0,0 +1,64 @@
|
||||
import { Database } from '../operations/interfaces.js'
|
||||
import { CacheOptions } from '../operations/options.js'
|
||||
import { Base, CustomLogger, Item } from '../types/types.js'
|
||||
import BatchingQueue from './batchingQueue.js'
|
||||
import { DefermentManager } from './defermentManager.js'
|
||||
|
||||
export class CacheReader {
|
||||
#database: Database
|
||||
#defermentManager: DefermentManager
|
||||
#logger: CustomLogger
|
||||
#options: CacheOptions
|
||||
#readQueue: BatchingQueue<string> | undefined
|
||||
|
||||
constructor(
|
||||
database: Database,
|
||||
defermentManager: DefermentManager,
|
||||
options: CacheOptions
|
||||
) {
|
||||
this.#database = database
|
||||
this.#defermentManager = defermentManager
|
||||
this.#options = options
|
||||
this.#logger = options.logger || ((): void => {})
|
||||
}
|
||||
|
||||
async getObject(params: { id: string }): Promise<Base> {
|
||||
if (!this.#defermentManager.isDeferred(params.id)) {
|
||||
this.#getItem(params.id)
|
||||
}
|
||||
return await this.#defermentManager.defer({ id: params.id })
|
||||
}
|
||||
|
||||
#getItem(id: string): void {
|
||||
if (!this.#readQueue) {
|
||||
this.#readQueue = new BatchingQueue({
|
||||
batchSize: this.#options.maxCacheReadSize,
|
||||
maxWaitTime: this.#options.maxCacheBatchReadWait,
|
||||
processFunction: this.#processBatch
|
||||
})
|
||||
}
|
||||
if (!this.#readQueue.get(id)) {
|
||||
this.#readQueue.add(id, id)
|
||||
}
|
||||
}
|
||||
|
||||
async getAll(keys: string[]): Promise<(Item | undefined)[]> {
|
||||
return this.#database.getAll(keys)
|
||||
}
|
||||
|
||||
#processBatch = async (batch: string[]): Promise<void> => {
|
||||
const items = await this.#database.getAll(batch)
|
||||
for (let i = 0; i < items.length; i++) {
|
||||
if (items[i]) {
|
||||
this.#defermentManager.undefer(items[i]!)
|
||||
} else {
|
||||
//this is okay!
|
||||
//this.#logger(`Item ${batch[i]} not found in cache`)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async disposeAsync(): Promise<void> {
|
||||
await this.#readQueue?.disposeAsync()
|
||||
}
|
||||
}
|
||||
@@ -1,13 +1,37 @@
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||
/* eslint-disable @typescript-eslint/no-explicit-any */
|
||||
import { describe, expect, test } from 'vitest'
|
||||
import { DefermentManager } from './defermentManager.js'
|
||||
|
||||
describe('deferments', () => {
|
||||
test('defer one', async () => {
|
||||
const deferments = new DefermentManager()
|
||||
const deferments = new DefermentManager({ maxSizeInMb: 1, ttlms: 1 })
|
||||
const x = deferments.defer({ id: 'id' })
|
||||
expect(x).toBeInstanceOf(Promise)
|
||||
deferments.undefer({ baseId: 'id', base: { id: 'id', speckle_type: 'type' } })
|
||||
const b = await x
|
||||
expect(b).toMatchSnapshot()
|
||||
})
|
||||
|
||||
test('expireAt timeout', async () => {
|
||||
const now = 1
|
||||
const deferments = new DefermentManager({ maxSizeInMb: 1, ttlms: 1 })
|
||||
deferments['now'] = (): number => now
|
||||
const x = deferments.defer({ id: 'id' })
|
||||
expect(x).toBeInstanceOf(Promise)
|
||||
const d = deferments.get('id')
|
||||
expect(d).toBeDefined()
|
||||
expect(d?.getId()).toBe('id')
|
||||
expect((d as any).expiresAt).toBe(2)
|
||||
expect((d as any).ttl).toBe(1)
|
||||
expect((d as any).item).toBeUndefined()
|
||||
expect(d?.isExpired(1)).toBe(false)
|
||||
deferments.undefer({ baseId: 'id', base: { id: 'id', speckle_type: 'type' } })
|
||||
await x
|
||||
expect((d as any).expiresAt).toBe(2)
|
||||
expect((d as any).ttl).toBe(1)
|
||||
expect((d as any).item).toBeDefined()
|
||||
expect(d?.isExpired(1)).toBe(false)
|
||||
expect(d?.isExpired(3)).toBe(true)
|
||||
})
|
||||
})
|
||||
|
||||
@@ -1,25 +1,135 @@
|
||||
import { DeferredBase } from './deferredBase.js'
|
||||
import { Base, Item } from '../types/types.js'
|
||||
import { Base, CustomLogger, Item } from '../types/types.js'
|
||||
import { DefermentManagerOptions } from '../operations/options.js'
|
||||
|
||||
export class DefermentManager {
|
||||
#deferments: DeferredBase[] = []
|
||||
private deferments: Map<string, DeferredBase> = new Map()
|
||||
private timer?: ReturnType<typeof setTimeout>
|
||||
private logger: CustomLogger
|
||||
private currentSize = 0
|
||||
|
||||
constructor(private options: DefermentManagerOptions) {
|
||||
this.resetGlobalTimer()
|
||||
this.logger = options.logger || ((): void => {})
|
||||
}
|
||||
|
||||
private now(): number {
|
||||
return Date.now()
|
||||
}
|
||||
|
||||
isDeferred(id: string): boolean {
|
||||
return this.deferments.has(id)
|
||||
}
|
||||
|
||||
get(id: string): DeferredBase | undefined {
|
||||
return this.deferments.get(id)
|
||||
}
|
||||
|
||||
async defer(params: { id: string }): Promise<Base> {
|
||||
const deferredBase = this.#deferments.find((x) => x.id === params.id)
|
||||
const now = this.now()
|
||||
const deferredBase = this.deferments.get(params.id)
|
||||
if (deferredBase) {
|
||||
return await deferredBase.promise
|
||||
deferredBase.setAccess(now)
|
||||
return deferredBase.getPromise()
|
||||
}
|
||||
const d = new DeferredBase(params.id)
|
||||
this.#deferments.push(d)
|
||||
return d.promise
|
||||
const notYetFound = new DeferredBase(
|
||||
this.options.ttlms,
|
||||
params.id,
|
||||
now + this.options.ttlms
|
||||
)
|
||||
this.deferments.set(params.id, notYetFound)
|
||||
return notYetFound.getPromise()
|
||||
}
|
||||
|
||||
undefer(item: Item): void {
|
||||
const deferredIndex = this.#deferments.findIndex((x) => x.id === item.baseId)
|
||||
if (deferredIndex !== -1) {
|
||||
const deferredBase = this.#deferments[deferredIndex]
|
||||
deferredBase.resolve(item.base)
|
||||
this.#deferments.splice(deferredIndex, 1)
|
||||
const now = this.now()
|
||||
this.currentSize += item.size || 0
|
||||
//order matters here with found before undefer
|
||||
const deferredBase = this.deferments.get(item.baseId)
|
||||
if (deferredBase) {
|
||||
deferredBase.found(item)
|
||||
deferredBase.setAccess(now)
|
||||
} else {
|
||||
const existing = new DeferredBase(this.options.ttlms, item.baseId, now)
|
||||
existing.found(item)
|
||||
this.deferments.set(item.baseId, existing)
|
||||
}
|
||||
}
|
||||
|
||||
private resetGlobalTimer(): void {
|
||||
const run = (): void => {
|
||||
this.cleanDeferments()
|
||||
this.timer = setTimeout(run, this.options.ttlms)
|
||||
}
|
||||
this.timer = setTimeout(run, this.options.ttlms)
|
||||
}
|
||||
|
||||
dispose(): void {
|
||||
if (this.timer) {
|
||||
clearTimeout(this.timer)
|
||||
this.timer = undefined
|
||||
}
|
||||
this.clearDeferments()
|
||||
}
|
||||
|
||||
private clearDeferments(): void {
|
||||
let waiting = 0
|
||||
for (const deferredBase of this.deferments.values()) {
|
||||
deferredBase.done(0)
|
||||
if (deferredBase.getItem() === undefined) {
|
||||
waiting++
|
||||
}
|
||||
}
|
||||
this.currentSize = 0
|
||||
this.deferments.clear()
|
||||
this.logger('cleared deferments, left', waiting)
|
||||
}
|
||||
|
||||
private cleanDeferments(): void {
|
||||
const maxSizeBytes = this.options.maxSizeInMb * 1024 * 1024
|
||||
if (this.currentSize < maxSizeBytes) {
|
||||
this.logger(
|
||||
'deferments size is ok, no need to clean',
|
||||
this.currentSize,
|
||||
maxSizeBytes
|
||||
)
|
||||
return
|
||||
}
|
||||
const now = this.now()
|
||||
let cleaned = 0
|
||||
const start = performance.now()
|
||||
for (const deferredBase of Array.from(this.deferments.values())
|
||||
.filter((x) => x.isExpired(now))
|
||||
.sort((a, b) => this.compareMaybeBasesBySize(a.getItem(), b.getItem()))) {
|
||||
if (deferredBase.done(now)) {
|
||||
this.currentSize -= deferredBase.getItem()?.size || 0
|
||||
this.deferments.delete(deferredBase.getId())
|
||||
cleaned++
|
||||
if (this.currentSize < maxSizeBytes) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
this.logger(
|
||||
'cleaned deferments, cleaned, left',
|
||||
cleaned,
|
||||
this.deferments.size,
|
||||
performance.now() - start
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
compareMaybeBasesBySize(a: Item | undefined, b: Item | undefined): number {
|
||||
if (a === undefined && b === undefined) return 0
|
||||
if (a === undefined) return -1
|
||||
if (b === undefined) return 1
|
||||
return this.compareMaybe(a.size, b.size)
|
||||
}
|
||||
|
||||
compareMaybe(a: number | undefined, b: number | undefined): number {
|
||||
if (a === undefined && b === undefined) return 0
|
||||
if (a === undefined) return -1
|
||||
if (b === undefined) return 1
|
||||
return a - b
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,17 +1,55 @@
|
||||
import { Base } from '../types/types.js'
|
||||
import { Base, Item } from '../types/types.js'
|
||||
|
||||
export class DeferredBase {
|
||||
promise: Promise<Base>
|
||||
resolve!: (value: Base) => void
|
||||
reject!: (reason?: Error) => void
|
||||
private promise: Promise<Base>
|
||||
private resolve!: (value: Base) => void
|
||||
private reject!: (reason?: Error) => void
|
||||
private item?: Item
|
||||
|
||||
readonly id: string
|
||||
private readonly id: string
|
||||
private expiresAt: number // Timestamp in ms
|
||||
private ttl: number // ttl in ms
|
||||
|
||||
constructor(id: string) {
|
||||
constructor(ttl: number, id: string, expiresAt: number) {
|
||||
this.expiresAt = expiresAt
|
||||
this.ttl = ttl
|
||||
this.id = id
|
||||
this.promise = new Promise<Base>((resolve, reject) => {
|
||||
this.resolve = resolve
|
||||
this.reject = reject
|
||||
})
|
||||
}
|
||||
|
||||
getId(): string {
|
||||
return this.id
|
||||
}
|
||||
|
||||
getItem(): Item | undefined {
|
||||
return this.item
|
||||
}
|
||||
|
||||
getPromise(): Promise<Base> {
|
||||
return this.promise
|
||||
}
|
||||
|
||||
isExpired(now: number): boolean {
|
||||
return this.item !== undefined && now > this.expiresAt
|
||||
}
|
||||
setAccess(now: number): void {
|
||||
this.expiresAt = now + this.ttl
|
||||
}
|
||||
|
||||
found(value: Item): void {
|
||||
this.item = value
|
||||
this.resolve(value.base)
|
||||
}
|
||||
done(now: number): boolean {
|
||||
if (this.item) {
|
||||
this.resolve(this.item.base)
|
||||
}
|
||||
if (this.isExpired(now)) {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,45 @@
|
||||
export default class KeyedQueue<K, V> {
|
||||
private _map: Map<K, V>
|
||||
private _order: K[]
|
||||
|
||||
constructor() {
|
||||
this._map = new Map<K, V>()
|
||||
this._order = []
|
||||
}
|
||||
|
||||
enqueue(key: K, value: V): boolean {
|
||||
if (this._map.has(key)) {
|
||||
return false // Key already exists
|
||||
}
|
||||
this._map.set(key, value)
|
||||
this._order.push(key)
|
||||
return true
|
||||
}
|
||||
|
||||
get(key: K): V | undefined {
|
||||
return this._map.get(key)
|
||||
}
|
||||
|
||||
has(key: K): boolean {
|
||||
return this._map.has(key)
|
||||
}
|
||||
|
||||
get size(): number {
|
||||
return this._order.length
|
||||
}
|
||||
|
||||
spliceValues(start: number, deleteCount: number): V[] {
|
||||
const splicedKeys = this._order.splice(start, deleteCount)
|
||||
const result: V[] = []
|
||||
|
||||
for (const key of splicedKeys) {
|
||||
const value = this._map.get(key)
|
||||
if (value !== undefined) {
|
||||
result.push(value)
|
||||
this._map.delete(key)
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,40 @@
|
||||
import { Item } from '../types/types.js'
|
||||
import { Pump } from './pump.js'
|
||||
import Queue from './queue.js'
|
||||
|
||||
export class MemoryPump implements Pump {
|
||||
#items: Map<string, Item> = new Map()
|
||||
|
||||
add(item: Item): void {
|
||||
this.#items.set(item.baseId, item)
|
||||
}
|
||||
|
||||
async pumpItems(params: {
|
||||
ids: string[]
|
||||
foundItems: Queue<Item>
|
||||
notFoundItems: Queue<string>
|
||||
}): Promise<void> {
|
||||
const { ids, foundItems, notFoundItems } = params
|
||||
for (const id of ids) {
|
||||
const item = this.#items.get(id)
|
||||
if (item) {
|
||||
foundItems.add(item)
|
||||
} else {
|
||||
notFoundItems.add(id)
|
||||
}
|
||||
}
|
||||
return Promise.resolve()
|
||||
}
|
||||
|
||||
async *gather(ids: string[]): AsyncGenerator<Item> {
|
||||
for (const id of ids) {
|
||||
const item = this.#items.get(id)
|
||||
if (item) {
|
||||
yield item
|
||||
}
|
||||
}
|
||||
return Promise.resolve()
|
||||
}
|
||||
|
||||
async disposeAsync(): Promise<void> {}
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
import { Downloader } from '../operations/interfaces.js'
|
||||
import { Item } from '../types/types.js'
|
||||
import Queue from './queue.js'
|
||||
|
||||
export interface Pump extends Queue<Item> {
|
||||
gather(ids: string[], downloader: Downloader): AsyncGenerator<Item>
|
||||
disposeAsync(): Promise<void>
|
||||
}
|
||||
@@ -1,4 +1,3 @@
|
||||
import ObjectLoader2 from './operations/objectLoader2.js'
|
||||
|
||||
export default ObjectLoader2
|
||||
export { MemoryDatabase } from './operations/memoryDatabase.js'
|
||||
export { MemoryDatabase } from './operations/databases/memoryDatabase.js'
|
||||
export { ObjectLoader2 } from './operations/objectLoader2.js'
|
||||
export { ObjectLoader2Factory } from './operations/objectLoader2Factory.js'
|
||||
|
||||
@@ -2,35 +2,35 @@
|
||||
|
||||
exports[`objectloader2 > add extra header 1`] = `
|
||||
{
|
||||
"baseId": "baseId",
|
||||
}
|
||||
`;
|
||||
|
||||
exports[`objectloader2 > can get a root object from cache 1`] = `
|
||||
{
|
||||
"baseId": "baseId",
|
||||
}
|
||||
`;
|
||||
|
||||
exports[`objectloader2 > can get a root object from downloader 1`] = `
|
||||
{
|
||||
"baseId": "baseId",
|
||||
}
|
||||
`;
|
||||
|
||||
exports[`objectloader2 > can get root/child object from cache using iterator 1`] = `
|
||||
[
|
||||
{
|
||||
"base": {
|
||||
"__closure": {
|
||||
"child1Id": 100,
|
||||
},
|
||||
"id": "rootId",
|
||||
"speckle_type": "type",
|
||||
},
|
||||
{
|
||||
"id": "child1Id",
|
||||
"baseId": "rootId",
|
||||
}
|
||||
`;
|
||||
|
||||
exports[`objectloader2 > can get a root object from cache 1`] = `
|
||||
{
|
||||
"base": {
|
||||
"id": "baseId",
|
||||
"speckle_type": "type",
|
||||
},
|
||||
]
|
||||
"baseId": "baseId",
|
||||
}
|
||||
`;
|
||||
|
||||
exports[`objectloader2 > can get a root object from downloader 1`] = `
|
||||
{
|
||||
"base": {
|
||||
"id": "baseId",
|
||||
"speckle_type": "type",
|
||||
},
|
||||
"baseId": "baseId",
|
||||
}
|
||||
`;
|
||||
|
||||
exports[`objectloader2 > can get root/child object from memory cache using iterator and getObject 1`] = `
|
||||
|
||||
-21
@@ -29,24 +29,3 @@ exports[`database cache > write two items to queue use getItem 2`] = `
|
||||
"baseId": "id2",
|
||||
}
|
||||
`;
|
||||
|
||||
exports[`database cache > write two items to queue use processItems 1`] = `
|
||||
[
|
||||
{
|
||||
"base": {
|
||||
"id": "id",
|
||||
"speckle_type": "type",
|
||||
},
|
||||
"baseId": "id1",
|
||||
},
|
||||
{
|
||||
"base": {
|
||||
"id": "id",
|
||||
"speckle_type": "type",
|
||||
},
|
||||
"baseId": "id2",
|
||||
},
|
||||
]
|
||||
`;
|
||||
|
||||
exports[`database cache > write two items to queue use processItems 2`] = `[]`;
|
||||
+2
-28
@@ -1,16 +1,14 @@
|
||||
import { describe, expect, test } from 'vitest'
|
||||
import IndexedDatabase from './indexedDatabase.js'
|
||||
import { IDBFactory, IDBKeyRange } from 'fake-indexeddb'
|
||||
import { Item } from '../types/types.js'
|
||||
import BufferQueue from '../helpers/bufferQueue.js'
|
||||
import { Item } from '../../types/types.js'
|
||||
|
||||
describe('database cache', () => {
|
||||
test('write single item to queue use getItem', async () => {
|
||||
const i: Item = { baseId: 'id', base: { id: 'id', speckle_type: 'type' } }
|
||||
const database = new IndexedDatabase({
|
||||
indexedDB: new IDBFactory(),
|
||||
keyRange: IDBKeyRange,
|
||||
maxCacheBatchWriteWait: 200
|
||||
keyRange: IDBKeyRange
|
||||
})
|
||||
await database.add(i)
|
||||
await database.disposeAsync()
|
||||
@@ -36,28 +34,4 @@ describe('database cache', () => {
|
||||
const x2 = await database.getItem({ id: i2.baseId })
|
||||
expect(x2).toMatchSnapshot()
|
||||
})
|
||||
|
||||
test('write two items to queue use processItems', async () => {
|
||||
const i1: Item = { baseId: 'id1', base: { id: 'id', speckle_type: 'type' } }
|
||||
const i2: Item = { baseId: 'id2', base: { id: 'id', speckle_type: 'type' } }
|
||||
const database = new IndexedDatabase({
|
||||
indexedDB: new IDBFactory(),
|
||||
keyRange: IDBKeyRange
|
||||
})
|
||||
await database.add(i1)
|
||||
await database.add(i2)
|
||||
await database.disposeAsync()
|
||||
|
||||
const foundItems = new BufferQueue<Item>()
|
||||
const notFoundItems = new BufferQueue<string>()
|
||||
|
||||
await database.processItems({
|
||||
ids: [i1.baseId, i2.baseId],
|
||||
foundItems,
|
||||
notFoundItems
|
||||
})
|
||||
|
||||
expect(foundItems.values()).toMatchSnapshot()
|
||||
expect(notFoundItems.values()).toMatchSnapshot()
|
||||
})
|
||||
})
|
||||
@@ -0,0 +1,150 @@
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-function-type */
|
||||
import BatchingQueue from '../../helpers/batchingQueue.js'
|
||||
import { CustomLogger, Item } from '../../types/types.js'
|
||||
import { isSafari } from '@speckle/shared'
|
||||
import { Dexie, DexieOptions, Table } from 'dexie'
|
||||
import { Database } from '../interfaces.js'
|
||||
|
||||
class ObjectStore extends Dexie {
|
||||
static #databaseName: string = 'speckle-cache'
|
||||
objects!: Table<Item, string> // Table type: <entity, primaryKey>
|
||||
|
||||
constructor(options: DexieOptions) {
|
||||
super(ObjectStore.#databaseName, options)
|
||||
|
||||
this.version(1).stores({
|
||||
objects: 'baseId, item' // baseId is primary key
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
export interface IndexedDatabaseOptions {
|
||||
logger?: CustomLogger
|
||||
indexedDB?: IDBFactory
|
||||
keyRange?: {
|
||||
bound: Function
|
||||
lowerBound: Function
|
||||
upperBound: Function
|
||||
}
|
||||
}
|
||||
|
||||
export default class IndexedDatabase implements Database {
|
||||
#options: IndexedDatabaseOptions
|
||||
#logger: CustomLogger
|
||||
|
||||
#cacheDB?: ObjectStore
|
||||
|
||||
#writeQueue: BatchingQueue<Item> | undefined
|
||||
|
||||
// #count: number = 0
|
||||
|
||||
constructor(options: IndexedDatabaseOptions) {
|
||||
this.#options = options
|
||||
this.#logger = options.logger || ((): void => {})
|
||||
}
|
||||
|
||||
async getAll(keys: string[]): Promise<(Item | undefined)[]> {
|
||||
await this.#setupCacheDb()
|
||||
let items: (Item | undefined)[] = []
|
||||
// this.#count++
|
||||
// const startTime = performance.now()
|
||||
// this.#logger('Start read ' + x + ' ' + batch.length)
|
||||
|
||||
//faster than BulkGet with dexie
|
||||
await this.#cacheDB!.transaction('r', this.#cacheDB!.objects, async () => {
|
||||
const gets = keys.map((key) => this.#cacheDB!.objects.get(key))
|
||||
const cachedData = await Promise.all(gets)
|
||||
items = cachedData
|
||||
})
|
||||
// const endTime = performance.now()
|
||||
// const duration = endTime - startTime
|
||||
//this.#logger('Saved batch ' + x + ' ' + batch.length + ' ' + duration / TIME_MS.second)
|
||||
|
||||
return items
|
||||
}
|
||||
|
||||
async #openDatabase(): Promise<ObjectStore> {
|
||||
const db = new ObjectStore({
|
||||
indexedDB: this.#options.indexedDB ?? globalThis.indexedDB,
|
||||
IDBKeyRange: this.#options.keyRange ?? IDBKeyRange,
|
||||
chromeTransactionDurability: 'relaxed'
|
||||
})
|
||||
await db.open()
|
||||
return db
|
||||
}
|
||||
|
||||
async #setupCacheDb(): Promise<void> {
|
||||
if (this.#cacheDB !== undefined) {
|
||||
return
|
||||
}
|
||||
|
||||
// Initialize
|
||||
await this.#safariFix()
|
||||
this.#cacheDB = await this.#openDatabase()
|
||||
}
|
||||
|
||||
//this is for testing only - in the real world we will not use this
|
||||
async add(item: Item): Promise<void> {
|
||||
await this.#setupCacheDb()
|
||||
await this.#cacheDB!.transaction('rw', this.#cacheDB!.objects, async () => {
|
||||
return await this.#cacheDB?.objects.add(item)
|
||||
})
|
||||
}
|
||||
|
||||
async getItem(params: { id: string }): Promise<Item | undefined> {
|
||||
const { id } = params
|
||||
await this.#setupCacheDb()
|
||||
//might not be in the real DB yet, so check the write queue first
|
||||
if (this.#writeQueue) {
|
||||
const item = this.#writeQueue.get(id)
|
||||
if (item) {
|
||||
return item
|
||||
}
|
||||
}
|
||||
|
||||
return this.#cacheDB!.transaction('r', this.#cacheDB!.objects, async () => {
|
||||
return await this.#cacheDB?.objects.get(id)
|
||||
})
|
||||
}
|
||||
|
||||
async cacheSaveBatch(params: { batch: Item[] }): Promise<void> {
|
||||
await this.#setupCacheDb()
|
||||
const { batch } = params
|
||||
//const x = this.#count
|
||||
//this.#count++
|
||||
|
||||
// const startTime = performance.now()
|
||||
// this.#logger('Start save ' + x + ' ' + batch.length)
|
||||
await this.#cacheDB!.objects.bulkPut(batch)
|
||||
// const endTime = performance.now()
|
||||
// const duration = endTime - startTime
|
||||
//this.#logger('Saved batch ' + x + ' ' + batch.length + ' ' + duration / TIME_MS.second)
|
||||
}
|
||||
|
||||
/**
|
||||
* Fixes a Safari bug where IndexedDB requests get lost and never resolve - invoke before you use IndexedDB
|
||||
* @link Credits and more info: https://github.com/jakearchibald/safari-14-idb-fix
|
||||
*/
|
||||
async #safariFix(): Promise<void> {
|
||||
// No point putting other browsers or older versions of Safari through this mess.
|
||||
if (!isSafari() || !this.#options.indexedDB?.databases) return Promise.resolve()
|
||||
|
||||
let intervalId: ReturnType<typeof setInterval>
|
||||
|
||||
return new Promise<void>((resolve: () => void) => {
|
||||
const tryIdb = (): Promise<IDBDatabaseInfo[]> | undefined =>
|
||||
this.#options.indexedDB?.databases().finally(resolve)
|
||||
intervalId = setInterval(() => {
|
||||
void tryIdb()
|
||||
}, 100)
|
||||
void tryIdb()
|
||||
}).finally(() => clearInterval(intervalId))
|
||||
}
|
||||
|
||||
async disposeAsync(): Promise<void> {
|
||||
this.#cacheDB?.close()
|
||||
this.#cacheDB = undefined
|
||||
await this.#writeQueue?.disposeAsync()
|
||||
this.#writeQueue = undefined
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,43 @@
|
||||
import { Base, Item } from '../../types/types.js'
|
||||
import { Database } from '../interfaces.js'
|
||||
import { MemoryDatabaseOptions } from '../options.js'
|
||||
|
||||
export class MemoryDatabase implements Database {
|
||||
private items: Map<string, Base>
|
||||
|
||||
constructor(options?: MemoryDatabaseOptions) {
|
||||
this.items = options?.items || new Map<string, Base>()
|
||||
}
|
||||
|
||||
getAll(keys: string[]): Promise<(Item | undefined)[]> {
|
||||
const found: (Item | undefined)[] = []
|
||||
for (const key of keys) {
|
||||
const item = this.items.get(key)
|
||||
if (item) {
|
||||
found.push({ baseId: key, base: item })
|
||||
} else {
|
||||
found.push(undefined)
|
||||
}
|
||||
}
|
||||
return Promise.resolve(found)
|
||||
}
|
||||
|
||||
cacheSaveBatch({ batch }: { batch: Item[] }): Promise<void> {
|
||||
for (const item of batch) {
|
||||
this.items.set(item.baseId, item.base)
|
||||
}
|
||||
return Promise.resolve()
|
||||
}
|
||||
|
||||
getItem(params: { id: string }): Promise<Item | undefined> {
|
||||
const item = this.items.get(params.id)
|
||||
if (item) {
|
||||
return Promise.resolve({ baseId: params.id, base: item })
|
||||
}
|
||||
return Promise.resolve(undefined)
|
||||
}
|
||||
|
||||
disposeAsync(): Promise<void> {
|
||||
return Promise.resolve()
|
||||
}
|
||||
}
|
||||
+34
@@ -10,6 +10,7 @@ exports[`downloader > add extra header 1`] = `
|
||||
"speckle_type": "type",
|
||||
},
|
||||
"baseId": "id",
|
||||
"size": 0,
|
||||
}
|
||||
`;
|
||||
|
||||
@@ -21,6 +22,36 @@ exports[`downloader > download batch of one 1`] = `
|
||||
"speckle_type": "type",
|
||||
},
|
||||
"baseId": "id",
|
||||
"size": 33,
|
||||
},
|
||||
]
|
||||
`;
|
||||
|
||||
exports[`downloader > download batch of three 1`] = `
|
||||
[
|
||||
{
|
||||
"base": {
|
||||
"id": "id1",
|
||||
"speckle_type": "type",
|
||||
},
|
||||
"baseId": "id1",
|
||||
"size": 34,
|
||||
},
|
||||
{
|
||||
"base": {
|
||||
"id": "id2",
|
||||
"speckle_type": "type",
|
||||
},
|
||||
"baseId": "id2",
|
||||
"size": 34,
|
||||
},
|
||||
{
|
||||
"base": {
|
||||
"id": "id3",
|
||||
"speckle_type": "type",
|
||||
},
|
||||
"baseId": "id3",
|
||||
"size": 34,
|
||||
},
|
||||
]
|
||||
`;
|
||||
@@ -33,6 +64,7 @@ exports[`downloader > download batch of two 1`] = `
|
||||
"speckle_type": "type",
|
||||
},
|
||||
"baseId": "id1",
|
||||
"size": 34,
|
||||
},
|
||||
{
|
||||
"base": {
|
||||
@@ -40,6 +72,7 @@ exports[`downloader > download batch of two 1`] = `
|
||||
"speckle_type": "type",
|
||||
},
|
||||
"baseId": "id2",
|
||||
"size": 34,
|
||||
},
|
||||
]
|
||||
`;
|
||||
@@ -54,5 +87,6 @@ exports[`downloader > download single exists 1`] = `
|
||||
"speckle_type": "type",
|
||||
},
|
||||
"baseId": "id",
|
||||
"size": 0,
|
||||
}
|
||||
`;
|
||||
+15
-14
@@ -1,24 +1,25 @@
|
||||
import AsyncGeneratorQueue from '../helpers/asyncGeneratorQueue.js'
|
||||
import { Base, Item } from '../types/types.js'
|
||||
import { Downloader } from './interfaces.js'
|
||||
import Queue from '../../helpers/queue.js'
|
||||
import { Base, Item } from '../../types/types.js'
|
||||
import { Downloader } from '../interfaces.js'
|
||||
|
||||
export class MemoryDownloader implements Downloader {
|
||||
#items: Record<string, Base>
|
||||
#items: Map<string, Base>
|
||||
#rootId: string
|
||||
#results?: AsyncGeneratorQueue<Item>
|
||||
#results?: Queue<Item>
|
||||
|
||||
constructor(
|
||||
rootId: string,
|
||||
items: Record<string, Base>,
|
||||
results?: AsyncGeneratorQueue<Item>
|
||||
) {
|
||||
constructor(rootId: string, items: Map<string, Base>) {
|
||||
this.#rootId = rootId
|
||||
this.#items = items
|
||||
this.#results = results
|
||||
}
|
||||
initializePool(): void {}
|
||||
initializePool(params: {
|
||||
results: Queue<Item>
|
||||
total: number
|
||||
maxDownloadBatchWait?: number
|
||||
}): void {
|
||||
this.#results = params.results
|
||||
}
|
||||
downloadSingle(): Promise<Item> {
|
||||
const root = this.#items[this.#rootId]
|
||||
const root = this.#items.get(this.#rootId)
|
||||
if (root) {
|
||||
return Promise.resolve({ baseId: this.#rootId, base: root })
|
||||
}
|
||||
@@ -28,7 +29,7 @@ export class MemoryDownloader implements Downloader {
|
||||
return Promise.resolve()
|
||||
}
|
||||
add(id: string): void {
|
||||
const base = this.#items[id]
|
||||
const base = this.#items.get(id)
|
||||
if (base) {
|
||||
this.#results?.add({ baseId: id, base })
|
||||
return
|
||||
+44
-42
@@ -1,38 +1,28 @@
|
||||
import { describe, expect, test } from 'vitest'
|
||||
import createFetchMock from 'vitest-fetch-mock'
|
||||
import { vi } from 'vitest'
|
||||
import AsyncGeneratorQueue from '../helpers/asyncGeneratorQueue.js'
|
||||
import { Item } from '../types/types.js'
|
||||
import { Cache } from './interfaces.js'
|
||||
import { Item } from '../../types/types.js'
|
||||
import ServerDownloader from './serverDownloader.js'
|
||||
import { MemoryPump } from '../../helpers/memoryPump.js'
|
||||
|
||||
describe('downloader', () => {
|
||||
test('download batch of one', async () => {
|
||||
const fetchMocker = createFetchMock(vi)
|
||||
const i: Item = { baseId: 'id', base: { id: 'id', speckle_type: 'type' } }
|
||||
fetchMocker.mockResponseOnce('id\t' + JSON.stringify(i.base) + '\n')
|
||||
const results = new AsyncGeneratorQueue()
|
||||
const db = {
|
||||
async add(): Promise<void> {
|
||||
return Promise.resolve()
|
||||
}
|
||||
} as unknown as Cache
|
||||
const pump = new MemoryPump()
|
||||
const downloader = new ServerDownloader({
|
||||
database: db,
|
||||
results,
|
||||
serverUrl: 'http://speckle.test',
|
||||
streamId: 'streamId',
|
||||
objectId: 'objectId',
|
||||
token: 'token',
|
||||
|
||||
fetch: fetchMocker
|
||||
})
|
||||
downloader.initializePool({ total: 1, maxDownloadBatchWait: 200 })
|
||||
downloader.initializePool({ results: pump, total: 1, maxDownloadBatchWait: 200 })
|
||||
downloader.add('id')
|
||||
await downloader.disposeAsync()
|
||||
results.dispose()
|
||||
const r = []
|
||||
for await (const x of results.consume()) {
|
||||
for await (const x of pump.gather([i.baseId])) {
|
||||
r.push(x)
|
||||
}
|
||||
|
||||
@@ -46,15 +36,9 @@ describe('downloader', () => {
|
||||
fetchMocker.mockResponseOnce(
|
||||
'id1\t' + JSON.stringify(i1.base) + '\nid2\t' + JSON.stringify(i2.base) + '\n'
|
||||
)
|
||||
const results = new AsyncGeneratorQueue()
|
||||
const db = {
|
||||
async add(): Promise<void> {
|
||||
return Promise.resolve()
|
||||
}
|
||||
} as unknown as Cache
|
||||
|
||||
const pump = new MemoryPump()
|
||||
const downloader = new ServerDownloader({
|
||||
database: db,
|
||||
results,
|
||||
serverUrl: 'http://speckle.test',
|
||||
streamId: 'streamId',
|
||||
objectId: 'objectId',
|
||||
@@ -62,12 +46,46 @@ describe('downloader', () => {
|
||||
|
||||
fetch: fetchMocker
|
||||
})
|
||||
downloader.initializePool({ total: 2, maxDownloadBatchWait: 200 })
|
||||
downloader.initializePool({ results: pump, total: 2, maxDownloadBatchWait: 200 })
|
||||
downloader.add('id')
|
||||
await downloader.disposeAsync()
|
||||
results.dispose()
|
||||
const r = []
|
||||
for await (const x of results.consume()) {
|
||||
for await (const x of pump.gather([i1.baseId, i2.baseId])) {
|
||||
r.push(x)
|
||||
}
|
||||
|
||||
expect(r).toMatchSnapshot()
|
||||
})
|
||||
|
||||
test('download batch of three', async () => {
|
||||
const fetchMocker = createFetchMock(vi)
|
||||
const i1: Item = { baseId: 'id1', base: { id: 'id1', speckle_type: 'type' } }
|
||||
const i2: Item = { baseId: 'id2', base: { id: 'id2', speckle_type: 'type' } }
|
||||
const i3: Item = { baseId: 'id3', base: { id: 'id3', speckle_type: 'type' } }
|
||||
fetchMocker.mockResponseOnce(
|
||||
'id1\t' +
|
||||
JSON.stringify(i1.base) +
|
||||
'\nid2\t' +
|
||||
JSON.stringify(i2.base) +
|
||||
'\nid3\t' +
|
||||
JSON.stringify(i3.base) +
|
||||
'\n'
|
||||
)
|
||||
|
||||
const pump = new MemoryPump()
|
||||
const downloader = new ServerDownloader({
|
||||
serverUrl: 'http://speckle.test',
|
||||
streamId: 'streamId',
|
||||
objectId: 'objectId',
|
||||
token: 'token',
|
||||
|
||||
fetch: fetchMocker
|
||||
})
|
||||
downloader.initializePool({ results: pump, total: 2, maxDownloadBatchWait: 200 })
|
||||
downloader.add('id')
|
||||
await downloader.disposeAsync()
|
||||
const r = []
|
||||
for await (const x of pump.gather([i1.baseId, i2.baseId, i3.baseId])) {
|
||||
r.push(x)
|
||||
}
|
||||
|
||||
@@ -81,15 +99,7 @@ describe('downloader', () => {
|
||||
base: { id: 'id', speckle_type: 'type', __closure: { childIds: 1 } }
|
||||
}
|
||||
fetchMocker.mockResponseOnce(JSON.stringify(i.base))
|
||||
const results = new AsyncGeneratorQueue()
|
||||
const db = {
|
||||
async add(): Promise<void> {
|
||||
return Promise.resolve()
|
||||
}
|
||||
} as unknown as Cache
|
||||
const downloader = new ServerDownloader({
|
||||
database: db,
|
||||
results,
|
||||
serverUrl: 'http://speckle.test',
|
||||
streamId: 'streamId',
|
||||
objectId: i.baseId,
|
||||
@@ -111,17 +121,9 @@ describe('downloader', () => {
|
||||
(req) => req.headers.get('x-test') === 'asdf',
|
||||
JSON.stringify(i.base)
|
||||
)
|
||||
const results = new AsyncGeneratorQueue()
|
||||
const db = {
|
||||
async add(): Promise<void> {
|
||||
return Promise.resolve()
|
||||
}
|
||||
} as unknown as Cache
|
||||
const headers = new Headers()
|
||||
headers.set('x-test', 'asdf')
|
||||
const downloader = new ServerDownloader({
|
||||
database: db,
|
||||
results,
|
||||
serverUrl: 'http://speckle.test',
|
||||
headers,
|
||||
streamId: 'streamId',
|
||||
+83
-36
@@ -1,22 +1,33 @@
|
||||
import BatchedPool from '../helpers/batchedPool.js'
|
||||
import Queue from '../helpers/queue.js'
|
||||
import { ObjectLoaderRuntimeError } from '../types/errors.js'
|
||||
import { Fetcher, isBase, Item } from '../types/types.js'
|
||||
import { Downloader } from './interfaces.js'
|
||||
import { BaseDownloadOptions } from './options.js'
|
||||
import BatchedPool from '../../helpers/batchedPool.js'
|
||||
import Queue from '../../helpers/queue.js'
|
||||
import { ObjectLoaderRuntimeError } from '../../types/errors.js'
|
||||
import { Fetcher, isBase, Item } from '../../types/types.js'
|
||||
import { Downloader } from '../interfaces.js'
|
||||
|
||||
export interface ServerDownloaderOptions {
|
||||
serverUrl: string
|
||||
streamId: string
|
||||
objectId: string
|
||||
token?: string
|
||||
headers?: Headers
|
||||
fetch?: Fetcher
|
||||
}
|
||||
|
||||
export default class ServerDownloader implements Downloader {
|
||||
#requestUrlRootObj: string
|
||||
#requestUrlChildren: string
|
||||
#headers: HeadersInit
|
||||
#options: BaseDownloadOptions
|
||||
#options: ServerDownloaderOptions
|
||||
#fetch: Fetcher
|
||||
#results?: Queue<Item>
|
||||
|
||||
#downloadQueue?: BatchedPool<string>
|
||||
#decoder = new TextDecoder()
|
||||
|
||||
constructor(options: BaseDownloadOptions) {
|
||||
constructor(options: ServerDownloaderOptions) {
|
||||
this.#options = options
|
||||
this.#fetch = options.fetch ?? ((...args) => globalThis.fetch(...args))
|
||||
this.#fetch =
|
||||
options.fetch ?? ((...args): Promise<Response> => globalThis.fetch(...args))
|
||||
|
||||
this.#headers = {}
|
||||
if (options.headers) {
|
||||
@@ -45,17 +56,21 @@ export default class ServerDownloader implements Downloader {
|
||||
return [10000, 30000, 10000, 1000]
|
||||
}
|
||||
|
||||
initializePool(params: { total: number; maxDownloadBatchWait?: number }) {
|
||||
const { total } = params
|
||||
initializePool(params: {
|
||||
results: Queue<Item>
|
||||
total: number
|
||||
maxDownloadBatchWait?: number
|
||||
}): void {
|
||||
const { results, total } = params
|
||||
this.#results = results
|
||||
this.#downloadQueue = new BatchedPool<string>({
|
||||
concurrencyAndSizes: this.#getDownloadCountAndSizes(total),
|
||||
maxWaitTime: params.maxDownloadBatchWait,
|
||||
processFunction: (batch: string[]) =>
|
||||
processFunction: (batch: string[]): Promise<void> =>
|
||||
this.downloadBatch({
|
||||
batch,
|
||||
url: this.#requestUrlChildren,
|
||||
headers: this.#headers,
|
||||
results: this.#options.results
|
||||
headers: this.#headers
|
||||
})
|
||||
})
|
||||
}
|
||||
@@ -94,9 +109,8 @@ export default class ServerDownloader implements Downloader {
|
||||
batch: string[]
|
||||
url: string
|
||||
headers: HeadersInit
|
||||
results: Queue<Item>
|
||||
}): Promise<void> {
|
||||
const { batch, url, headers, results } = params
|
||||
const { batch, url, headers } = params
|
||||
const response = await this.#fetch(url, {
|
||||
method: 'POST',
|
||||
headers: { ...headers, 'Content-Type': 'application/json' },
|
||||
@@ -109,35 +123,67 @@ export default class ServerDownloader implements Downloader {
|
||||
}
|
||||
|
||||
const reader = response.body.getReader()
|
||||
const decoder = new TextDecoder()
|
||||
let buffer = '' // Temporary buffer to store incoming chunks
|
||||
let leftover = new Uint8Array(0)
|
||||
|
||||
let count = 0
|
||||
while (true) {
|
||||
const { done, value } = await reader.read()
|
||||
if (done) break
|
||||
// Decode the chunk and add to buffer
|
||||
buffer += decoder.decode(value, { stream: true })
|
||||
|
||||
// Try to process JSON objects from the buffer
|
||||
let boundary = buffer.indexOf('\n')
|
||||
while (boundary !== -1) {
|
||||
const jsonString = buffer.slice(0, boundary)
|
||||
buffer = buffer.slice(boundary + 1)
|
||||
boundary = buffer.indexOf('\n')
|
||||
if (jsonString) {
|
||||
const pieces = jsonString.split('\t')
|
||||
const [id, unparsedObj] = pieces
|
||||
const item = this.#processJson(id, unparsedObj)
|
||||
await this.#options.database.add(item)
|
||||
results.add(item)
|
||||
count++
|
||||
if (count % 1000 === 0) {
|
||||
await new Promise((resolve) => setTimeout(resolve, 100)) //allow other stuff to happen
|
||||
}
|
||||
leftover = await this.processArray(leftover, value, async () => {
|
||||
count++
|
||||
if (count % 1000 === 0) {
|
||||
await new Promise((resolve) => setTimeout(resolve, 100)) //allow other stuff to happen
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
async processArray(
|
||||
leftover: Uint8Array,
|
||||
value: Uint8Array,
|
||||
callback: () => Promise<void>
|
||||
): Promise<Uint8Array> {
|
||||
//this concat will allocate a new array
|
||||
const combined = this.concatUint8Arrays(leftover, value)
|
||||
let start = 0
|
||||
|
||||
//subarray doesn't allocate
|
||||
for (let i = 0; i < combined.length; i++) {
|
||||
if (combined[i] === 0x0a) {
|
||||
const line = combined.subarray(start, i) // line without \n
|
||||
//strings are allocated here
|
||||
const item = this.processLine(line)
|
||||
this.#results?.add(item)
|
||||
start = i + 1
|
||||
await callback()
|
||||
}
|
||||
}
|
||||
return combined.subarray(start) // carry over remainder
|
||||
}
|
||||
|
||||
processLine(line: Uint8Array): Item {
|
||||
for (let i = 0; i < line.length; i++) {
|
||||
if (line[i] === 0x09) {
|
||||
//this is a tab
|
||||
const baseId = this.#decoder.decode(line.subarray(0, i))
|
||||
const json = line.subarray(i + 1)
|
||||
const base = this.#decoder.decode(json)
|
||||
const item = this.#processJson(baseId, base)
|
||||
item.size = json.length
|
||||
return item
|
||||
}
|
||||
}
|
||||
throw new ObjectLoaderRuntimeError(
|
||||
'Invalid line format: ' + this.#decoder.decode(line)
|
||||
)
|
||||
}
|
||||
|
||||
concatUint8Arrays(a: Uint8Array, b: Uint8Array): Uint8Array {
|
||||
const c = new Uint8Array(a.length + b.length)
|
||||
c.set(a, 0)
|
||||
c.set(b, a.length)
|
||||
return c
|
||||
}
|
||||
|
||||
async downloadSingle(): Promise<Item> {
|
||||
@@ -147,6 +193,7 @@ export default class ServerDownloader implements Downloader {
|
||||
this.#validateResponse(response)
|
||||
const responseText = await response.text()
|
||||
const item = this.#processJson(this.#options.objectId, responseText)
|
||||
item.size = 0
|
||||
return item
|
||||
}
|
||||
|
||||
@@ -1,167 +0,0 @@
|
||||
import BatchingQueue from '../helpers/batchingQueue.js'
|
||||
import Queue from '../helpers/queue.js'
|
||||
import { CustomLogger, Item } from '../types/types.js'
|
||||
import { isSafari, TIME } from '@speckle/shared'
|
||||
import { BaseDatabaseOptions } from './options.js'
|
||||
import { Cache } from './interfaces.js'
|
||||
import { Dexie, DexieOptions, Table } from 'dexie'
|
||||
|
||||
class ObjectStore extends Dexie {
|
||||
static #databaseName: string = 'speckle-cache'
|
||||
objects!: Table<Item, string> // Table type: <entity, primaryKey>
|
||||
|
||||
constructor(options: DexieOptions) {
|
||||
super(ObjectStore.#databaseName, options)
|
||||
|
||||
this.version(1).stores({
|
||||
objects: 'baseId, item' // baseId is primary key
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
export default class IndexedDatabase implements Cache {
|
||||
#options: BaseDatabaseOptions
|
||||
#logger: CustomLogger
|
||||
|
||||
#cacheDB?: ObjectStore
|
||||
|
||||
#writeQueue: BatchingQueue<Item> | undefined
|
||||
|
||||
// #count: number = 0
|
||||
|
||||
constructor(options: BaseDatabaseOptions) {
|
||||
this.#options = {
|
||||
...{
|
||||
maxCacheReadSize: 10000,
|
||||
maxCacheBatchWriteWait: 1000
|
||||
},
|
||||
...options
|
||||
}
|
||||
this.#logger = options.logger || (() => {})
|
||||
}
|
||||
|
||||
async add(item: Item): Promise<void> {
|
||||
if (!this.#writeQueue) {
|
||||
await this.#setupCacheDb()
|
||||
this.#writeQueue = new BatchingQueue<Item>({
|
||||
batchSize: this.#options.maxCacheWriteSize ?? 10000,
|
||||
maxWaitTime: this.#options.maxCacheBatchWriteWait,
|
||||
processFunction: (batch: Item[]) =>
|
||||
this.#cacheSaveBatch({ batch, cacheDB: this.#cacheDB! })
|
||||
})
|
||||
}
|
||||
this.#writeQueue.add(item)
|
||||
}
|
||||
|
||||
async disposeAsync(): Promise<void> {
|
||||
await this.#writeQueue?.disposeAsync()
|
||||
}
|
||||
|
||||
async #openDatabase(): Promise<ObjectStore> {
|
||||
const db = new ObjectStore({
|
||||
indexedDB: this.#options.indexedDB ?? globalThis.indexedDB,
|
||||
IDBKeyRange: this.#options.keyRange ?? IDBKeyRange,
|
||||
chromeTransactionDurability: 'relaxed'
|
||||
})
|
||||
await db.open()
|
||||
return db
|
||||
}
|
||||
|
||||
async #setupCacheDb(): Promise<void> {
|
||||
if (this.#cacheDB !== undefined) {
|
||||
return
|
||||
}
|
||||
|
||||
// Initialize
|
||||
await this.#safariFix()
|
||||
this.#cacheDB = await this.#openDatabase()
|
||||
}
|
||||
|
||||
async processItems(params: {
|
||||
ids: string[]
|
||||
foundItems: Queue<Item>
|
||||
notFoundItems: Queue<string>
|
||||
}): Promise<void> {
|
||||
const { ids, foundItems, notFoundItems } = params
|
||||
await this.#setupCacheDb()
|
||||
const maxCacheReadSize = this.#options.maxCacheReadSize ?? 10000
|
||||
|
||||
for (let i = 0; i < ids.length; ) {
|
||||
if ((this.#writeQueue?.count() ?? 0) > maxCacheReadSize * 2) {
|
||||
this.#logger(
|
||||
'pausing reads (# in write queue: ' + this.#writeQueue?.count() + ')'
|
||||
)
|
||||
await new Promise((resolve) => setTimeout(resolve, TIME.second)) // Pause for 1 second, protects against out of memory
|
||||
continue
|
||||
}
|
||||
const batch = ids.slice(i, i + maxCacheReadSize)
|
||||
// const x = this.#count
|
||||
// this.#count++
|
||||
// const startTime = performance.now()
|
||||
// this.#logger('Start read ' + x + ' ' + batch.length)
|
||||
|
||||
//faster than BulkGet with dexie
|
||||
await this.#cacheDB!.transaction('r', this.#cacheDB!.objects, async () => {
|
||||
const gets = batch.map((key) => this.#cacheDB!.objects.get(key))
|
||||
const cachedData = await Promise.all(gets)
|
||||
for (let i = 0; i < cachedData.length; i++) {
|
||||
if (cachedData[i]) {
|
||||
foundItems.add(cachedData[i]!)
|
||||
} else {
|
||||
notFoundItems.add(batch[i])
|
||||
}
|
||||
}
|
||||
})
|
||||
// const endTime = performance.now()
|
||||
// const duration = endTime - startTime
|
||||
// this.#logger('Read batch ' + x + ' ' + batch.length + ' ' + duration / TIME_MS.second)
|
||||
|
||||
// interate down here to help with pausing
|
||||
i += maxCacheReadSize
|
||||
}
|
||||
}
|
||||
|
||||
async getItem(params: { id: string }): Promise<Item | undefined> {
|
||||
const { id } = params
|
||||
await this.#setupCacheDb()
|
||||
|
||||
return this.#cacheDB!.transaction('r', this.#cacheDB!.objects, async () => {
|
||||
return await this.#cacheDB?.objects.get(id)
|
||||
})
|
||||
}
|
||||
|
||||
async #cacheSaveBatch(params: {
|
||||
batch: Item[]
|
||||
cacheDB: ObjectStore
|
||||
}): Promise<void> {
|
||||
const { batch, cacheDB } = params
|
||||
//const x = this.#count
|
||||
//this.#count++
|
||||
|
||||
// const startTime = performance.now()
|
||||
// this.#logger('Start save ' + x + ' ' + batch.length)
|
||||
await cacheDB.objects.bulkPut(batch)
|
||||
// const endTime = performance.now()
|
||||
// const duration = endTime - startTime
|
||||
//this.#logger('Saved batch ' + x + ' ' + batch.length + ' ' + duration / TIME_MS.second)
|
||||
}
|
||||
|
||||
/**
|
||||
* Fixes a Safari bug where IndexedDB requests get lost and never resolve - invoke before you use IndexedDB
|
||||
* @link Credits and more info: https://github.com/jakearchibald/safari-14-idb-fix
|
||||
*/
|
||||
async #safariFix(): Promise<void> {
|
||||
// No point putting other browsers or older versions of Safari through this mess.
|
||||
if (!isSafari() || !this.#options.indexedDB?.databases) return Promise.resolve()
|
||||
|
||||
let intervalId: ReturnType<typeof setInterval>
|
||||
|
||||
return new Promise<void>((resolve: () => void) => {
|
||||
const tryIdb = () => this.#options.indexedDB?.databases().finally(resolve)
|
||||
intervalId = setInterval(() => {
|
||||
void tryIdb()
|
||||
}, 100)
|
||||
void tryIdb()
|
||||
}).finally(() => clearInterval(intervalId))
|
||||
}
|
||||
}
|
||||
@@ -1,20 +1,19 @@
|
||||
import Queue from '../helpers/queue.js'
|
||||
import { Item } from '../types/types.js'
|
||||
|
||||
export interface Cache {
|
||||
getItem(params: { id: string }): Promise<Item | undefined>
|
||||
processItems(params: {
|
||||
ids: string[]
|
||||
foundItems: Queue<Item>
|
||||
notFoundItems: Queue<string>
|
||||
}): Promise<void>
|
||||
|
||||
add(item: Item): Promise<void>
|
||||
disposeAsync(): Promise<void>
|
||||
}
|
||||
|
||||
export interface Downloader extends Queue<string> {
|
||||
initializePool(params: { total: number }): void
|
||||
initializePool(params: {
|
||||
results: Queue<Item>
|
||||
total: number
|
||||
maxDownloadBatchWait?: number
|
||||
}): void
|
||||
downloadSingle(): Promise<Item>
|
||||
disposeAsync(): Promise<void>
|
||||
}
|
||||
|
||||
export interface Database {
|
||||
getAll(keys: string[]): Promise<(Item | undefined)[]>
|
||||
getItem(params: { id: string }): Promise<Item | undefined>
|
||||
cacheSaveBatch(params: { batch: Item[] }): Promise<void>
|
||||
disposeAsync(): Promise<void>
|
||||
}
|
||||
|
||||
@@ -1,42 +0,0 @@
|
||||
import Queue from '../helpers/queue.js'
|
||||
import { Base, Item } from '../types/types.js'
|
||||
import { Cache } from './interfaces.js'
|
||||
import { MemoryDatabaseOptions } from './options.js'
|
||||
|
||||
export class MemoryDatabase implements Cache {
|
||||
#items: Record<string, Base>
|
||||
constructor(options?: MemoryDatabaseOptions) {
|
||||
this.#items = options?.items || {}
|
||||
}
|
||||
|
||||
getItem(params: { id: string }): Promise<Item | undefined> {
|
||||
const item = this.#items[params.id]
|
||||
if (item) {
|
||||
return Promise.resolve({ baseId: params.id, base: item })
|
||||
}
|
||||
return Promise.resolve(undefined)
|
||||
}
|
||||
processItems(params: {
|
||||
ids: string[]
|
||||
foundItems: Queue<Item>
|
||||
notFoundItems: Queue<string>
|
||||
}): Promise<void> {
|
||||
const { ids, foundItems, notFoundItems } = params
|
||||
for (const id of ids) {
|
||||
const item = this.#items[id]
|
||||
if (item) {
|
||||
foundItems.add({ baseId: id, base: item })
|
||||
} else {
|
||||
notFoundItems.add(id)
|
||||
}
|
||||
}
|
||||
return Promise.resolve()
|
||||
}
|
||||
add(item: Item): Promise<void> {
|
||||
this.#items[item.baseId] = item.base
|
||||
return Promise.resolve()
|
||||
}
|
||||
disposeAsync(): Promise<void> {
|
||||
return Promise.resolve()
|
||||
}
|
||||
}
|
||||
@@ -1,56 +1,45 @@
|
||||
import { describe, expect, test } from 'vitest'
|
||||
import ObjectLoader2 from './objectLoader2.js'
|
||||
import { ObjectLoader2 } from './objectLoader2.js'
|
||||
import { Base, Item } from '../types/types.js'
|
||||
import { Cache, Downloader } from './interfaces.js'
|
||||
import Queue from '../helpers/queue.js'
|
||||
import { MemoryDatabase } from './memoryDatabase.js'
|
||||
import { MemoryDownloader } from './memoryDownloader.js'
|
||||
import AsyncGeneratorQueue from '../helpers/asyncGeneratorQueue.js'
|
||||
import { MemoryDownloader } from './downloaders/memoryDownloader.js'
|
||||
import { IDBFactory, IDBKeyRange } from 'fake-indexeddb'
|
||||
import { MemoryDatabase } from './databases/memoryDatabase.js'
|
||||
import IndexedDatabase from './databases/indexedDatabase.js'
|
||||
|
||||
describe('objectloader2', () => {
|
||||
test('can get a root object from cache', async () => {
|
||||
const root = { baseId: 'baseId' } as unknown as Item
|
||||
const cache = {
|
||||
getItem(params: { id: string }): Promise<Item> {
|
||||
expect(params.id).toBe(root.baseId)
|
||||
return Promise.resolve(root)
|
||||
}
|
||||
} as Cache
|
||||
const downloader = {} as Downloader
|
||||
const rootId = 'baseId'
|
||||
const rootBase: Base = { id: 'baseId', speckle_type: 'type' }
|
||||
const downloader = new MemoryDownloader(
|
||||
rootId,
|
||||
new Map<string, Base>([[rootId, rootBase]])
|
||||
)
|
||||
const loader = new ObjectLoader2({
|
||||
serverUrl: 'a',
|
||||
streamId: 'b',
|
||||
objectId: root.baseId,
|
||||
cache,
|
||||
downloader
|
||||
rootId,
|
||||
downloader,
|
||||
database: new IndexedDatabase({
|
||||
indexedDB: new IDBFactory(),
|
||||
keyRange: IDBKeyRange
|
||||
})
|
||||
})
|
||||
const x = await loader.getRootObject()
|
||||
expect(x).toMatchSnapshot()
|
||||
})
|
||||
|
||||
test('can get a root object from downloader', async () => {
|
||||
const root = { baseId: 'baseId' } as unknown as Item
|
||||
const cache = {
|
||||
getItem(params: { id: string }): Promise<Item | undefined> {
|
||||
expect(params.id).toBe(root.baseId)
|
||||
return Promise.resolve<Item | undefined>(undefined)
|
||||
},
|
||||
add(item: Item): Promise<void> {
|
||||
expect(item).toBe(root)
|
||||
return Promise.resolve()
|
||||
}
|
||||
} as Cache
|
||||
const downloader = {
|
||||
downloadSingle(): Promise<Item> {
|
||||
return Promise.resolve(root)
|
||||
}
|
||||
} as Downloader
|
||||
const rootId = 'baseId'
|
||||
const rootBase: Base = { id: 'baseId', speckle_type: 'type' }
|
||||
const downloader = new MemoryDownloader(
|
||||
rootId,
|
||||
new Map<string, Base>([[rootId, rootBase]])
|
||||
)
|
||||
const loader = new ObjectLoader2({
|
||||
serverUrl: 'a',
|
||||
streamId: 'b',
|
||||
objectId: root.baseId,
|
||||
cache,
|
||||
downloader
|
||||
rootId,
|
||||
downloader,
|
||||
database: new IndexedDatabase({
|
||||
indexedDB: new IDBFactory(),
|
||||
keyRange: IDBKeyRange
|
||||
})
|
||||
})
|
||||
const x = await loader.getRootObject()
|
||||
expect(x).toMatchSnapshot()
|
||||
@@ -59,20 +48,18 @@ describe('objectloader2', () => {
|
||||
test('can get single object from cache using iterator', async () => {
|
||||
const rootId = 'baseId'
|
||||
const rootBase: Base = { id: 'baseId', speckle_type: 'type' }
|
||||
const root = { baseId: rootId, base: rootBase } as unknown as Item
|
||||
const cache = {
|
||||
getItem(params: { id: string }): Promise<Item | undefined> {
|
||||
expect(params.id).toBe(rootId)
|
||||
return Promise.resolve(root)
|
||||
}
|
||||
} as Cache
|
||||
const downloader = {} as Downloader
|
||||
|
||||
const downloader = new MemoryDownloader(
|
||||
rootId,
|
||||
new Map<string, Base>([[rootId, rootBase]])
|
||||
)
|
||||
const loader = new ObjectLoader2({
|
||||
serverUrl: 'a',
|
||||
streamId: 'b',
|
||||
objectId: rootId,
|
||||
cache,
|
||||
downloader
|
||||
rootId,
|
||||
downloader,
|
||||
database: new IndexedDatabase({
|
||||
indexedDB: new IDBFactory(),
|
||||
keyRange: IDBKeyRange
|
||||
})
|
||||
})
|
||||
const r = []
|
||||
for await (const x of loader.getObjectIterator()) {
|
||||
@@ -82,63 +69,6 @@ describe('objectloader2', () => {
|
||||
expect(r).toMatchSnapshot()
|
||||
})
|
||||
|
||||
test('can get root/child object from cache using iterator', async () => {
|
||||
const child1Base = { id: 'child1Id' }
|
||||
const child1 = { baseId: 'child1Id', base: child1Base } as unknown as Item
|
||||
|
||||
const rootId = 'rootId'
|
||||
const rootBase: Base = {
|
||||
id: 'rootId',
|
||||
speckle_type: 'type',
|
||||
__closure: { child1Id: 100 }
|
||||
}
|
||||
const root = {
|
||||
baseId: rootId,
|
||||
base: rootBase
|
||||
} as unknown as Item
|
||||
|
||||
const cache = {
|
||||
getItem(params: { id: string }): Promise<Item | undefined> {
|
||||
expect(params.id).toBe(root.baseId)
|
||||
return Promise.resolve(root)
|
||||
},
|
||||
processItems(params: {
|
||||
ids: string[]
|
||||
foundItems: Queue<Item>
|
||||
|
||||
notFoundItems: Queue<string>
|
||||
}): Promise<void> {
|
||||
expect(params.ids.length).toBe(1)
|
||||
expect(params.ids[0]).toBe(child1.baseId)
|
||||
params.foundItems.add(child1)
|
||||
return Promise.resolve()
|
||||
},
|
||||
disposeAsync(): Promise<void> {
|
||||
return Promise.resolve()
|
||||
}
|
||||
} as Cache
|
||||
const downloader = {
|
||||
initializePool(params: { total: number }): void {
|
||||
expect(params.total).toBe(1)
|
||||
},
|
||||
disposeAsync(): Promise<void> {
|
||||
return Promise.resolve()
|
||||
}
|
||||
} as Downloader
|
||||
const loader = new ObjectLoader2({
|
||||
serverUrl: 'a',
|
||||
streamId: 'b',
|
||||
objectId: root.baseId,
|
||||
cache,
|
||||
downloader
|
||||
})
|
||||
const r = []
|
||||
for await (const x of loader.getObjectIterator()) {
|
||||
r.push(x)
|
||||
}
|
||||
expect(r).toMatchSnapshot()
|
||||
})
|
||||
|
||||
test('can get root/child object from memory cache using iterator and getObject', async () => {
|
||||
const child1Base = { id: 'child1Id', speckle_type: 'type' } as Base
|
||||
const child1 = { baseId: 'child1Id', base: child1Base } as unknown as Item
|
||||
@@ -152,19 +82,18 @@ describe('objectloader2', () => {
|
||||
const root = {
|
||||
baseId: rootId,
|
||||
base: rootBase
|
||||
} as unknown as Item
|
||||
} as Item
|
||||
|
||||
const records: Record<string, Base> = {}
|
||||
records[root.baseId] = rootBase
|
||||
records[child1.baseId] = child1Base
|
||||
const records: Map<string, Base> = new Map<string, Base>()
|
||||
records.set(root.baseId, rootBase)
|
||||
records.set(child1.baseId, child1Base)
|
||||
|
||||
const loader = new ObjectLoader2({
|
||||
serverUrl: 'a',
|
||||
streamId: 'b',
|
||||
objectId: root.baseId,
|
||||
cache: new MemoryDatabase({ items: records }),
|
||||
downloader: new MemoryDownloader(rootId, records)
|
||||
rootId: root.baseId,
|
||||
downloader: new MemoryDownloader(rootId, records),
|
||||
database: new MemoryDatabase({ items: records })
|
||||
})
|
||||
|
||||
const r = []
|
||||
const obj = loader.getObject({ id: child1.baseId })
|
||||
for await (const x of loader.getObjectIterator()) {
|
||||
@@ -193,18 +122,17 @@ describe('objectloader2', () => {
|
||||
base: rootBase
|
||||
} as unknown as Item
|
||||
|
||||
const records: Record<string, Base> = {}
|
||||
records[root.baseId] = rootBase
|
||||
records[child1.baseId] = child1Base
|
||||
const records: Map<string, Base> = new Map<string, Base>()
|
||||
records.set(root.baseId, rootBase)
|
||||
records.set(child1.baseId, child1Base)
|
||||
|
||||
const results: AsyncGeneratorQueue<Item> = new AsyncGeneratorQueue<Item>()
|
||||
const loader = new ObjectLoader2({
|
||||
serverUrl: 'a',
|
||||
streamId: 'b',
|
||||
objectId: root.baseId,
|
||||
results,
|
||||
cache: new MemoryDatabase(),
|
||||
downloader: new MemoryDownloader(rootId, records, results)
|
||||
rootId: root.baseId,
|
||||
downloader: new MemoryDownloader(rootId, records),
|
||||
database: new IndexedDatabase({
|
||||
indexedDB: new IDBFactory(),
|
||||
keyRange: IDBKeyRange
|
||||
})
|
||||
})
|
||||
const r = []
|
||||
const obj = loader.getObject({ id: child1.baseId })
|
||||
@@ -220,26 +148,30 @@ describe('objectloader2', () => {
|
||||
})
|
||||
|
||||
test('add extra header', async () => {
|
||||
const root = { baseId: 'baseId' } as unknown as Item
|
||||
const cache = {
|
||||
getItem(params: { id: string }): Promise<Item> {
|
||||
expect(params.id).toBe(root.baseId)
|
||||
return Promise.resolve(root)
|
||||
}
|
||||
} as Cache
|
||||
const downloader = {} as Downloader
|
||||
const rootId = 'rootId'
|
||||
const rootBase: Base = {
|
||||
id: 'rootId',
|
||||
speckle_type: 'type',
|
||||
__closure: { child1Id: 100 }
|
||||
}
|
||||
const root = {
|
||||
baseId: rootId,
|
||||
base: rootBase
|
||||
} as Item
|
||||
|
||||
const records: Map<string, Base> = new Map<string, Base>()
|
||||
records.set(root.baseId, rootBase)
|
||||
const headers = new Headers()
|
||||
headers.set('x-test', 'asdf')
|
||||
const loader = new ObjectLoader2({
|
||||
serverUrl: 'a',
|
||||
streamId: 'b',
|
||||
objectId: root.baseId,
|
||||
headers,
|
||||
cache,
|
||||
downloader
|
||||
rootId: root.baseId,
|
||||
downloader: new MemoryDownloader(rootId, records),
|
||||
database: new IndexedDatabase({
|
||||
indexedDB: new IDBFactory(),
|
||||
keyRange: IDBKeyRange
|
||||
})
|
||||
})
|
||||
const x = await loader.getRootObject()
|
||||
expect(x).toBe(root)
|
||||
expect(x).toMatchSnapshot()
|
||||
})
|
||||
|
||||
|
||||
@@ -1,81 +1,79 @@
|
||||
import AsyncGeneratorQueue from '../helpers/asyncGeneratorQueue.js'
|
||||
import { Cache, Downloader } from './interfaces.js'
|
||||
import IndexedDatabase from './indexedDatabase.js'
|
||||
import ServerDownloader from './serverDownloader.js'
|
||||
import { Downloader, Database } from './interfaces.js'
|
||||
import { CustomLogger, Base, Item } from '../types/types.js'
|
||||
import { ObjectLoader2Options } from './options.js'
|
||||
import { MemoryDownloader } from './memoryDownloader.js'
|
||||
import { MemoryDatabase } from './memoryDatabase.js'
|
||||
import { CacheOptions, ObjectLoader2Options } from './options.js'
|
||||
import { DefermentManager } from '../helpers/defermentManager.js'
|
||||
import { CacheReader } from '../helpers/cacheReader.js'
|
||||
import { CachePump } from '../helpers/cachePump.js'
|
||||
import AggregateQueue from '../helpers/aggregateQueue.js'
|
||||
import { ObjectLoader2Factory } from './objectLoader2Factory.js'
|
||||
|
||||
export default class ObjectLoader2 {
|
||||
#objectId: string
|
||||
export class ObjectLoader2 {
|
||||
#rootId: string
|
||||
|
||||
#logger: CustomLogger
|
||||
|
||||
#database: Cache
|
||||
#database: Database
|
||||
#downloader: Downloader
|
||||
#pump: CachePump
|
||||
#cache: CacheReader
|
||||
|
||||
#deferments: DefermentManager
|
||||
|
||||
#gathered: AsyncGeneratorQueue<Item>
|
||||
|
||||
constructor(options: ObjectLoader2Options) {
|
||||
this.#objectId = options.objectId
|
||||
#root?: Item = undefined
|
||||
|
||||
constructor(options: ObjectLoader2Options) {
|
||||
this.#rootId = options.rootId
|
||||
this.#logger = options.logger || console.log
|
||||
this.#gathered = options.results || new AsyncGeneratorQueue()
|
||||
this.#deferments = new DefermentManager()
|
||||
this.#database =
|
||||
options.cache ||
|
||||
new IndexedDatabase({
|
||||
logger: this.#logger,
|
||||
maxCacheReadSize: 10_000,
|
||||
maxCacheWriteSize: 5_000,
|
||||
indexedDB: options.indexedDB,
|
||||
keyRange: options.keyRange
|
||||
})
|
||||
this.#downloader =
|
||||
options.downloader ||
|
||||
new ServerDownloader({
|
||||
database: this.#database,
|
||||
results: this.#gathered,
|
||||
serverUrl: options.serverUrl,
|
||||
streamId: options.streamId,
|
||||
objectId: this.#objectId,
|
||||
token: options.token,
|
||||
headers: options.headers
|
||||
})
|
||||
|
||||
const cacheOptions: CacheOptions = {
|
||||
logger: this.#logger,
|
||||
maxCacheReadSize: 10_000,
|
||||
maxCacheWriteSize: 10_000,
|
||||
maxWriteQueueSize: 40_000,
|
||||
maxCacheBatchWriteWait: 3_000,
|
||||
maxCacheBatchReadWait: 3_000
|
||||
}
|
||||
|
||||
this.#gathered = new AsyncGeneratorQueue()
|
||||
this.#database = options.database
|
||||
this.#deferments = new DefermentManager({
|
||||
maxSizeInMb: 2_000, // 2 GBs
|
||||
ttlms: 5_000, // 5 seconds
|
||||
logger: this.#logger
|
||||
})
|
||||
this.#cache = new CacheReader(this.#database, this.#deferments, cacheOptions)
|
||||
this.#pump = new CachePump(
|
||||
this.#database,
|
||||
this.#gathered,
|
||||
this.#deferments,
|
||||
cacheOptions
|
||||
)
|
||||
this.#downloader = options.downloader
|
||||
}
|
||||
|
||||
async disposeAsync(): Promise<void> {
|
||||
await Promise.all([
|
||||
this.#database.disposeAsync(),
|
||||
this.#downloader.disposeAsync(),
|
||||
this.#gathered.dispose()
|
||||
])
|
||||
await Promise.all([this.#downloader.disposeAsync(), this.#cache.disposeAsync()])
|
||||
this.#deferments.dispose()
|
||||
}
|
||||
|
||||
async getRootObject(): Promise<Item | undefined> {
|
||||
const cachedRootObject = await this.#database.getItem({ id: this.#objectId })
|
||||
if (cachedRootObject) {
|
||||
return cachedRootObject
|
||||
if (!this.#root) {
|
||||
this.#root = await this.#database.getItem({ id: this.#rootId })
|
||||
if (!this.#root) {
|
||||
this.#root = await this.#downloader.downloadSingle()
|
||||
}
|
||||
}
|
||||
const rootItem = await this.#downloader.downloadSingle()
|
||||
|
||||
await this.#database.add(rootItem)
|
||||
return rootItem
|
||||
return this.#root
|
||||
}
|
||||
|
||||
async getObject(params: { id: string }): Promise<Base> {
|
||||
const item = await this.#database.getItem({ id: params.id })
|
||||
if (item) {
|
||||
return item.base
|
||||
}
|
||||
return await this.#deferments.defer({ id: params.id })
|
||||
return await this.#cache.getObject({ id: params.id })
|
||||
}
|
||||
|
||||
async getTotalObjectCount() {
|
||||
async getTotalObjectCount(): Promise<number> {
|
||||
const rootObj = await this.getRootObject()
|
||||
const totalChildrenCount = Object.keys(rootObj?.base.__closure || {}).length
|
||||
return totalChildrenCount + 1 //count the root
|
||||
@@ -87,47 +85,27 @@ export default class ObjectLoader2 {
|
||||
this.#logger('No root object found!')
|
||||
return
|
||||
}
|
||||
//only for root
|
||||
this.#pump.add(rootItem)
|
||||
yield rootItem.base
|
||||
if (!rootItem.base.__closure) return
|
||||
|
||||
const children = Object.keys(rootItem.base.__closure)
|
||||
const total = children.length
|
||||
this.#downloader.initializePool({ total })
|
||||
const processPromise = this.#database.processItems({
|
||||
ids: children,
|
||||
foundItems: this.#gathered,
|
||||
notFoundItems: this.#downloader
|
||||
this.#downloader.initializePool({
|
||||
results: new AggregateQueue(this.#gathered, this.#pump),
|
||||
total
|
||||
})
|
||||
let count = 0
|
||||
for await (const item of this.#gathered.consume()) {
|
||||
this.#deferments.undefer(item)
|
||||
for await (const item of this.#pump.gather(children, this.#downloader)) {
|
||||
yield item.base
|
||||
count++
|
||||
if (count >= total) {
|
||||
await this.disposeAsync()
|
||||
}
|
||||
}
|
||||
await processPromise
|
||||
}
|
||||
|
||||
static createFromObjects(objects: Base[]): ObjectLoader2 {
|
||||
const root = objects[0]
|
||||
const records: Record<string, Base> = {}
|
||||
objects.forEach((element) => {
|
||||
records[element.id] = element
|
||||
})
|
||||
const loader = new ObjectLoader2({
|
||||
serverUrl: 'dummy',
|
||||
streamId: 'dummy',
|
||||
objectId: root.id,
|
||||
cache: new MemoryDatabase({ items: records }),
|
||||
downloader: new MemoryDownloader(root.id, records)
|
||||
})
|
||||
return loader
|
||||
return ObjectLoader2Factory.createFromObjects(objects)
|
||||
}
|
||||
|
||||
static createFromJSON(json: string): ObjectLoader2 {
|
||||
const jsonObj = JSON.parse(json) as Base[]
|
||||
return this.createFromObjects(jsonObj)
|
||||
return ObjectLoader2Factory.createFromJSON(json)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,56 @@
|
||||
import { Base, CustomLogger } from '../types/types.js'
|
||||
import IndexedDatabase from './databases/indexedDatabase.js'
|
||||
import { MemoryDatabase } from './databases/memoryDatabase.js'
|
||||
import { MemoryDownloader } from './downloaders/memoryDownloader.js'
|
||||
import ServerDownloader from './downloaders/serverDownloader.js'
|
||||
import { ObjectLoader2 } from './objectLoader2.js'
|
||||
|
||||
export class ObjectLoader2Factory {
|
||||
static createFromObjects(objects: Base[]): ObjectLoader2 {
|
||||
const root = objects[0]
|
||||
const records: Map<string, Base> = new Map<string, Base>()
|
||||
objects.forEach((element) => {
|
||||
records.set(element.id, element)
|
||||
})
|
||||
const loader = new ObjectLoader2({
|
||||
rootId: root.id,
|
||||
database: new MemoryDatabase({ items: records }),
|
||||
downloader: new MemoryDownloader(root.id, records)
|
||||
})
|
||||
return loader
|
||||
}
|
||||
|
||||
static createFromJSON(json: string): ObjectLoader2 {
|
||||
const jsonObj = JSON.parse(json) as Base[]
|
||||
return this.createFromObjects(jsonObj)
|
||||
}
|
||||
|
||||
static createFromUrl(params: {
|
||||
serverUrl: string
|
||||
streamId: string
|
||||
objectId: string
|
||||
token?: string
|
||||
headers?: Headers
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-function-type
|
||||
keyRange?: { bound: Function; lowerBound: Function; upperBound: Function }
|
||||
indexedDB?: IDBFactory
|
||||
logger?: CustomLogger
|
||||
}): ObjectLoader2 {
|
||||
const loader = new ObjectLoader2({
|
||||
rootId: params.objectId,
|
||||
downloader: new ServerDownloader({
|
||||
serverUrl: params.serverUrl,
|
||||
streamId: params.streamId,
|
||||
objectId: params.objectId,
|
||||
token: params.token,
|
||||
headers: params.headers
|
||||
}),
|
||||
database: new IndexedDatabase({
|
||||
logger: params.logger,
|
||||
indexedDB: params.indexedDB,
|
||||
keyRange: params.keyRange
|
||||
})
|
||||
})
|
||||
return loader
|
||||
}
|
||||
}
|
||||
@@ -1,48 +1,29 @@
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-function-type */
|
||||
import AsyncGeneratorQueue from '../helpers/asyncGeneratorQueue.js'
|
||||
import Queue from '../helpers/queue.js'
|
||||
import { Base, CustomLogger, Fetcher, Item } from '../types/types.js'
|
||||
import { Cache, Downloader } from './interfaces.js'
|
||||
import { Base, CustomLogger } from '../types/types.js'
|
||||
import { Downloader, Database } from './interfaces.js'
|
||||
|
||||
export interface ObjectLoader2Options {
|
||||
keyRange?: { bound: Function; lowerBound: Function; upperBound: Function }
|
||||
indexedDB?: IDBFactory
|
||||
serverUrl: string
|
||||
streamId: string
|
||||
objectId: string
|
||||
token?: string
|
||||
rootId: string
|
||||
downloader: Downloader
|
||||
database: Database
|
||||
logger?: CustomLogger
|
||||
headers?: Headers
|
||||
results?: AsyncGeneratorQueue<Item>
|
||||
cache?: Cache
|
||||
downloader?: Downloader
|
||||
}
|
||||
export interface BaseDatabaseOptions {
|
||||
logger?: CustomLogger
|
||||
indexedDB?: IDBFactory
|
||||
keyRange?: {
|
||||
bound: Function
|
||||
lowerBound: Function
|
||||
upperBound: Function
|
||||
}
|
||||
maxCacheReadSize?: number
|
||||
maxCacheWriteSize?: number
|
||||
maxCacheBatchWriteWait?: number
|
||||
}
|
||||
|
||||
export interface BaseDownloadOptions {
|
||||
serverUrl: string
|
||||
streamId: string
|
||||
objectId: string
|
||||
token?: string
|
||||
headers?: Headers
|
||||
|
||||
fetch?: Fetcher
|
||||
database: Cache
|
||||
results: Queue<Item>
|
||||
export interface CacheOptions {
|
||||
logger?: CustomLogger
|
||||
maxCacheReadSize: number
|
||||
maxCacheWriteSize: number
|
||||
maxCacheBatchWriteWait: number
|
||||
maxCacheBatchReadWait: number
|
||||
maxWriteQueueSize: number
|
||||
}
|
||||
|
||||
export interface MemoryDatabaseOptions {
|
||||
logger?: CustomLogger
|
||||
items?: Record<string, Base>
|
||||
items?: Map<string, Base>
|
||||
}
|
||||
|
||||
export interface DefermentManagerOptions {
|
||||
logger?: CustomLogger
|
||||
maxSizeInMb: number
|
||||
ttlms: number
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { describe, expect, test } from 'vitest'
|
||||
import { Base } from '../types/types.js'
|
||||
import ObjectLoader2 from './objectLoader2.js'
|
||||
import { ObjectLoader2 } from './objectLoader2.js'
|
||||
import Traverser from './traverser.js'
|
||||
|
||||
describe('Traverser', () => {
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { Base, DataChunk, isBase, isReference, isScalar } from '../types/types.js'
|
||||
import ObjectLoader2 from './objectLoader2.js'
|
||||
import { ObjectLoader2 } from './objectLoader2.js'
|
||||
|
||||
export type ProgressStage = 'download' | 'construction'
|
||||
export type OnProgress = (e: {
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
import { describe, test, expect } from 'vitest'
|
||||
import { IDBFactory, IDBKeyRange } from 'fake-indexeddb'
|
||||
import ObjectLoader2 from '../operations/objectLoader2.js'
|
||||
import { Base } from '../types/types.js'
|
||||
import { TIME } from '@speckle/shared'
|
||||
import { TIME_MS } from '@speckle/shared'
|
||||
import { ObjectLoader2Factory } from '../operations/objectLoader2Factory.js'
|
||||
|
||||
describe('e2e', () => {
|
||||
test(
|
||||
@@ -10,7 +10,7 @@ describe('e2e', () => {
|
||||
async () => {
|
||||
// Revit sample house (good for bim-like stuff with many display meshes)
|
||||
//const resource = 'https://app.speckle.systems/streams/da9e320dad/commits/5388ef24b8'
|
||||
const loader = new ObjectLoader2({
|
||||
const loader = ObjectLoader2Factory.createFromUrl({
|
||||
serverUrl: 'https://app.speckle.systems',
|
||||
streamId: 'da9e320dad',
|
||||
objectId: '31d10c0cea569a1e26809658ed27e281',
|
||||
@@ -36,6 +36,6 @@ describe('e2e', () => {
|
||||
expect(base2).toBeDefined()
|
||||
expect(base2.id).toBe('3841e3cbc45d52c47bc2f1b7b0ad4eb9')
|
||||
},
|
||||
10 * TIME.second
|
||||
10 * TIME_MS.second
|
||||
)
|
||||
})
|
||||
|
||||
@@ -8,6 +8,7 @@ export type Fetcher = (
|
||||
export interface Item {
|
||||
baseId: string
|
||||
base: Base
|
||||
size?: number
|
||||
}
|
||||
|
||||
export interface Base {
|
||||
|
||||
@@ -30,14 +30,14 @@ COPY packages/frontend-2/type-augmentations/stubs ./packages/frontend-2/type-aug
|
||||
COPY packages/preview-frontend/package.json ./packages/preview-frontend/
|
||||
COPY packages/preview-service/package.json ./packages/preview-service/
|
||||
COPY packages/viewer/package.json ./packages/viewer/
|
||||
COPY packages/objectloader/package.json ./packages/objectloader/
|
||||
COPY packages/objectloader2/package.json ./packages/objectloader2/
|
||||
COPY packages/shared/package.json ./packages/shared/
|
||||
|
||||
RUN PUPPETEER_SKIP_DOWNLOAD=true PLAYWRIGHT_SKIP_BROWSER_DOWNLOAD=1 yarn workspaces focus -A
|
||||
|
||||
# Onyl copy in the relevant source files for the dependencies
|
||||
# Only copy in the relevant source files for the dependencies
|
||||
COPY packages/shared ./packages/shared/
|
||||
COPY packages/objectloader ./packages/objectloader/
|
||||
COPY packages/objectloader2 ./packages/objectloader2/
|
||||
COPY packages/viewer ./packages/viewer/
|
||||
COPY packages/preview-frontend ./packages/preview-frontend/
|
||||
COPY packages/preview-service ./packages/preview-service/
|
||||
|
||||
@@ -55,7 +55,7 @@ import Bright from '../assets/hdri/Bright.png'
|
||||
import { Euler, Vector3, Box3, Color, LinearFilter } from 'three'
|
||||
import { GeometryType } from '@speckle/viewer'
|
||||
import { MeshBatch } from '@speckle/viewer'
|
||||
import ObjectLoader2 from '@speckle/objectloader2'
|
||||
import { ObjectLoader2Factory } from '@speckle/objectloader2'
|
||||
|
||||
export default class Sandbox {
|
||||
private viewer: Viewer
|
||||
@@ -1312,10 +1312,16 @@ export default class Sandbox {
|
||||
true,
|
||||
undefined
|
||||
)
|
||||
let progress = 0
|
||||
/** Too spammy */
|
||||
loader.on(LoaderEvent.LoadProgress, (arg: { progress: number; id: string }) => {
|
||||
if (colorImage)
|
||||
colorImage.style.clipPath = `inset(${(1 - arg.progress) * 100}% 0 0 0)`
|
||||
const p = Math.floor(arg.progress * 100)
|
||||
if (p > progress) {
|
||||
if (colorImage)
|
||||
colorImage.style.clipPath = `inset(${(1 - arg.progress) * 100}% 0 0 0)`
|
||||
progress = p
|
||||
console.log(`Loading ${p}%`)
|
||||
}
|
||||
})
|
||||
loader.on(LoaderEvent.LoadCancelled, (resource: string) => {
|
||||
console.warn(`Resource ${resource} loading was canceled`)
|
||||
@@ -1372,7 +1378,7 @@ export default class Sandbox {
|
||||
options: { enableCaching: true }
|
||||
})*/
|
||||
|
||||
const loader = new ObjectLoader2({
|
||||
const loader = ObjectLoader2Factory.createFromUrl({
|
||||
serverUrl,
|
||||
streamId,
|
||||
objectId,
|
||||
|
||||
@@ -111,8 +111,10 @@ const getStream = () => {
|
||||
return (
|
||||
// prettier-ignore
|
||||
// Revit sample house (good for bim-like stuff with many display meshes)
|
||||
// 'https://app.speckle.systems/streams/da9e320dad/commits/5388ef24b8'
|
||||
'https://app.speckle.systems/streams/da9e320dad/commits/5388ef24b8'
|
||||
// 'https://latest.speckle.systems/streams/c1faab5c62/commits/ab1a1ab2b6'
|
||||
// 'https://app.speckle.systems/streams/da9e320dad/commits/5388ef24b8'
|
||||
// 'https://latest.speckle.systems/streams/58b5648c4d/commits/60371ecb2d'
|
||||
// 'Super' heavy revit shit
|
||||
// 'https://app.speckle.systems/streams/e6f9156405/commits/0694d53bb5'
|
||||
// IFC building (good for a tree based structure)
|
||||
@@ -529,7 +531,7 @@ const getStream = () => {
|
||||
// 'https://app.speckle.systems/projects/f7bb16037a/models/5d090c6f07'
|
||||
|
||||
// Large topological stuff
|
||||
'https://app.speckle.systems/projects/7a489ac0d4/models/146d5fbe27,3e481c9a58,65b4cf97d5,6d07577256,903850fa6f'
|
||||
// 'https://app.speckle.systems/projects/7a489ac0d4/models/146d5fbe27,3e481c9a58,65b4cf97d5,6d07577256,903850fa6f'
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -52,7 +52,7 @@
|
||||
"threejs"
|
||||
],
|
||||
"dependencies": {
|
||||
"@speckle/objectloader": "workspace:^",
|
||||
"@speckle/objectloader2": "workspace:^",
|
||||
"@speckle/shared": "workspace:^",
|
||||
"@types/flat": "^5.0.2",
|
||||
"flat": "^5.0.2",
|
||||
|
||||
@@ -3,8 +3,8 @@ import { MathUtils, Matrix4 } from 'three'
|
||||
import { type TreeNode, WorldTree } from '../../tree/WorldTree.js'
|
||||
import { NodeMap } from '../../tree/NodeMap.js'
|
||||
import { SpeckleType, type SpeckleObject } from '../../../index.js'
|
||||
import type ObjectLoader from '@speckle/objectloader'
|
||||
import Logger from '../../utils/Logger.js'
|
||||
import { ObjectLoader2 } from '@speckle/objectloader2'
|
||||
|
||||
export type ConverterResultDelegate = () => Promise<void>
|
||||
export type SpeckleConverterNodeDelegate =
|
||||
@@ -16,7 +16,7 @@ export type SpeckleConverterNodeDelegate =
|
||||
* Warning: HIC SVNT DRACONES.
|
||||
*/
|
||||
export default class SpeckleConverter {
|
||||
protected objectLoader: ObjectLoader
|
||||
protected objectLoader: ObjectLoader2
|
||||
protected activePromises: number
|
||||
protected maxChildrenPromises: number
|
||||
protected spoofIDs = false
|
||||
@@ -63,7 +63,7 @@ export default class SpeckleConverter {
|
||||
|
||||
protected readonly IgnoreNodes = ['Parameter']
|
||||
|
||||
constructor(objectLoader: ObjectLoader, tree: WorldTree) {
|
||||
constructor(objectLoader: ObjectLoader2, tree: WorldTree) {
|
||||
if (!objectLoader) {
|
||||
Logger.warn(
|
||||
'Converter initialized without a corresponding object loader. Any objects that include references will throw errors.'
|
||||
@@ -276,9 +276,9 @@ export default class SpeckleConverter {
|
||||
|
||||
const chunked: unknown[] = []
|
||||
for (const ref of arr) {
|
||||
const real: Record<string, unknown> = await this.objectLoader.getObject(
|
||||
ref.referencedId
|
||||
)
|
||||
const real: Record<string, unknown> = (await this.objectLoader.getObject({
|
||||
id: ref.referencedId
|
||||
})) as unknown as Record<string, number>
|
||||
chunked.push(real.data)
|
||||
// await this.asyncPause()
|
||||
}
|
||||
@@ -296,9 +296,9 @@ export default class SpeckleConverter {
|
||||
*/
|
||||
private async resolveReference(obj: SpeckleObject): Promise<SpeckleObject> {
|
||||
if (obj.referencedId) {
|
||||
const resolvedObj = (await this.objectLoader.getObject(
|
||||
obj.referencedId
|
||||
)) as SpeckleObject
|
||||
const resolvedObj = (await this.objectLoader.getObject({
|
||||
id: obj.referencedId
|
||||
})) as SpeckleObject
|
||||
// this.asyncPause()
|
||||
return resolvedObj
|
||||
} else return obj
|
||||
|
||||
@@ -1,14 +1,13 @@
|
||||
import SpeckleConverter from './SpeckleConverter.js'
|
||||
import { Loader, LoaderEvent } from '../Loader.js'
|
||||
import ObjectLoader from '@speckle/objectloader'
|
||||
import { SpeckleGeometryConverter } from './SpeckleGeometryConverter.js'
|
||||
import { WorldTree, type SpeckleObject } from '../../../index.js'
|
||||
import { AsyncPause } from '../../World.js'
|
||||
import Logger from '../../utils/Logger.js'
|
||||
import { ObjectLoader2, ObjectLoader2Factory } from '@speckle/objectloader2'
|
||||
import { TIME_MS } from '@speckle/shared'
|
||||
|
||||
export class SpeckleLoader extends Loader {
|
||||
protected loader: ObjectLoader
|
||||
protected loader: ObjectLoader2
|
||||
protected converter: SpeckleConverter
|
||||
protected tree: WorldTree
|
||||
protected isCancelled = false
|
||||
@@ -49,9 +48,9 @@ export class SpeckleLoader extends Loader {
|
||||
protected initObjectLoader(
|
||||
resource: string,
|
||||
authToken?: string,
|
||||
enableCaching?: boolean,
|
||||
_enableCaching?: boolean,
|
||||
resourceData?: unknown
|
||||
): ObjectLoader {
|
||||
): ObjectLoader2 {
|
||||
resourceData
|
||||
let token = undefined
|
||||
try {
|
||||
@@ -81,14 +80,7 @@ export class SpeckleLoader extends Loader {
|
||||
const streamId = segments[2]
|
||||
const objectId = segments[4]
|
||||
|
||||
return new ObjectLoader({
|
||||
serverUrl,
|
||||
token,
|
||||
streamId,
|
||||
objectId,
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
options: { enableCaching, customLogger: (Logger as any).log }
|
||||
})
|
||||
return ObjectLoader2Factory.createFromUrl({ serverUrl, streamId, objectId, token })
|
||||
}
|
||||
|
||||
public async load(): Promise<boolean> {
|
||||
@@ -101,8 +93,6 @@ export class SpeckleLoader extends Loader {
|
||||
|
||||
Logger.warn('Downloading object ', this.resource)
|
||||
|
||||
const pause = new AsyncPause()
|
||||
|
||||
for await (const obj of this.loader.getObjectIterator()) {
|
||||
if (this.isCancelled) {
|
||||
this.emit(LoaderEvent.LoadCancelled, this.resource)
|
||||
@@ -114,10 +104,6 @@ export class SpeckleLoader extends Loader {
|
||||
obj as SpeckleObject,
|
||||
async () => {
|
||||
viewerLoads++
|
||||
pause.tick(100)
|
||||
if (pause.needsWait) {
|
||||
await pause.wait(16)
|
||||
}
|
||||
}
|
||||
)
|
||||
first = false
|
||||
@@ -138,6 +124,7 @@ export class SpeckleLoader extends Loader {
|
||||
(performance.now() - start) / TIME_MS.second
|
||||
} seconds. Node count: ${this.tree.nodeCount}`
|
||||
)
|
||||
await this.loader.disposeAsync()
|
||||
|
||||
if (viewerLoads === 0) {
|
||||
Logger.warn(`Viewer: no 3d objects found in object ${this.resource}`)
|
||||
@@ -185,6 +172,6 @@ export class SpeckleLoader extends Loader {
|
||||
|
||||
dispose() {
|
||||
super.dispose()
|
||||
this.loader.dispose()
|
||||
void this.loader.disposeAsync()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import ObjectLoader from '@speckle/objectloader'
|
||||
import { SpeckleLoader } from './SpeckleLoader.js'
|
||||
import { WorldTree } from '../../tree/WorldTree.js'
|
||||
import Logger from '../../utils/Logger.js'
|
||||
import { ObjectLoader2, ObjectLoader2Factory } from '@speckle/objectloader2'
|
||||
|
||||
export class SpeckleOfflineLoader extends SpeckleLoader {
|
||||
constructor(targetTree: WorldTree, resourceData: unknown, resourceId?: string) {
|
||||
@@ -13,8 +13,13 @@ export class SpeckleOfflineLoader extends SpeckleLoader {
|
||||
_authToken?: string,
|
||||
_enableCaching?: boolean,
|
||||
resourceData?: unknown
|
||||
): ObjectLoader {
|
||||
return ObjectLoader.createFromJSON(resourceData as string)
|
||||
): ObjectLoader2 {
|
||||
_resource
|
||||
_authToken
|
||||
_enableCaching
|
||||
resourceData
|
||||
/** TO DO: Implement either as part of ObjectLoader2 either separate */
|
||||
return ObjectLoader2Factory.createFromObjects([])
|
||||
}
|
||||
|
||||
public async load(): Promise<boolean> {
|
||||
@@ -24,7 +29,8 @@ export class SpeckleOfflineLoader extends SpeckleLoader {
|
||||
return false
|
||||
}
|
||||
/** If not id is provided, we make one up based on the root object id */
|
||||
this._resource = this._resource || `/json/${rootObject.id as string}`
|
||||
this._resource =
|
||||
this._resource || `/json/${(rootObject?.baseId as string) ?? 'unnamed'}`
|
||||
return super.load()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16359,7 +16359,7 @@ __metadata:
|
||||
"@rollup/plugin-babel": "npm:^5.3.1"
|
||||
"@rollup/plugin-image": "npm:^3.0.2"
|
||||
"@rollup/plugin-typescript": "npm:^11.1.6"
|
||||
"@speckle/objectloader": "workspace:^"
|
||||
"@speckle/objectloader2": "workspace:^"
|
||||
"@speckle/shared": "workspace:^"
|
||||
"@types/babel__core": "npm:^7.20.1"
|
||||
"@types/flat": "npm:^5.0.2"
|
||||
|
||||
Reference in New Issue
Block a user