diff --git a/mise.toml b/mise.toml index d4d178d5e..b7c4030dd 100644 --- a/mise.toml +++ b/mise.toml @@ -1,5 +1,6 @@ [tools] node = "22.19.0" +python = "3.12.11" [env] SHARP_IGNORE_GLOBAL_LIBVIPS = 1 diff --git a/packages/objectloader2/src/core/objectLoader2Factory.ts b/packages/objectloader2/src/core/objectLoader2Factory.ts index 4621e9666..6c716d8ab 100644 --- a/packages/objectloader2/src/core/objectLoader2Factory.ts +++ b/packages/objectloader2/src/core/objectLoader2Factory.ts @@ -4,7 +4,7 @@ import { getFeatureFlag, ObjectLoader2Flags } from '../types/functions.js' -import { Base } from '../types/types.js' +import { Base, ObjectAttributeMask } from '../types/types.js' import { ObjectLoader2 } from './objectLoader2.js' import { IndexedDatabase } from './stages/indexedDatabase.js' import { MemoryDatabase } from './stages/memory/memoryDatabase.js' @@ -48,6 +48,7 @@ export class ObjectLoader2Factory { token?: string headers?: Headers options?: ObjectLoader2FactoryOptions + attributeMask?: ObjectAttributeMask }): ObjectLoader2 { const log = ObjectLoader2Factory.getLogger(params.options?.logger) let database diff --git a/packages/objectloader2/src/core/stages/serverDownloader.ts b/packages/objectloader2/src/core/stages/serverDownloader.ts index 2873922d6..da869776f 100644 --- a/packages/objectloader2/src/core/stages/serverDownloader.ts +++ b/packages/objectloader2/src/core/stages/serverDownloader.ts @@ -2,7 +2,7 @@ import BatchingQueue from '../../queues/batchingQueue.js' import Queue from '../../queues/queue.js' import { ObjectLoaderRuntimeError } from '../../types/errors.js' import { CustomLogger, Fetcher, indexOf, isBase, take } from '../../types/functions.js' -import { Item } from '../../types/types.js' +import { Item, ObjectAttributeMask } from '../../types/types.js' import { Downloader } from '../interfaces.js' export interface ServerDownloaderOptions { @@ -13,6 +13,7 @@ export interface ServerDownloaderOptions { headers?: Headers logger: CustomLogger fetch?: Fetcher + attributeMask?: ObjectAttributeMask } const MAX_SAFARI_DECODE_BYTES = 2 * 1024 * 1024 * 1024 - 1024 * 1024 // 2GB minus a margin @@ -51,9 +52,10 @@ export default class ServerDownloader implements Downloader { if (this.#options.token) { this.#headers['Authorization'] = `Bearer ${this.#options.token}` } - this.#requestUrlChildren = `${this.#options.serverUrl}/api/getobjects/${ + this.#requestUrlChildren = `${this.#options.serverUrl}/api/v2/projects/${ this.#options.streamId - }` + }/object-stream/` + this.#requestUrlRootObj = `${this.#options.serverUrl}/objects/${ this.#options.streamId }/${this.#options.objectId}/single` @@ -117,11 +119,12 @@ Chrome's behavior: Chrome generally handles larger data sizes without this speci const start = performance.now() this.#logger(`Downloading batch of ${batch.length} items...`) + const attributeMask = this.#options.attributeMask const keys = new Set(batch) const response = await this.#fetch(url, { method: 'POST', headers: { ...headers, 'Content-Type': 'application/json' }, - body: JSON.stringify({ objects: JSON.stringify(batch) }) + body: JSON.stringify({ objectIds: batch, attributeMask }) }) this.#validateResponse(response) diff --git a/packages/objectloader2/src/types/types.ts b/packages/objectloader2/src/types/types.ts index a482b89f0..b5333b904 100644 --- a/packages/objectloader2/src/types/types.ts +++ b/packages/objectloader2/src/types/types.ts @@ -19,3 +19,8 @@ export interface Reference { export interface DataChunk extends Base { data?: Base[] } + +export type ObjectAttributeMask = + | { include: string[] } + | { exclude: string[] } + | undefined diff --git a/packages/server/modules/core/repositories/objects.ts b/packages/server/modules/core/repositories/objects.ts index de2068c23..e40bfd584 100644 --- a/packages/server/modules/core/repositories/objects.ts +++ b/packages/server/modules/core/repositories/objects.ts @@ -176,6 +176,18 @@ export const getObjectsStreamFactory = return res.stream({ highWaterMark: 500 }) } +export const getProjectObjectStreamFactory = + (deps: { db: Knex }) => + ({ projectId, objectIds }: { projectId: string; objectIds: string[] }) => { + const res = tables + .objects(deps.db) + .whereIn('id', objectIds) + .andWhere({ streamId: projectId }) + .orderBy('id') + .select(knex.raw('"id", data::text as "dataText"')) + return res.stream({}) + } + export const hasObjectsFactory = (deps: { db: Knex }): HasObjects => async ({ streamId, objectIds }) => { diff --git a/packages/server/modules/core/rest/diffDownload.ts b/packages/server/modules/core/rest/diffDownload.ts index ecb3ac78d..8e1c836f0 100644 --- a/packages/server/modules/core/rest/diffDownload.ts +++ b/packages/server/modules/core/rest/diffDownload.ts @@ -1,9 +1,15 @@ import zlib from 'zlib' import { corsMiddlewareFactory } from '@/modules/core/configs/cors' import type { Application } from 'express' -import { SpeckleObjectsStream } from '@/modules/core/rest/speckleObjectsStream' +import { + objectDataTransformFactory, + SpeckleObjectsStream +} from '@/modules/core/rest/speckleObjectsStream' import { pipeline, PassThrough } from 'stream' -import { getObjectsStreamFactory } from '@/modules/core/repositories/objects' +import { + getObjectsStreamFactory, + getProjectObjectStreamFactory +} from '@/modules/core/repositories/objects' import { db } from '@/db/knex' import { validatePermissionsReadStreamFactory } from '@/modules/core/services/streams/auth' import { getStreamFactory } from '@/modules/core/repositories/streams' @@ -12,6 +18,15 @@ import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector' import { UserInputError } from '@/modules/core/errors/userinput' import { ensureError } from '@speckle/shared' import { DatabaseError } from '@/modules/shared/errors' +import { validateRequest } from 'zod-express' +import { z } from 'zod' +import { authMiddlewareCreator } from '@/modules/shared/middleware' +import { + allowAnonymousUsersOnPublicStreams, + allowForRegisteredUsersOnPublicStreamsEvenWithoutRole, + streamReadPermissionsPipelineFactory +} from '@/modules/shared/authz' +import { chunk } from 'lodash-es' export default (app: Application) => { const validatePermissionsReadStream = validatePermissionsReadStreamFactory({ @@ -120,4 +135,100 @@ export default (app: Application) => { speckleObjStream.end() } }) + + const reqBody = z + .object({ + objectIds: z.string().array().min(1), + attributeMask: z + .union([ + // using strict objects here, to make the two types exclusive + z.object({ include: z.string().array().min(1) }).strict(), + z.object({ exclude: z.string().array().min(1) }).strict() + ]) + .optional() + }) + .strict() + + app.options('/api/v2/projects/:streamId/object-stream', corsMiddlewareFactory()) + app.post( + '/api/v2/projects/:streamId/object-stream', + corsMiddlewareFactory(), + authMiddlewareCreator([ + ...streamReadPermissionsPipelineFactory({ + getStream: getStreamFactory({ db }) + }), + allowForRegisteredUsersOnPublicStreamsEvenWithoutRole, + allowAnonymousUsersOnPublicStreams + ]), + validateRequest({ + body: reqBody + }), + async ({ body: { objectIds, attributeMask }, params: { streamId }, log }, res) => { + const projectId = streamId + const projectDb = await getProjectDbClient({ projectId }) + + const streamObjectsFromDb = getProjectObjectStreamFactory({ db: projectDb }) + + res.writeHead(200, { + 'Content-Encoding': 'gzip', + 'Content-Type': 'text/plain; charset=UTF-8' + }) + const objectDataTransform = objectDataTransformFactory({ attributeMask }) + + const gzipStream = zlib.createGzip() + //create the response pipeline here, but we're not sending chunks just yet + pipeline( + objectDataTransform, + gzipStream, + new PassThrough({ highWaterMark: 16384 * 31 }), + res, + (err) => { + if (err) { + switch (err.code) { + case 'ERR_STREAM_PREMATURE_CLOSE': + log.debug({ err }, 'Stream to client has prematurely closed') + break + default: + log.error(err, 'App error streaming objects') + break + } + return + } + log.info( + { + childCount: objectIds.length, + mbWritten: gzipStream.bytesWritten / 1000000 + }, + 'Streamed {childCount} objects (size: {mbWritten} MB)' + ) + } + ) + + // we start chunking objectId-s here and pipe data to the firts write stream in the pipeline + const maxBatchSize = 1000 + + // TODO, this could potentially be sped up a bit, if we concurrently + // pipe multiple db streams into the transform + try { + for (const objectIdChunk of chunk(objectIds, maxBatchSize)) { + const objectStream = streamObjectsFromDb({ + projectId, + objectIds: objectIdChunk + }) + await new Promise((resolve, reject) => { + objectStream.once('end', resolve) + objectStream.once('error', reject) + // this is here, to make sure event handlers are registerd before piping the stream + objectStream.pipe(objectDataTransform, { end: false }) + }) + } + } catch (err) { + log.error(err, `DB Error streaming objects`) + objectDataTransform.emit('error', new DatabaseError('Database streaming error')) + } finally { + // once we're done with streaming data from each chunk, we end the transform stream + objectDataTransform.end() + } + } + ) } diff --git a/packages/server/modules/core/rest/speckleObjectsStream.ts b/packages/server/modules/core/rest/speckleObjectsStream.ts index 2b17bccec..6b3e168eb 100644 --- a/packages/server/modules/core/rest/speckleObjectsStream.ts +++ b/packages/server/modules/core/rest/speckleObjectsStream.ts @@ -1,4 +1,5 @@ import { ensureError } from '@speckle/shared' +import { omit, pick } from 'lodash-es' import type { TransformCallback } from 'stream' import { Transform } from 'stream' @@ -47,5 +48,37 @@ class SpeckleObjectsStream extends Transform { callback() } } - export { SpeckleObjectsStream } + +export const objectDataTransformFactory = ({ + attributeMask +}: { + attributeMask?: { include: string[] } | { exclude: string[] } +}) => { + let objectTransform: ((dataText: string) => string) | null + if (attributeMask) { + let objectFilter: (obj: unknown, props: string[]) => unknown + let filteredAttributes: string[] + if ('include' in attributeMask) { + objectFilter = pick + filteredAttributes = attributeMask.include + } + if ('exclude' in attributeMask) { + objectFilter = omit + filteredAttributes = attributeMask.exclude + } + objectTransform = (dataText: string) => + JSON.stringify(objectFilter(JSON.parse(dataText), filteredAttributes)) + } + return new Transform({ + writableObjectMode: true, + transform({ dataText, id }: { dataText: string; id: string }, _, callback) { + try { + const objectDataString = objectTransform ? objectTransform(dataText) : dataText + callback(null, `${id}\t${objectDataString}\n`) + } catch (err) { + callback(ensureError(err)) + } + } + }) +}