Merge remote-tracking branch 'origin/gergo/cxpla-278-v2-server-api-endpoint-with-masking-support' into adam/add-ol2-options

# Conflicts:
#	packages/objectloader2/src/core/objectLoader2Factory.ts
This commit is contained in:
Adam Hathcock
2025-09-18 14:41:53 +01:00
7 changed files with 174 additions and 8 deletions
+1
View File
@@ -1,5 +1,6 @@
[tools]
node = "22.19.0"
python = "3.12.11"
[env]
SHARP_IGNORE_GLOBAL_LIBVIPS = 1
@@ -4,7 +4,7 @@ import {
getFeatureFlag,
ObjectLoader2Flags
} from '../types/functions.js'
import { Base } from '../types/types.js'
import { Base, ObjectAttributeMask } from '../types/types.js'
import { ObjectLoader2 } from './objectLoader2.js'
import { IndexedDatabase } from './stages/indexedDatabase.js'
import { MemoryDatabase } from './stages/memory/memoryDatabase.js'
@@ -48,6 +48,7 @@ export class ObjectLoader2Factory {
token?: string
headers?: Headers
options?: ObjectLoader2FactoryOptions
attributeMask?: ObjectAttributeMask
}): ObjectLoader2 {
const log = ObjectLoader2Factory.getLogger(params.options?.logger)
let database
@@ -2,7 +2,7 @@ import BatchingQueue from '../../queues/batchingQueue.js'
import Queue from '../../queues/queue.js'
import { ObjectLoaderRuntimeError } from '../../types/errors.js'
import { CustomLogger, Fetcher, indexOf, isBase, take } from '../../types/functions.js'
import { Item } from '../../types/types.js'
import { Item, ObjectAttributeMask } from '../../types/types.js'
import { Downloader } from '../interfaces.js'
export interface ServerDownloaderOptions {
@@ -13,6 +13,7 @@ export interface ServerDownloaderOptions {
headers?: Headers
logger: CustomLogger
fetch?: Fetcher
attributeMask?: ObjectAttributeMask
}
const MAX_SAFARI_DECODE_BYTES = 2 * 1024 * 1024 * 1024 - 1024 * 1024 // 2GB minus a margin
@@ -51,9 +52,10 @@ export default class ServerDownloader implements Downloader {
if (this.#options.token) {
this.#headers['Authorization'] = `Bearer ${this.#options.token}`
}
this.#requestUrlChildren = `${this.#options.serverUrl}/api/getobjects/${
this.#requestUrlChildren = `${this.#options.serverUrl}/api/v2/projects/${
this.#options.streamId
}`
}/object-stream/`
this.#requestUrlRootObj = `${this.#options.serverUrl}/objects/${
this.#options.streamId
}/${this.#options.objectId}/single`
@@ -117,11 +119,12 @@ Chrome's behavior: Chrome generally handles larger data sizes without this speci
const start = performance.now()
this.#logger(`Downloading batch of ${batch.length} items...`)
const attributeMask = this.#options.attributeMask
const keys = new Set<string>(batch)
const response = await this.#fetch(url, {
method: 'POST',
headers: { ...headers, 'Content-Type': 'application/json' },
body: JSON.stringify({ objects: JSON.stringify(batch) })
body: JSON.stringify({ objectIds: batch, attributeMask })
})
this.#validateResponse(response)
@@ -19,3 +19,8 @@ export interface Reference {
export interface DataChunk extends Base {
data?: Base[]
}
export type ObjectAttributeMask =
| { include: string[] }
| { exclude: string[] }
| undefined
@@ -176,6 +176,18 @@ export const getObjectsStreamFactory =
return res.stream({ highWaterMark: 500 })
}
export const getProjectObjectStreamFactory =
(deps: { db: Knex }) =>
({ projectId, objectIds }: { projectId: string; objectIds: string[] }) => {
const res = tables
.objects(deps.db)
.whereIn('id', objectIds)
.andWhere({ streamId: projectId })
.orderBy('id')
.select(knex.raw('"id", data::text as "dataText"'))
return res.stream({})
}
export const hasObjectsFactory =
(deps: { db: Knex }): HasObjects =>
async ({ streamId, objectIds }) => {
@@ -1,9 +1,15 @@
import zlib from 'zlib'
import { corsMiddlewareFactory } from '@/modules/core/configs/cors'
import type { Application } from 'express'
import { SpeckleObjectsStream } from '@/modules/core/rest/speckleObjectsStream'
import {
objectDataTransformFactory,
SpeckleObjectsStream
} from '@/modules/core/rest/speckleObjectsStream'
import { pipeline, PassThrough } from 'stream'
import { getObjectsStreamFactory } from '@/modules/core/repositories/objects'
import {
getObjectsStreamFactory,
getProjectObjectStreamFactory
} from '@/modules/core/repositories/objects'
import { db } from '@/db/knex'
import { validatePermissionsReadStreamFactory } from '@/modules/core/services/streams/auth'
import { getStreamFactory } from '@/modules/core/repositories/streams'
@@ -12,6 +18,15 @@ import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import { UserInputError } from '@/modules/core/errors/userinput'
import { ensureError } from '@speckle/shared'
import { DatabaseError } from '@/modules/shared/errors'
import { validateRequest } from 'zod-express'
import { z } from 'zod'
import { authMiddlewareCreator } from '@/modules/shared/middleware'
import {
allowAnonymousUsersOnPublicStreams,
allowForRegisteredUsersOnPublicStreamsEvenWithoutRole,
streamReadPermissionsPipelineFactory
} from '@/modules/shared/authz'
import { chunk } from 'lodash-es'
export default (app: Application) => {
const validatePermissionsReadStream = validatePermissionsReadStreamFactory({
@@ -120,4 +135,100 @@ export default (app: Application) => {
speckleObjStream.end()
}
})
const reqBody = z
.object({
objectIds: z.string().array().min(1),
attributeMask: z
.union([
// using strict objects here, to make the two types exclusive
z.object({ include: z.string().array().min(1) }).strict(),
z.object({ exclude: z.string().array().min(1) }).strict()
])
.optional()
})
.strict()
app.options('/api/v2/projects/:streamId/object-stream', corsMiddlewareFactory())
app.post(
'/api/v2/projects/:streamId/object-stream',
corsMiddlewareFactory(),
authMiddlewareCreator([
...streamReadPermissionsPipelineFactory({
getStream: getStreamFactory({ db })
}),
allowForRegisteredUsersOnPublicStreamsEvenWithoutRole,
allowAnonymousUsersOnPublicStreams
]),
validateRequest({
body: reqBody
}),
async ({ body: { objectIds, attributeMask }, params: { streamId }, log }, res) => {
const projectId = streamId
const projectDb = await getProjectDbClient({ projectId })
const streamObjectsFromDb = getProjectObjectStreamFactory({ db: projectDb })
res.writeHead(200, {
'Content-Encoding': 'gzip',
'Content-Type': 'text/plain; charset=UTF-8'
})
const objectDataTransform = objectDataTransformFactory({ attributeMask })
const gzipStream = zlib.createGzip()
//create the response pipeline here, but we're not sending chunks just yet
pipeline(
objectDataTransform,
gzipStream,
new PassThrough({ highWaterMark: 16384 * 31 }),
res,
(err) => {
if (err) {
switch (err.code) {
case 'ERR_STREAM_PREMATURE_CLOSE':
log.debug({ err }, 'Stream to client has prematurely closed')
break
default:
log.error(err, 'App error streaming objects')
break
}
return
}
log.info(
{
childCount: objectIds.length,
mbWritten: gzipStream.bytesWritten / 1000000
},
'Streamed {childCount} objects (size: {mbWritten} MB)'
)
}
)
// we start chunking objectId-s here and pipe data to the firts write stream in the pipeline
const maxBatchSize = 1000
// TODO, this could potentially be sped up a bit, if we concurrently
// pipe multiple db streams into the transform
try {
for (const objectIdChunk of chunk(objectIds, maxBatchSize)) {
const objectStream = streamObjectsFromDb({
projectId,
objectIds: objectIdChunk
})
await new Promise((resolve, reject) => {
objectStream.once('end', resolve)
objectStream.once('error', reject)
// this is here, to make sure event handlers are registerd before piping the stream
objectStream.pipe(objectDataTransform, { end: false })
})
}
} catch (err) {
log.error(err, `DB Error streaming objects`)
objectDataTransform.emit('error', new DatabaseError('Database streaming error'))
} finally {
// once we're done with streaming data from each chunk, we end the transform stream
objectDataTransform.end()
}
}
)
}
@@ -1,4 +1,5 @@
import { ensureError } from '@speckle/shared'
import { omit, pick } from 'lodash-es'
import type { TransformCallback } from 'stream'
import { Transform } from 'stream'
@@ -47,5 +48,37 @@ class SpeckleObjectsStream extends Transform {
callback()
}
}
export { SpeckleObjectsStream }
export const objectDataTransformFactory = ({
attributeMask
}: {
attributeMask?: { include: string[] } | { exclude: string[] }
}) => {
let objectTransform: ((dataText: string) => string) | null
if (attributeMask) {
let objectFilter: (obj: unknown, props: string[]) => unknown
let filteredAttributes: string[]
if ('include' in attributeMask) {
objectFilter = pick
filteredAttributes = attributeMask.include
}
if ('exclude' in attributeMask) {
objectFilter = omit
filteredAttributes = attributeMask.exclude
}
objectTransform = (dataText: string) =>
JSON.stringify(objectFilter(JSON.parse(dataText), filteredAttributes))
}
return new Transform({
writableObjectMode: true,
transform({ dataText, id }: { dataText: string; id: string }, _, callback) {
try {
const objectDataString = objectTransform ? objectTransform(dataText) : dataText
callback(null, `${id}\t${objectDataString}\n`)
} catch (err) {
callback(ensureError(err))
}
}
})
}