feat(objects): working object streaming with filtering

This commit is contained in:
Gergo Jedlicska
2025-09-10 16:47:15 +02:00
parent 6695093eff
commit 8b325ca8f6
2 changed files with 46 additions and 32 deletions
@@ -1417,6 +1417,10 @@ export type FileUploadMutationsStartFileImportArgs = {
};
export type FinishFileImportInput = {
/**
* This is the blob Id of the uploaded file. For legacy reasons it is named jobId.
* Note: This is the not the background job Id.
*/
jobId: Scalars['String']['input'];
projectId: Scalars['String']['input'];
reason?: InputMaybe<Scalars['String']['input']>;
@@ -5,7 +5,7 @@ import {
objectDataTransformFactory,
SpeckleObjectsStream
} from '@/modules/core/rest/speckleObjectsStream'
import { pipeline, PassThrough, Readable } from 'stream'
import { pipeline, PassThrough } from 'stream'
import {
getObjectsStreamFactory,
getProjectObjectStreamFactory
@@ -16,18 +16,13 @@ import { getStreamFactory } from '@/modules/core/repositories/streams'
import { authorizeResolver, validateScopes } from '@/modules/shared'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import { UserInputError } from '@/modules/core/errors/userinput'
import { ensureError, Roles, Scopes } from '@speckle/shared'
import { ensureError } from '@speckle/shared'
import { DatabaseError } from '@/modules/shared/errors'
import { validateRequest } from 'zod-express'
import { z } from 'zod'
import { authMiddlewareCreator } from '@/modules/shared/middleware'
import {
streamReadPermissionsPipelineFactory,
validateScope,
validateServerRoleBuilderFactory
} from '@/modules/shared/authz'
import { getRolesFactory } from '@/modules/shared/repositories/roles'
import { chunk, constant, flatten, times } from 'lodash-es'
import { streamReadPermissionsPipelineFactory } from '@/modules/shared/authz'
import { chunk } from 'lodash-es'
export default (app: Application) => {
const validatePermissionsReadStream = validatePermissionsReadStreamFactory({
@@ -137,9 +132,22 @@ export default (app: Application) => {
}
})
app.options('/api/v2/project/:streamId/object-stream', corsMiddlewareFactory())
const reqBody = z
.object({
objectIds: z.string().array().min(1),
attributeMask: z
.union([
// using strict objects here, to make the two types exclusive
z.object({ include: z.string().array().min(1) }).strict(),
z.object({ exclude: z.string().array().min(1) }).strict()
])
.optional()
})
.strict()
app.options('/api/v2/projects/:streamId/object-stream', corsMiddlewareFactory())
app.post(
'/api/v2/project/:streamId/createObjectStream',
'/api/v2/projects/:streamId/object-stream',
corsMiddlewareFactory(),
authMiddlewareCreator([
...streamReadPermissionsPipelineFactory({
@@ -147,15 +155,7 @@ export default (app: Application) => {
})
]),
validateRequest({
body: z.object({
objectIds: z.string().array().min(1),
attributeMask: z
.union([
z.object({ include: z.string().array().min(1) }),
z.object({ exclude: z.string().array().min(1) })
])
.optional()
})
body: reqBody
}),
async ({ body: { objectIds, attributeMask }, params: { streamId }, log }, res) => {
const projectId = streamId
@@ -170,6 +170,7 @@ export default (app: Application) => {
const objectDataTransform = objectDataTransformFactory({ attributeMask })
const gzipStream = zlib.createGzip()
//create the response pipeline here, but we're not sending chunks just yet
pipeline(
objectDataTransform,
gzipStream,
@@ -196,23 +197,32 @@ export default (app: Application) => {
)
}
)
// we start chunking objectId-s here and pipe data to the firts write stream in the pipeline
const maxBatchSize = 1000
// TODO, this could potentially be sped up a bit, if we concurrently
// pipe multiple db streams into the transform
for (const objectIdChunk of chunk(objectIds, maxBatchSize)) {
const objectStream = streamObjectsFromDb({
projectId,
objectIds: objectIdChunk
})
await new Promise((resolve, reject) => {
objectStream.once('end', resolve)
objectStream.once('error', reject)
// this is here, to make sure event handlers are registerd before piping the stream
objectStream.pipe(objectDataTransform, { end: false })
})
try {
for (const objectIdChunk of chunk(objectIds, maxBatchSize)) {
const objectStream = streamObjectsFromDb({
projectId,
objectIds: objectIdChunk
})
await new Promise((resolve, reject) => {
objectStream.once('end', resolve)
objectStream.once('error', reject)
// this is here, to make sure event handlers are registerd before piping the stream
objectStream.pipe(objectDataTransform, { end: false })
})
}
} catch (err) {
log.error(err, `DB Error streaming objects`)
objectDataTransform.emit('error', new DatabaseError('Database streaming error'))
} finally {
// once we're done with streaming data from each chunk, we end the transform stream
objectDataTransform.end()
}
objectDataTransform.end()
}
)
}