From a4ebb9fe124c466447e056b251afbdb2b486215f Mon Sep 17 00:00:00 2001 From: Gergo Jedlicska Date: Tue, 9 Sep 2025 16:50:38 +0200 Subject: [PATCH 1/7] WIP: streaming masking endpoint --- mise.toml | 1 + .../modules/core/repositories/objects.ts | 12 ++ .../server/modules/core/rest/diffDownload.ts | 103 +++++++++++++++++- .../modules/core/rest/speckleObjectsStream.ts | 37 ++++++- 4 files changed, 148 insertions(+), 5 deletions(-) diff --git a/mise.toml b/mise.toml index d4d178d5e..b7c4030dd 100644 --- a/mise.toml +++ b/mise.toml @@ -1,5 +1,6 @@ [tools] node = "22.19.0" +python = "3.12.11" [env] SHARP_IGNORE_GLOBAL_LIBVIPS = 1 diff --git a/packages/server/modules/core/repositories/objects.ts b/packages/server/modules/core/repositories/objects.ts index de2068c23..e40bfd584 100644 --- a/packages/server/modules/core/repositories/objects.ts +++ b/packages/server/modules/core/repositories/objects.ts @@ -176,6 +176,18 @@ export const getObjectsStreamFactory = return res.stream({ highWaterMark: 500 }) } +export const getProjectObjectStreamFactory = + (deps: { db: Knex }) => + ({ projectId, objectIds }: { projectId: string; objectIds: string[] }) => { + const res = tables + .objects(deps.db) + .whereIn('id', objectIds) + .andWhere({ streamId: projectId }) + .orderBy('id') + .select(knex.raw('"id", data::text as "dataText"')) + return res.stream({}) + } + export const hasObjectsFactory = (deps: { db: Knex }): HasObjects => async ({ streamId, objectIds }) => { diff --git a/packages/server/modules/core/rest/diffDownload.ts b/packages/server/modules/core/rest/diffDownload.ts index ecb3ac78d..0d833dc6b 100644 --- a/packages/server/modules/core/rest/diffDownload.ts +++ b/packages/server/modules/core/rest/diffDownload.ts @@ -1,17 +1,33 @@ import zlib from 'zlib' import { corsMiddlewareFactory } from '@/modules/core/configs/cors' import type { Application } from 'express' -import { SpeckleObjectsStream } from '@/modules/core/rest/speckleObjectsStream' -import { pipeline, PassThrough } from 'stream' -import { getObjectsStreamFactory } from '@/modules/core/repositories/objects' +import { + objectDataTransformFactory, + SpeckleObjectsStream +} from '@/modules/core/rest/speckleObjectsStream' +import { pipeline, PassThrough, Readable } from 'stream' +import { + getObjectsStreamFactory, + getProjectObjectStreamFactory +} from '@/modules/core/repositories/objects' import { db } from '@/db/knex' import { validatePermissionsReadStreamFactory } from '@/modules/core/services/streams/auth' import { getStreamFactory } from '@/modules/core/repositories/streams' import { authorizeResolver, validateScopes } from '@/modules/shared' import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector' import { UserInputError } from '@/modules/core/errors/userinput' -import { ensureError } from '@speckle/shared' +import { ensureError, Roles, Scopes } from '@speckle/shared' import { DatabaseError } from '@/modules/shared/errors' +import { validateRequest } from 'zod-express' +import { z } from 'zod' +import { authMiddlewareCreator } from '@/modules/shared/middleware' +import { + streamReadPermissionsPipelineFactory, + validateScope, + validateServerRoleBuilderFactory +} from '@/modules/shared/authz' +import { getRolesFactory } from '@/modules/shared/repositories/roles' +import { chunk, constant, flatten, times } from 'lodash-es' export default (app: Application) => { const validatePermissionsReadStream = validatePermissionsReadStreamFactory({ @@ -120,4 +136,83 @@ export default (app: Application) => { speckleObjStream.end() } }) + + app.options('/api/v2/project/:streamId/object-stream', corsMiddlewareFactory()) + app.post( + '/api/v2/project/:streamId/createObjectStream', + corsMiddlewareFactory(), + authMiddlewareCreator([ + ...streamReadPermissionsPipelineFactory({ + getStream: getStreamFactory({ db }) + }) + ]), + validateRequest({ + body: z.object({ + objectIds: z.string().array().min(1), + attributeMask: z + .union([ + z.object({ include: z.string().array().min(1) }), + z.object({ exclude: z.string().array().min(1) }) + ]) + .optional() + }) + }), + async ({ body: { objectIds, attributeMask }, params: { streamId }, log }, res) => { + const projectId = streamId + const projectDb = await getProjectDbClient({ projectId }) + + const streamObjectsFromDb = getProjectObjectStreamFactory({ db: projectDb }) + + res.writeHead(200, { + 'Content-Encoding': 'gzip', + 'Content-Type': 'text/plain; charset=UTF-8' + }) + const objectDataTransform = objectDataTransformFactory({ attributeMask }) + + const gzipStream = zlib.createGzip() + pipeline( + objectDataTransform, + gzipStream, + new PassThrough({ highWaterMark: 16384 * 31 }), + res, + (err) => { + if (err) { + switch (err.code) { + case 'ERR_STREAM_PREMATURE_CLOSE': + log.debug({ err }, 'Stream to client has prematurely closed') + break + default: + log.error(err, 'App error streaming objects') + break + } + return + } + log.info( + { + childCount: objectIds.length, + mbWritten: gzipStream.bytesWritten / 1000000 + }, + 'Streamed {childCount} objects (size: {mbWritten} MB)' + ) + } + ) + const maxBatchSize = 1000 + + // TODO, this could potentially be sped up a bit, if we concurrently + // pipe multiple db streams into the transform + for (const objectIdChunk of chunk(objectIds, maxBatchSize)) { + const objectStream = streamObjectsFromDb({ + projectId, + objectIds: objectIdChunk + }) + await new Promise((resolve, reject) => { + objectStream.once('end', resolve) + objectStream.once('error', reject) + // this is here, to make sure event handlers are registerd before piping the stream + objectStream.pipe(objectDataTransform, { end: false }) + }) + } + objectDataTransform.end() + } + ) } diff --git a/packages/server/modules/core/rest/speckleObjectsStream.ts b/packages/server/modules/core/rest/speckleObjectsStream.ts index 2b17bccec..b2b2d9f53 100644 --- a/packages/server/modules/core/rest/speckleObjectsStream.ts +++ b/packages/server/modules/core/rest/speckleObjectsStream.ts @@ -1,6 +1,9 @@ import { ensureError } from '@speckle/shared' +import { filter } from 'compression' +import { omit, pick } from 'lodash-es' import type { TransformCallback } from 'stream' import { Transform } from 'stream' +import { object } from 'zod' /** * A stream that converts database objects stream to "{id}\t{data_json}\n" stream or a json stream of obj.data fields @@ -47,5 +50,37 @@ class SpeckleObjectsStream extends Transform { callback() } } - export { SpeckleObjectsStream } + +export const objectDataTransformFactory = ({ + attributeMask +}: { + attributeMask?: { include: string[] } | { exclude: string[] } +}) => { + let objectTransform: ((dataText: string) => string) | null + if (attributeMask) { + let objectFilter: (obj: unknown, props: string[]) => unknown + let filteredAttributes: string[] + if ('include' in attributeMask) { + objectFilter = pick + filteredAttributes = attributeMask.include + } + if ('exclude' in attributeMask) { + objectFilter = omit + filteredAttributes = attributeMask.exclude + } + objectTransform = (dataText: string) => + JSON.stringify(objectFilter(JSON.parse(dataText), filteredAttributes)) + } + return new Transform({ + writableObjectMode: true, + transform({ dataText, id }: { dataText: string; id: string }, _, callback) { + try { + const objectDataString = objectTransform ? objectTransform(dataText) : dataText + callback(null, `${id}\t${objectDataString}\n`) + } catch (err) { + callback(ensureError(err)) + } + } + }) +} From 8b325ca8f68b1fc8fc8bfb6b47fe4ec6e11cf58e Mon Sep 17 00:00:00 2001 From: Gergo Jedlicska Date: Wed, 10 Sep 2025 16:47:15 +0200 Subject: [PATCH 2/7] feat(objects): working object streaming with filtering --- .../modules/core/graph/generated/graphql.ts | 4 + .../server/modules/core/rest/diffDownload.ts | 74 +++++++++++-------- 2 files changed, 46 insertions(+), 32 deletions(-) diff --git a/packages/server/modules/core/graph/generated/graphql.ts b/packages/server/modules/core/graph/generated/graphql.ts index 222a3fb13..a2e13510c 100644 --- a/packages/server/modules/core/graph/generated/graphql.ts +++ b/packages/server/modules/core/graph/generated/graphql.ts @@ -1417,6 +1417,10 @@ export type FileUploadMutationsStartFileImportArgs = { }; export type FinishFileImportInput = { + /** + * This is the blob Id of the uploaded file. For legacy reasons it is named jobId. + * Note: This is the not the background job Id. + */ jobId: Scalars['String']['input']; projectId: Scalars['String']['input']; reason?: InputMaybe; diff --git a/packages/server/modules/core/rest/diffDownload.ts b/packages/server/modules/core/rest/diffDownload.ts index 0d833dc6b..c4009f95f 100644 --- a/packages/server/modules/core/rest/diffDownload.ts +++ b/packages/server/modules/core/rest/diffDownload.ts @@ -5,7 +5,7 @@ import { objectDataTransformFactory, SpeckleObjectsStream } from '@/modules/core/rest/speckleObjectsStream' -import { pipeline, PassThrough, Readable } from 'stream' +import { pipeline, PassThrough } from 'stream' import { getObjectsStreamFactory, getProjectObjectStreamFactory @@ -16,18 +16,13 @@ import { getStreamFactory } from '@/modules/core/repositories/streams' import { authorizeResolver, validateScopes } from '@/modules/shared' import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector' import { UserInputError } from '@/modules/core/errors/userinput' -import { ensureError, Roles, Scopes } from '@speckle/shared' +import { ensureError } from '@speckle/shared' import { DatabaseError } from '@/modules/shared/errors' import { validateRequest } from 'zod-express' import { z } from 'zod' import { authMiddlewareCreator } from '@/modules/shared/middleware' -import { - streamReadPermissionsPipelineFactory, - validateScope, - validateServerRoleBuilderFactory -} from '@/modules/shared/authz' -import { getRolesFactory } from '@/modules/shared/repositories/roles' -import { chunk, constant, flatten, times } from 'lodash-es' +import { streamReadPermissionsPipelineFactory } from '@/modules/shared/authz' +import { chunk } from 'lodash-es' export default (app: Application) => { const validatePermissionsReadStream = validatePermissionsReadStreamFactory({ @@ -137,9 +132,22 @@ export default (app: Application) => { } }) - app.options('/api/v2/project/:streamId/object-stream', corsMiddlewareFactory()) + const reqBody = z + .object({ + objectIds: z.string().array().min(1), + attributeMask: z + .union([ + // using strict objects here, to make the two types exclusive + z.object({ include: z.string().array().min(1) }).strict(), + z.object({ exclude: z.string().array().min(1) }).strict() + ]) + .optional() + }) + .strict() + + app.options('/api/v2/projects/:streamId/object-stream', corsMiddlewareFactory()) app.post( - '/api/v2/project/:streamId/createObjectStream', + '/api/v2/projects/:streamId/object-stream', corsMiddlewareFactory(), authMiddlewareCreator([ ...streamReadPermissionsPipelineFactory({ @@ -147,15 +155,7 @@ export default (app: Application) => { }) ]), validateRequest({ - body: z.object({ - objectIds: z.string().array().min(1), - attributeMask: z - .union([ - z.object({ include: z.string().array().min(1) }), - z.object({ exclude: z.string().array().min(1) }) - ]) - .optional() - }) + body: reqBody }), async ({ body: { objectIds, attributeMask }, params: { streamId }, log }, res) => { const projectId = streamId @@ -170,6 +170,7 @@ export default (app: Application) => { const objectDataTransform = objectDataTransformFactory({ attributeMask }) const gzipStream = zlib.createGzip() + //create the response pipeline here, but we're not sending chunks just yet pipeline( objectDataTransform, gzipStream, @@ -196,23 +197,32 @@ export default (app: Application) => { ) } ) + + // we start chunking objectId-s here and pipe data to the firts write stream in the pipeline const maxBatchSize = 1000 // TODO, this could potentially be sped up a bit, if we concurrently // pipe multiple db streams into the transform - for (const objectIdChunk of chunk(objectIds, maxBatchSize)) { - const objectStream = streamObjectsFromDb({ - projectId, - objectIds: objectIdChunk - }) - await new Promise((resolve, reject) => { - objectStream.once('end', resolve) - objectStream.once('error', reject) - // this is here, to make sure event handlers are registerd before piping the stream - objectStream.pipe(objectDataTransform, { end: false }) - }) + try { + for (const objectIdChunk of chunk(objectIds, maxBatchSize)) { + const objectStream = streamObjectsFromDb({ + projectId, + objectIds: objectIdChunk + }) + await new Promise((resolve, reject) => { + objectStream.once('end', resolve) + objectStream.once('error', reject) + // this is here, to make sure event handlers are registerd before piping the stream + objectStream.pipe(objectDataTransform, { end: false }) + }) + } + } catch (err) { + log.error(err, `DB Error streaming objects`) + objectDataTransform.emit('error', new DatabaseError('Database streaming error')) + } finally { + // once we're done with streaming data from each chunk, we end the transform stream + objectDataTransform.end() } - objectDataTransform.end() } ) } From 010f4abe37feebf8cdbfd90108398c91b78916d8 Mon Sep 17 00:00:00 2001 From: Gergo Jedlicska Date: Mon, 15 Sep 2025 12:50:47 +0200 Subject: [PATCH 3/7] feat(object-loader): use the new objects endpoint --- packages/objectloader2/src/core/stages/serverDownloader.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/objectloader2/src/core/stages/serverDownloader.ts b/packages/objectloader2/src/core/stages/serverDownloader.ts index 2873922d6..b7216f3cc 100644 --- a/packages/objectloader2/src/core/stages/serverDownloader.ts +++ b/packages/objectloader2/src/core/stages/serverDownloader.ts @@ -51,9 +51,10 @@ export default class ServerDownloader implements Downloader { if (this.#options.token) { this.#headers['Authorization'] = `Bearer ${this.#options.token}` } - this.#requestUrlChildren = `${this.#options.serverUrl}/api/getobjects/${ + this.#requestUrlChildren = `${this.#options.serverUrl}/api/v2/projects/${ this.#options.streamId - }` + }/object-stream/` + this.#requestUrlRootObj = `${this.#options.serverUrl}/objects/${ this.#options.streamId }/${this.#options.objectId}/single` @@ -121,7 +122,7 @@ Chrome's behavior: Chrome generally handles larger data sizes without this speci const response = await this.#fetch(url, { method: 'POST', headers: { ...headers, 'Content-Type': 'application/json' }, - body: JSON.stringify({ objects: JSON.stringify(batch) }) + body: JSON.stringify({ objectIds: batch }) }) this.#validateResponse(response) From 8d2f9708dd49ea9daf6fb392c9131c9be12df0c0 Mon Sep 17 00:00:00 2001 From: Gergo Jedlicska Date: Wed, 17 Sep 2025 17:55:16 +0200 Subject: [PATCH 4/7] fix(server): allow downloading of public project data --- packages/server/modules/core/rest/diffDownload.ts | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/packages/server/modules/core/rest/diffDownload.ts b/packages/server/modules/core/rest/diffDownload.ts index c4009f95f..8e1c836f0 100644 --- a/packages/server/modules/core/rest/diffDownload.ts +++ b/packages/server/modules/core/rest/diffDownload.ts @@ -21,7 +21,11 @@ import { DatabaseError } from '@/modules/shared/errors' import { validateRequest } from 'zod-express' import { z } from 'zod' import { authMiddlewareCreator } from '@/modules/shared/middleware' -import { streamReadPermissionsPipelineFactory } from '@/modules/shared/authz' +import { + allowAnonymousUsersOnPublicStreams, + allowForRegisteredUsersOnPublicStreamsEvenWithoutRole, + streamReadPermissionsPipelineFactory +} from '@/modules/shared/authz' import { chunk } from 'lodash-es' export default (app: Application) => { @@ -152,7 +156,9 @@ export default (app: Application) => { authMiddlewareCreator([ ...streamReadPermissionsPipelineFactory({ getStream: getStreamFactory({ db }) - }) + }), + allowForRegisteredUsersOnPublicStreamsEvenWithoutRole, + allowAnonymousUsersOnPublicStreams ]), validateRequest({ body: reqBody From 2a43ce23d42efe113e19c2628934c7ebd3de7152 Mon Sep 17 00:00:00 2001 From: Gergo Jedlicska Date: Wed, 17 Sep 2025 17:56:40 +0200 Subject: [PATCH 5/7] feat(objectloader-2): include object masking in objectloader --- packages/objectloader2/src/core/objectLoader2Factory.ts | 4 +++- packages/objectloader2/src/core/stages/serverDownloader.ts | 6 ++++-- packages/objectloader2/src/types/types.ts | 2 ++ 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/packages/objectloader2/src/core/objectLoader2Factory.ts b/packages/objectloader2/src/core/objectLoader2Factory.ts index 0e0d887ca..8dc845a27 100644 --- a/packages/objectloader2/src/core/objectLoader2Factory.ts +++ b/packages/objectloader2/src/core/objectLoader2Factory.ts @@ -1,5 +1,5 @@ import { CustomLogger, getFeatureFlag, ObjectLoader2Flags } from '../types/functions.js' -import { Base } from '../types/types.js' +import { Base, ObjectAttributeMask } from '../types/types.js' import { ObjectLoader2 } from './objectLoader2.js' import { IndexedDatabase } from './stages/indexedDatabase.js' import { MemoryDatabase } from './stages/memory/memoryDatabase.js' @@ -40,6 +40,7 @@ export class ObjectLoader2Factory { token?: string headers?: Headers options?: ObjectLoader2FactoryOptions + attributeMask: ObjectAttributeMask }): ObjectLoader2 { const log = ObjectLoader2Factory.getLogger(params.options?.logger) let database @@ -67,6 +68,7 @@ export class ObjectLoader2Factory { objectId: params.objectId, token: params.token, headers: params.headers, + attributeMask: params.attributeMask, logger: log || ((): void => {}) }), database, diff --git a/packages/objectloader2/src/core/stages/serverDownloader.ts b/packages/objectloader2/src/core/stages/serverDownloader.ts index b7216f3cc..b1b38f01b 100644 --- a/packages/objectloader2/src/core/stages/serverDownloader.ts +++ b/packages/objectloader2/src/core/stages/serverDownloader.ts @@ -2,7 +2,7 @@ import BatchingQueue from '../../queues/batchingQueue.js' import Queue from '../../queues/queue.js' import { ObjectLoaderRuntimeError } from '../../types/errors.js' import { CustomLogger, Fetcher, indexOf, isBase, take } from '../../types/functions.js' -import { Item } from '../../types/types.js' +import { Item, ObjectAttributeMask } from '../../types/types.js' import { Downloader } from '../interfaces.js' export interface ServerDownloaderOptions { @@ -13,6 +13,7 @@ export interface ServerDownloaderOptions { headers?: Headers logger: CustomLogger fetch?: Fetcher + attributeMask: ObjectAttributeMask } const MAX_SAFARI_DECODE_BYTES = 2 * 1024 * 1024 * 1024 - 1024 * 1024 // 2GB minus a margin @@ -118,11 +119,12 @@ Chrome's behavior: Chrome generally handles larger data sizes without this speci const start = performance.now() this.#logger(`Downloading batch of ${batch.length} items...`) + const attributeMask = this.#options.attributeMask const keys = new Set(batch) const response = await this.#fetch(url, { method: 'POST', headers: { ...headers, 'Content-Type': 'application/json' }, - body: JSON.stringify({ objectIds: batch }) + body: JSON.stringify({ objectIds: batch, attributeMask }) }) this.#validateResponse(response) diff --git a/packages/objectloader2/src/types/types.ts b/packages/objectloader2/src/types/types.ts index a482b89f0..2b26ab29a 100644 --- a/packages/objectloader2/src/types/types.ts +++ b/packages/objectloader2/src/types/types.ts @@ -19,3 +19,5 @@ export interface Reference { export interface DataChunk extends Base { data?: Base[] } + +export type ObjectAttributeMask = { include: string[] } | { exclude: string[] } From b0ed356282df000f8d4e0cb5b268d2c4f33a9af9 Mon Sep 17 00:00:00 2001 From: Gergo Jedlicska Date: Wed, 17 Sep 2025 18:00:59 +0200 Subject: [PATCH 6/7] feat(objectloader2): make attribute masking optional --- packages/objectloader2/src/core/objectLoader2Factory.ts | 2 +- packages/objectloader2/src/core/stages/serverDownloader.ts | 2 +- packages/objectloader2/src/types/types.ts | 5 ++++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/packages/objectloader2/src/core/objectLoader2Factory.ts b/packages/objectloader2/src/core/objectLoader2Factory.ts index 8dc845a27..74cdf8c22 100644 --- a/packages/objectloader2/src/core/objectLoader2Factory.ts +++ b/packages/objectloader2/src/core/objectLoader2Factory.ts @@ -40,7 +40,7 @@ export class ObjectLoader2Factory { token?: string headers?: Headers options?: ObjectLoader2FactoryOptions - attributeMask: ObjectAttributeMask + attributeMask?: ObjectAttributeMask }): ObjectLoader2 { const log = ObjectLoader2Factory.getLogger(params.options?.logger) let database diff --git a/packages/objectloader2/src/core/stages/serverDownloader.ts b/packages/objectloader2/src/core/stages/serverDownloader.ts index b1b38f01b..da869776f 100644 --- a/packages/objectloader2/src/core/stages/serverDownloader.ts +++ b/packages/objectloader2/src/core/stages/serverDownloader.ts @@ -13,7 +13,7 @@ export interface ServerDownloaderOptions { headers?: Headers logger: CustomLogger fetch?: Fetcher - attributeMask: ObjectAttributeMask + attributeMask?: ObjectAttributeMask } const MAX_SAFARI_DECODE_BYTES = 2 * 1024 * 1024 * 1024 - 1024 * 1024 // 2GB minus a margin diff --git a/packages/objectloader2/src/types/types.ts b/packages/objectloader2/src/types/types.ts index 2b26ab29a..b5333b904 100644 --- a/packages/objectloader2/src/types/types.ts +++ b/packages/objectloader2/src/types/types.ts @@ -20,4 +20,7 @@ export interface DataChunk extends Base { data?: Base[] } -export type ObjectAttributeMask = { include: string[] } | { exclude: string[] } +export type ObjectAttributeMask = + | { include: string[] } + | { exclude: string[] } + | undefined From cb825f23cc52a1533cebd5949be6259d8d24c12e Mon Sep 17 00:00:00 2001 From: Gergo Jedlicska Date: Thu, 18 Sep 2025 14:27:12 +0200 Subject: [PATCH 7/7] fix(server): remove unused imports --- packages/server/modules/core/rest/speckleObjectsStream.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/server/modules/core/rest/speckleObjectsStream.ts b/packages/server/modules/core/rest/speckleObjectsStream.ts index b2b2d9f53..6b3e168eb 100644 --- a/packages/server/modules/core/rest/speckleObjectsStream.ts +++ b/packages/server/modules/core/rest/speckleObjectsStream.ts @@ -1,9 +1,7 @@ import { ensureError } from '@speckle/shared' -import { filter } from 'compression' import { omit, pick } from 'lodash-es' import type { TransformCallback } from 'stream' import { Transform } from 'stream' -import { object } from 'zod' /** * A stream that converts database objects stream to "{id}\t{data_json}\n" stream or a json stream of obj.data fields