From 8b325ca8f68b1fc8fc8bfb6b47fe4ec6e11cf58e Mon Sep 17 00:00:00 2001 From: Gergo Jedlicska Date: Wed, 10 Sep 2025 16:47:15 +0200 Subject: [PATCH] feat(objects): working object streaming with filtering --- .../modules/core/graph/generated/graphql.ts | 4 + .../server/modules/core/rest/diffDownload.ts | 74 +++++++++++-------- 2 files changed, 46 insertions(+), 32 deletions(-) diff --git a/packages/server/modules/core/graph/generated/graphql.ts b/packages/server/modules/core/graph/generated/graphql.ts index 222a3fb13..a2e13510c 100644 --- a/packages/server/modules/core/graph/generated/graphql.ts +++ b/packages/server/modules/core/graph/generated/graphql.ts @@ -1417,6 +1417,10 @@ export type FileUploadMutationsStartFileImportArgs = { }; export type FinishFileImportInput = { + /** + * This is the blob Id of the uploaded file. For legacy reasons it is named jobId. + * Note: This is not the background job Id. + */ jobId: Scalars['String']['input']; projectId: Scalars['String']['input']; reason?: InputMaybe; diff --git a/packages/server/modules/core/rest/diffDownload.ts b/packages/server/modules/core/rest/diffDownload.ts index 0d833dc6b..c4009f95f 100644 --- a/packages/server/modules/core/rest/diffDownload.ts +++ b/packages/server/modules/core/rest/diffDownload.ts @@ -5,7 +5,7 @@ import { objectDataTransformFactory, SpeckleObjectsStream } from '@/modules/core/rest/speckleObjectsStream' -import { pipeline, PassThrough, Readable } from 'stream' +import { pipeline, PassThrough } from 'stream' import { getObjectsStreamFactory, getProjectObjectStreamFactory @@ -16,18 +16,13 @@ import { getStreamFactory } from '@/modules/core/repositories/streams' import { authorizeResolver, validateScopes } from '@/modules/shared' import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector' import { UserInputError } from '@/modules/core/errors/userinput' -import { ensureError, Roles, Scopes } from '@speckle/shared' +import { ensureError } from '@speckle/shared' import { 
DatabaseError } from '@/modules/shared/errors' import { validateRequest } from 'zod-express' import { z } from 'zod' import { authMiddlewareCreator } from '@/modules/shared/middleware' -import { - streamReadPermissionsPipelineFactory, - validateScope, - validateServerRoleBuilderFactory -} from '@/modules/shared/authz' -import { getRolesFactory } from '@/modules/shared/repositories/roles' -import { chunk, constant, flatten, times } from 'lodash-es' +import { streamReadPermissionsPipelineFactory } from '@/modules/shared/authz' +import { chunk } from 'lodash-es' export default (app: Application) => { const validatePermissionsReadStream = validatePermissionsReadStreamFactory({ @@ -137,9 +132,22 @@ export default (app: Application) => { } }) - app.options('/api/v2/project/:streamId/object-stream', corsMiddlewareFactory()) + const reqBody = z + .object({ + objectIds: z.string().array().min(1), + attributeMask: z + .union([ + // using strict objects here, to make the two types exclusive + z.object({ include: z.string().array().min(1) }).strict(), + z.object({ exclude: z.string().array().min(1) }).strict() + ]) + .optional() + }) + .strict() + + app.options('/api/v2/projects/:streamId/object-stream', corsMiddlewareFactory()) app.post( - '/api/v2/project/:streamId/createObjectStream', + '/api/v2/projects/:streamId/object-stream', corsMiddlewareFactory(), authMiddlewareCreator([ ...streamReadPermissionsPipelineFactory({ @@ -147,15 +155,7 @@ export default (app: Application) => { }) ]), validateRequest({ - body: z.object({ - objectIds: z.string().array().min(1), - attributeMask: z - .union([ - z.object({ include: z.string().array().min(1) }), - z.object({ exclude: z.string().array().min(1) }) - ]) - .optional() - }) + body: reqBody }), async ({ body: { objectIds, attributeMask }, params: { streamId }, log }, res) => { const projectId = streamId @@ -170,6 +170,7 @@ export default (app: Application) => { const objectDataTransform = objectDataTransformFactory({ attributeMask }) 
const gzipStream = zlib.createGzip() + //create the response pipeline here, but we're not sending chunks just yet pipeline( objectDataTransform, gzipStream, @@ -196,23 +197,32 @@ export default (app: Application) => { ) } ) + + // we start chunking objectId-s here and pipe data to the first write stream in the pipeline const maxBatchSize = 1000 // TODO, this could potentially be sped up a bit, if we concurrently // pipe multiple db streams into the transform - for (const objectIdChunk of chunk(objectIds, maxBatchSize)) { - const objectStream = streamObjectsFromDb({ - projectId, - objectIds: objectIdChunk - }) - await new Promise((resolve, reject) => { - objectStream.once('end', resolve) - objectStream.once('error', reject) - // this is here, to make sure event handlers are registerd before piping the stream - objectStream.pipe(objectDataTransform, { end: false }) - }) + try { + for (const objectIdChunk of chunk(objectIds, maxBatchSize)) { + const objectStream = streamObjectsFromDb({ + projectId, + objectIds: objectIdChunk + }) + await new Promise((resolve, reject) => { + objectStream.once('end', resolve) + objectStream.once('error', reject) + // this is here, to make sure event handlers are registered before piping the stream + objectStream.pipe(objectDataTransform, { end: false }) + }) + } + } catch (err) { + log.error(err, `DB Error streaming objects`) + objectDataTransform.emit('error', new DatabaseError('Database streaming error')) + } finally { + // once we're done with streaming data from each chunk, we end the transform stream + objectDataTransform.end() } - objectDataTransform.end() } ) }