import zlib from 'zlib'
import { corsMiddlewareFactory } from '@/modules/core/configs/cors'
import type { Application } from 'express'
import {
  objectDataTransformFactory,
  SpeckleObjectsStream
} from '@/modules/core/rest/speckleObjectsStream'
import { pipeline, PassThrough } from 'stream'
import {
  getObjectsStreamFactory,
  getProjectObjectStreamFactory
} from '@/modules/core/repositories/objects'
import { db } from '@/db/knex'
import { validatePermissionsReadStreamFactory } from '@/modules/core/services/streams/auth'
import { getStreamFactory } from '@/modules/core/repositories/streams'
import { authorizeResolver, validateScopes } from '@/modules/shared'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import { UserInputError } from '@/modules/core/errors/userinput'
import { ensureError } from '@speckle/shared'
import { DatabaseError } from '@/modules/shared/errors'
import { validateRequest } from 'zod-express'
import { z } from 'zod'
import { authMiddlewareCreator } from '@/modules/shared/middleware'
import {
  allowAnonymousUsersOnPublicStreams,
  allowForRegisteredUsersOnPublicStreamsEvenWithoutRole,
  streamReadPermissionsPipelineFactory
} from '@/modules/shared/authz'
import { chunk } from 'lodash-es'

/**
 * Registers the object download routes on `app`:
 *  - `POST /api/getobjects/:streamId` (legacy): `body.objects` is a JSON-encoded string
 *    holding an array of object ids.
 *  - `POST /api/v2/projects/:streamId/object-stream`: zod-validated body with `objectIds`
 *    and an optional include/exclude `attributeMask`.
 * Both respond with a gzip-encoded stream, querying the project's DB in batches of
 * 1000 ids and piping each batch into a single long-lived output stream.
 */
export default (app: Application) => {
  const validatePermissionsReadStream = validatePermissionsReadStreamFactory({
    getStream: getStreamFactory({ db }),
    validateScopes,
    authorizeResolver
  })

  app.options('/api/getobjects/:streamId', corsMiddlewareFactory())

  app.post('/api/getobjects/:streamId', corsMiddlewareFactory(), async (req, res) => {
    req.log = req.log.child({
      userId: req.context.userId || '-',
      streamId: req.params.streamId
    })

    const hasStreamAccess = await validatePermissionsReadStream(req.params.streamId, req)
    if (!hasStreamAccess.result) {
      return res.status(hasStreamAccess.status).end()
    }

    const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
    const getObjectsStream = getObjectsStreamFactory({ db: projectDb })

    const invalidBodyMessage =
      'Invalid body. Please provide a JSON object containing the property "objects" of type string. The value must be a JSON string representation of an array of object IDs.'
    let parsedObjects: unknown
    try {
      parsedObjects = JSON.parse(req.body.objects)
    } catch (err) {
      throw new UserInputError(
        invalidBodyMessage,
        ensureError(err, 'Unknown JSON parsing issue')
      )
    }
    // JSON.parse accepts any JSON value, but everything below treats the result as an
    // array of id strings, so reject anything else up front
    if (
      !Array.isArray(parsedObjects) ||
      !parsedObjects.every((id) => typeof id === 'string')
    ) {
      throw new UserInputError(invalidBodyMessage)
    }
    const childrenList: string[] = parsedObjects

    const simpleText = req.headers.accept === 'text/plain'

    res.writeHead(200, {
      'Content-Encoding': 'gzip',
      'Content-Type': simpleText ? 'text/plain; charset=UTF-8' : 'application/json'
    })

    // "output" stream, connected to res with `pipeline` (auto-closing res)
    const speckleObjStream = new SpeckleObjectsStream(simpleText)
    const gzipStream = zlib.createGzip()

    pipeline(
      speckleObjStream,
      gzipStream,
      new PassThrough({ highWaterMark: 16384 * 31 }),
      res,
      (err) => {
        if (err) {
          switch (err.code) {
            case 'ERR_STREAM_PREMATURE_CLOSE':
              req.log.debug({ err }, 'Stream to client has prematurely closed')
              break
            default:
              req.log.error(err, 'App error streaming objects')
              break
          }
          return
        }

        req.log.info(
          {
            childCount: childrenList.length,
            mbWritten: gzipStream.bytesWritten / 1000000
          },
          'Streamed {childCount} objects (size: {mbWritten} MB)'
        )
      }
    )

    const cSize = 1000
    try {
      for (const childrenChunk of chunk(childrenList, cSize)) {
        // the pipeline destroys the output stream once the client disconnects; stop querying then
        if (!speckleObjStream.writable) break

        const dbStream = await getObjectsStream({
          streamId: req.params.streamId,
          objectIds: childrenChunk
        })

        // https://knexjs.org/faq/recipes.html#manually-closing-streams
        // https://github.com/knex/knex/issues/2324
        const responseCloseHandler = () => {
          dbStream.end()
          dbStream.destroy()
        }

        // the client may have disconnected while we were awaiting the query above; res has
        // then already emitted 'close', so the listener registered below would never fire
        if (!speckleObjStream.writable) {
          responseCloseHandler()
          break
        }

        dbStream.on('close', () => {
          res.removeListener('close', responseCloseHandler)
        })
        res.on('close', responseCloseHandler)

        await new Promise<void>((resolve, reject) => {
          dbStream.once('end', resolve)
          dbStream.once('error', reject)
          // a destroyed stream (e.g. via responseCloseHandler) emits 'close' without 'end';
          // settle here as well so this handler does not wait forever
          dbStream.once('close', resolve)
          // will not call end on the speckleObjStream, so it remains open for the next batch of objects
          dbStream.pipe(speckleObjStream, { end: false })
        })
      }
    } catch (ex) {
      req.log.error(ex, `DB Error streaming objects`)
      // destroy(err) rather than emit('error'): the error is delivered once for the pipeline
      // to handle, and it is a no-op if the pipeline has already torn the stream down
      speckleObjStream.destroy(new DatabaseError('Database streaming error'))
    } finally {
      speckleObjStream.end()
    }
  })

  const reqBody = z
    .object({
      objectIds: z.string().array().min(1),
      attributeMask: z
        .union([
          // using strict objects here, to make the two types exclusive
          z.object({ include: z.string().array().min(1) }).strict(),
          z.object({ exclude: z.string().array().min(1) }).strict()
        ])
        .optional()
    })
    .strict()

  app.options('/api/v2/projects/:streamId/object-stream', corsMiddlewareFactory())

  app.post(
    '/api/v2/projects/:streamId/object-stream',
    corsMiddlewareFactory(),
    authMiddlewareCreator([
      ...streamReadPermissionsPipelineFactory({
        getStream: getStreamFactory({ db })
      }),
      allowForRegisteredUsersOnPublicStreamsEvenWithoutRole,
      allowAnonymousUsersOnPublicStreams
    ]),
    validateRequest({
      body: reqBody
    }),
    async ({ body: { objectIds, attributeMask }, params: { streamId }, log }, res) => {
      const projectId = streamId
      const projectDb = await getProjectDbClient({ projectId })
      const streamObjectsFromDb = getProjectObjectStreamFactory({ db: projectDb })

      res.writeHead(200, {
        'Content-Encoding': 'gzip',
        'Content-Type': 'text/plain; charset=UTF-8'
      })

      const objectDataTransform = objectDataTransformFactory({ attributeMask })
      const gzipStream = zlib.createGzip()

      // create the response pipeline here, but we're not sending chunks just yet
      pipeline(
        objectDataTransform,
        gzipStream,
        new PassThrough({ highWaterMark: 16384 * 31 }),
        res,
        (err) => {
          if (err) {
            switch (err.code) {
              case 'ERR_STREAM_PREMATURE_CLOSE':
                log.debug({ err }, 'Stream to client has prematurely closed')
                break
              default:
                log.error(err, 'App error streaming objects')
                break
            }
            return
          }

          log.info(
            {
              childCount: objectIds.length,
              mbWritten: gzipStream.bytesWritten / 1000000
            },
            'Streamed {childCount} objects (size: {mbWritten} MB)'
          )
        }
      )

      // we start chunking objectId-s here and pipe data to the first write stream in the pipeline
      const maxBatchSize = 1000
      // TODO, this could potentially be sped up a bit, if we concurrently
      // pipe multiple db streams into the transform
      try {
        for (const objectIdChunk of chunk(objectIds, maxBatchSize)) {
          // the pipeline destroys the transform once the client disconnects; stop querying then
          if (!objectDataTransform.writable) break

          const objectStream = streamObjectsFromDb({
            projectId,
            objectIds: objectIdChunk
          })

          // if the client disconnects mid-chunk, Node's pipe() unpipes and pauses objectStream
          // (its destination was destroyed), so it would never emit 'end'; tear it down instead
          // https://knexjs.org/faq/recipes.html#manually-closing-streams
          const responseCloseHandler = () => {
            objectStream.destroy()
          }
          objectStream.once('close', () => {
            res.removeListener('close', responseCloseHandler)
          })
          res.on('close', responseCloseHandler)

          await new Promise<void>((resolve, reject) => {
            objectStream.once('end', resolve)
            objectStream.once('error', reject)
            // a destroyed stream emits 'close' without 'end'; settle so we don't wait forever
            objectStream.once('close', resolve)
            // this is here, to make sure event handlers are registered before piping the stream
            objectStream.pipe(objectDataTransform, { end: false })
          })
        }
      } catch (err) {
        log.error(err, `DB Error streaming objects`)
        // destroy(err) rather than emit('error'): the error is delivered once for the pipeline
        // to handle, and it is a no-op if the pipeline has already torn the stream down
        objectDataTransform.destroy(new DatabaseError('Database streaming error'))
      } finally {
        // once we're done with streaming data from each chunk, we end the transform stream
        objectDataTransform.end()
      }
    }
  )
}